Skip to content

API Reference

discobase.database

References = Annotated[T, WrapValidator(_validate_ref)] module-attribute

T = TypeVar('T', bound=Type[Table]) module-attribute

__all__ = ('Database', 'References') module-attribute

Database

Top level class representing a Discord database bot controller.

Source code in src/discobase/database.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
class Database:
    """
    Top level class representing a Discord
    database bot controller.
    """

    def __init__(self, name: str) -> None:
        """
        Create the bot, the bookkeeping state, and the `on_ready` handler
        that drives `init()`.

        Nothing is connected here; see `login()`, `login_task()` or `conn()`.

        Args:
            name: Name of the Discord server that will be used as the database.
        """
        self.name = name
        """Name of the Discord-database server."""
        self.bot = commands.Bot(
            intents=discord.Intents.all(),
            command_prefix="!",
        )
        """discord.py `Bot` instance."""
        self.guild: discord.Guild | None = None
        """discord.py `Guild` used as the database server."""
        self.tables: dict[str, type[Table]] = {}
        """Dictionary of `Table` objects attached to this database."""
        self.open: bool = False
        """Whether the database is connected."""
        self._metadata_channel: discord.TextChannel | None = None
        """discord.py `TextChannel` that acts as the metadata channel."""
        self._database_cursors: dict[str, TableCursor] = {}
        """Dictionary of the `TableCursor` objects of this database."""
        # We need to keep a strong reference to the free-flying
        # task created by `login_task()`.
        self._task: asyncio.Task[None] | None = None
        # Expose the database on the bot (presumably so that the extensions
        # loaded in `init()` can reach it -- confirm against the cogs).
        self.bot.db = self
        # Set by `init()` when the user may use the database, and by the
        # failure paths below and in `close()` so that waiters never hang.
        self._setup_event = asyncio.Event()
        # Set by `init()` as soon as the guild is known; wakes `_set_open()`.
        self._internal_setup_event = asyncio.Event()
        # Exception raised during `on_ready`, re-raised by `wait_ready()`.
        self._on_ready_exc: BaseException | None = None

        # Here be dragons: https://github.com/ZeroIntensity/discobase/issues/49
        #
        # `on_ready` in discord.py swallows all exceptions, which
        # goes against some connect-and-disconnect behavior
        # that we want to allow in discobase.
        #
        # We need to store the exception, and then raise in wait_ready()
        # in order to properly handle it, otherwise the discord.py
        # logger just swallows it and pretends nothing happened.
        #
        # This also caused a deadlock with _setup_event, which caused
        # CI to run indefinitely.
        @self.bot.event
        @logger.catch(reraise=True)
        async def on_ready() -> None:
            try:
                await self.init()
            except BaseException as e:
                # Record the failure first, so that it is already visible
                # to anything that wakes up while the bot is closing.
                self._on_ready_exc = e
                try:
                    await self.bot.close()
                    if self._task:
                        self._task.cancel("bot startup failed")
                finally:
                    # Release wait_ready() no matter what: if closing the
                    # bot raised, skipping this would block it forever.
                    self._setup_event.set()

                raise  # This is swallowed!

    def _not_connected(self) -> NoReturn:
        """
        Raise the error used whenever the database is not connected.

        Callers reach for this when `guild` (or another piece of
        connection state) is still `None`.

        Raises:
            NotConnectedError: Always.
        """
        message = "not connected to the database, did you forget to login?"
        raise NotConnectedError(message)

    async def _metadata_init(self) -> discord.TextChannel:
        """
        Look up the `_dbmetadata` text channel of the guild, creating it
        when no text channel with that name exists yet.

        Returns:
            discord.TextChannel: The existing channel if one was found,
                otherwise the newly created one.
        """
        wanted = "_dbmetadata"
        assert self.guild is not None

        # First text channel carrying the metadata name, if there is one.
        existing = next(
            (ch for ch in self.guild.text_channels if ch.name == wanted),
            None,
        )
        if existing is not None:
            logger.info("Found metadata channel!")

        return existing or await self.guild.create_text_channel(name=wanted)

    # This needs to be async for use in gather()
    async def _set_open(self) -> None:
        """
        Mark the database as open once internal setup has finished.

        This is a coroutine so that `login()` can run it next to the bot
        inside `asyncio.gather()`.

        Raises:
            BaseException: The exception stored by `on_ready`, if startup
                had already failed when the internal setup event fired.
        """
        logger.debug("_set_open waiting on internal setup event")
        await self._internal_setup_event.wait()
        # See https://github.com/ZeroIntensity/discobase/issues/68
        #
        # If `wait_ready()` is never called, then the error does not propagate.
        #
        # This is checked *before* touching `open`, so that a failed
        # startup is never reported as an open connection.
        if self._on_ready_exc:
            raise self._on_ready_exc

        self.open = True
        logger.debug(
            "Internal setup event dispatched! Database has been marked as open."  # noqa
        )

    async def init(self) -> None:
        """
        Bring the database server up: load the `cogs` extensions, sync the
        slash commands, find (or create) the guild and build the tables.

        This is considered a public interface, but you generally don't
        want to call it manually.
        """
        logger.info("Initializing the bot.")
        # Load external commands
        loaders: list[asyncio.Task] = []
        for cog in iter_modules(path=["cogs"], prefix="cogs."):
            logger.debug(f"Loading module with cog: {cog}")
            loading = asyncio.create_task(self.bot.load_extension(cog.name))
            loaders.append(loading)

        await asyncio.gather(*loaders)
        logger.info("Syncing slash commands, this might take a minute.")
        await self.bot.tree.sync()
        logger.info("Waiting until bot is logged in.")
        await self.bot.wait_until_ready()
        logger.info("Bot is ready!")

        # First guild the bot is in whose name matches ours, if any.
        existing = next(
            (g for g in self.bot.guilds if g.name == self.name),
            None,
        )
        if existing:
            logger.info("Found an existing guild.")
            self.guild = existing
        else:
            logger.info("No guild found, making one.")
            self.guild = await self.bot.create_guild(name=self.name)

        # Unlock database, but don't wakeup the user.
        self._internal_setup_event.set()
        await self.build_tables()
        # At this point, the database is marked as "ready" to the user.
        self._setup_event.set()

        assert self._metadata_channel is not None
        invite = await self._metadata_channel.create_invite()
        logger.info(f"Invite to server: {invite}")

    async def build_tables(self) -> None:
        """
        Generate all internal metadata and construct tables.

        Calling this manually is useful if e.g. you want
        to load tables *after* calling `login` (or `login_task`, or
        `conn`, same difference.)

        This method is safe to call multiple times.

        Raises:
            NotConnectedError: The guild is not known yet, i.e. startup
                has not finished.

        Example:
            ```py
            import asyncio
            import discobase

            async def main():
                db = discobase.Database("My database")
                db.login_task("My bot token")
                # The guild is only known once startup has finished
                await db.wait_ready()

                @db.table
                class MyLateTable(discobase.Table):
                    something: str

                # Load MyLateTable into database
                await db.build_tables()

            asyncio.run(main())
            ```
        """
        if not self.guild:
            self._not_connected()

        # Find (or create) the metadata channel before creating any table.
        self._metadata_channel = await self._metadata_init()
        # One task per registered table, all awaited together below.
        tasks = [
            asyncio.ensure_future(
                TableCursor.create_table(
                    table,
                    self._metadata_channel,
                    self.guild,
                )
            )
            for table in self.tables.values()
        ]
        logger.debug(f"Creating tables with gather(): {tasks}")
        await asyncio.gather(*tasks)

    async def wait_ready(self) -> None:
        """
        Block until the setup event fires, i.e. until the database is ready.

        Raises:
            BaseException: Whatever `on_ready` stored in `_on_ready_exc`
                while starting up, re-raised here for the caller.
        """
        logger.info("Waiting until the database is ready.")
        await self._setup_event.wait()
        logger.info("Done waiting!")

        # See #49, we need to propagate errors in `on_ready` here.
        startup_error = self._on_ready_exc
        if not startup_error:
            return

        logger.error("on_ready() failed, propagating now.")
        raise startup_error

    def _find_channel(self, cid: int) -> discord.TextChannel:
        """
        Look up a text channel of the database guild by its ID.

        Args:
            cid: Discord channel ID to look for.

        Returns:
            discord.TextChannel: The channel with the given ID.

        Raises:
            NotConnectedError: The database has no guild yet.
            DatabaseCorruptionError: No channel with that ID exists in the
                guild, or the channel is not a text channel.
        """
        if not self.guild:
            self._not_connected()

        # `get_channel()` is a direct lookup in the guild's channel cache,
        # so there is no need to scan every channel of the guild.
        index_channel = self.guild.get_channel(cid)
        if index_channel is None:
            # Previously this surfaced as a bare IndexError.
            raise DatabaseCorruptionError(
                f"no channel with ID {cid} exists in the guild"
            )

        if not isinstance(index_channel, discord.TextChannel):
            raise DatabaseCorruptionError(
                f"expected {index_channel!r} to be a TextChannel"
            )

        logger.debug(f"Found channel ID {cid}: {index_channel!r}")
        return index_channel

    async def clean(self) -> None:
        """
        Perform a full clean of the database.

        This method erases all metadata, all entries, and all tables. After
        calling this, a server loses any trace of the database ever being
        there.

        Note that this does *not* clean the existing tables from memory, but
        instead just marks them all as not ready.

        This action is irreversible.

        Raises:
            NotConnectedError: There is no metadata channel yet.
            DatabaseCorruptionError: Propagated from `_find_channel()` when
                a channel referenced by the metadata cannot be resolved.
                Nothing has been deleted at that point.
        """
        if not self._metadata_channel:
            self._not_connected()

        logger.info("Cleaning the database!")

        # Resolve every channel *before* creating any deletion coroutine:
        # if a lookup fails half way through, no coroutine object is left
        # behind un-awaited.
        doomed: list[discord.TextChannel] = []
        for table, cursor in self._database_cursors.items():
            metadata = cursor.metadata
            logger.info(f"Cleaning table {table}")
            doomed.append(self._find_channel(metadata.table_channel))
            doomed.extend(
                self._find_channel(cid)
                for cid in metadata.index_channels.values()
            )

        # Mark every attached table as not ready.
        for schema in self.tables.values():
            schema.__disco_cursor__ = None

        coros: list[Coroutine] = [channel.delete() for channel in doomed]
        logger.debug(f"Gathering deletion coros: {coros}")
        await asyncio.gather(*coros)
        logger.info("Deleting database metadata.")
        self._database_cursors = {}
        await self._metadata_channel.delete()

    async def login(self, bot_token: str) -> None:
        """
        Start the bot and wait on it together with `_set_open()`.

        Args:
            bot_token: Discord API bot token to log in with.

        Raises:
            RuntimeError: The connection is already marked as open.
        """
        already_open = self.open
        if already_open:
            raise RuntimeError(
                "connection is already open, did you call login() twice?"
            )

        # We use _set_open() with a gather to keep a finer link
        # between the `open` attribute and whether it's actually
        # running.
        bot_runner = self.bot.start(token=bot_token)
        open_marker = self._set_open()
        await asyncio.gather(bot_runner, open_marker)

    def login_task(self, bot_token: str) -> asyncio.Task[None]:
        """
        Call `login()` as a free-flying task, instead of
        blocking the event loop.

        Note that this method stores a reference to the created
        task object, allowing it to be truly "free-flying."

        This is a plain (non-`async`) method: call it *without* `await`.
        It uses `asyncio.create_task()`, so an event loop must be running.

        Args:
            bot_token: Discord API bot token to log in to.

        Returns:
            asyncio.Task[None]: The created `asyncio.Task` object.
                Note that the database will store this internally, so you
                don't have to worry about losing the reference. By default,
                this task will never get `await`ed, so this function will
                not keep the event loop running. If you want to keep the
                event loop running, make sure to `await` the returned task
                object later.

        Example:
            ```py
            import asyncio
            import os

            import discobase


            async def main():
                db = discobase.Database("test")
                task = db.login_task("...")
                await db.wait_ready()
                # ...
                await task  # Keep the event loop running

            asyncio.run(main())
            ```
        """
        task = asyncio.create_task(self.login(bot_token))
        # Strong reference, so the task stays alive; `on_ready` also uses
        # it to cancel the login when startup fails.
        self._task = task
        return task

    async def close(self) -> None:
        """
        Shut down the bot connection and mark the database as closed.

        Raises:
            ValueError: The connection is not open. The setup event is
                released first, so nothing stays blocked waiting on it.
        """
        if self.open:
            self.open = False
            await self.bot.close()
            return

        # If something went wrong in startup, for example, then
        # we need to release the setup lock.
        self._setup_event.set()
        raise ValueError(
            "cannot close a connection that is not open",
        )

    @asynccontextmanager
    async def conn(self, bot_token: str):
        """
        Connect to the bot under a context manager.
        This is the recommended method to use for logging in.

        On entry, the bot is started with `login_task()` and the body of
        the `async with` only runs once `wait_ready()` has returned. On
        exit, the connection is closed if the database is still marked
        as open.

        Args:
            bot_token: Discord API bot token to log in to.

        Returns:
            AsyncGeneratorContextManager: An asynchronous context manager.
                See `contextlib.asynccontextmanager` for details.

        Raises:
            BaseException: Any startup error re-raised by `wait_ready()`.

        Example:
            ```py
            import asyncio
            import os

            import discobase


            async def main():
                db = discobase.Database("test")
                async with db.conn(os.getenv("BOT_TOKEN")):
                    ...  # Your database code


            asyncio.run(main())
            ```
        """
        try:
            # Start the bot in the background, then block until setup
            # has either finished or failed.
            self.login_task(bot_token)
            await self.wait_ready()
            yield
        finally:
            # Runs on normal exit and on errors alike.
            if self.open:  # Something could have gone wrong
                await self.close()

    def table(self, clas: T) -> T:
        """
        Mark a `Table` type as part of a database.
        This method is meant to be used as a decorator.

        The class is only modified once every check has passed, so a
        rejected class is left exactly as it was.

        Args:
            clas: Type object to attach.

        Example:
            ```py
            import discobase

            db = discobase.Database("My database")

            @db.table
            class MySchema(discobase.Table):
                foo: int
                bar: str

            # ...
            ```

        Returns:
            Type[Table]: The same object passed to `clas` -- this is in
                order to allow use as a decorator.

        Raises:
            DatabaseTableError: `clas` is not a `Table` subclass, a table
                with the same (lowercased) name is already registered, or
                `clas` is already attached to a database.
        """
        # `issubclass()` raises a TypeError for anything that is not a
        # class, so rule that out first to always report the same error.
        if not (isinstance(clas, type) and issubclass(clas, Table)):
            raise DatabaseTableError(
                f"{clas} is not a subclass of discobase.Table, did you forget it?",  # noqa
            )

        disco_name = clas.__name__.lower()
        if disco_name in self.tables:
            raise DatabaseTableError(f"table {clas.__name__} already exists")

        if clas.__disco_database__ is not None:
            raise DatabaseTableError(
                f"{clas!r} can only be attached to one database"
            )

        clas.__disco_name__ = disco_name
        clas.__disco_database__ = self

        # This is up for criticism -- instead of using Pydantic's
        # `model_fields` attribute, we invent our own `__disco_keys__` instead.
        #
        # Partially, this is due to the fact that we want `__disco_keys__` to
        # be, more or less, stable throughout the codebase.
        #
        # However, I don't think Pydantic would mess with `model_fields`, as
        # that's a public API, and hence why this could possibly be
        # considered as bad design.
        clas.__disco_keys__.update(clas.model_fields)

        self.tables[disco_name] = clas
        return clas

bot = commands.Bot(intents=discord.Intents.all(), command_prefix='!') instance-attribute

discord.py Bot instance.

guild: discord.Guild | None = None instance-attribute

discord.py Guild used as the database server.

name = name instance-attribute

Name of the Discord-database server.

open: bool = False instance-attribute

Whether the database is connected.

tables: dict[str, type[Table]] = {} instance-attribute

Dictionary of Table objects attached to this database.

__init__(name: str) -> None

Parameters:

Name Type Description Default
name str

Name of the Discord server that will be used as the database.

required
Source code in src/discobase/database.py
def __init__(self, name: str) -> None:
    """
    Create the bot, the bookkeeping state, and the `on_ready` handler
    that drives `init()`.

    Nothing is connected here; see `login()`, `login_task()` or `conn()`.

    Args:
        name: Name of the Discord server that will be used as the database.
    """
    self.name = name
    """Name of the Discord-database server."""
    self.bot = commands.Bot(
        intents=discord.Intents.all(),
        command_prefix="!",
    )
    """discord.py `Bot` instance."""
    self.guild: discord.Guild | None = None
    """discord.py `Guild` used as the database server."""
    self.tables: dict[str, type[Table]] = {}
    """Dictionary of `Table` objects attached to this database."""
    self.open: bool = False
    """Whether the database is connected."""
    self._metadata_channel: discord.TextChannel | None = None
    """discord.py `TextChannel` that acts as the metadata channel."""
    self._database_cursors: dict[str, TableCursor] = {}
    """Dictionary of the `TableCursor` objects of this database."""
    # We need to keep a strong reference to the free-flying
    # task created by `login_task()`.
    self._task: asyncio.Task[None] | None = None
    # Expose the database on the bot (presumably so that the extensions
    # loaded in `init()` can reach it -- confirm against the cogs).
    self.bot.db = self
    # Set by `init()` when the user may use the database, and by the
    # failure paths below and in `close()` so that waiters never hang.
    self._setup_event = asyncio.Event()
    # Set by `init()` as soon as the guild is known; wakes `_set_open()`.
    self._internal_setup_event = asyncio.Event()
    # Exception raised during `on_ready`, re-raised by `wait_ready()`.
    self._on_ready_exc: BaseException | None = None

    # Here be dragons: https://github.com/ZeroIntensity/discobase/issues/49
    #
    # `on_ready` in discord.py swallows all exceptions, which
    # goes against some connect-and-disconnect behavior
    # that we want to allow in discobase.
    #
    # We need to store the exception, and then raise in wait_ready()
    # in order to properly handle it, otherwise the discord.py
    # logger just swallows it and pretends nothing happened.
    #
    # This also caused a deadlock with _setup_event, which caused
    # CI to run indefinitely.
    @self.bot.event
    @logger.catch(reraise=True)
    async def on_ready() -> None:
        try:
            await self.init()
        except BaseException as e:
            # Record the failure first, so that it is already visible
            # to anything that wakes up while the bot is closing.
            self._on_ready_exc = e
            try:
                await self.bot.close()
                if self._task:
                    self._task.cancel("bot startup failed")
            finally:
                # Release wait_ready() no matter what: if closing the
                # bot raised, skipping this would block it forever.
                self._setup_event.set()

            raise  # This is swallowed!

build_tables() -> None async

Generate all internal metadata and construct tables.

Calling this manually is useful if e.g. you want to load tables after calling login (or login_task, or conn, same difference.)

This method is safe to call multiple times.

Example
import asyncio
import discobase

async def main():
    db = discobase.Database("My database")
    db.login_task("My bot token")
    # The guild is only known once startup has finished
    await db.wait_ready()

    @db.table
    class MyLateTable(discobase.Table):
        something: str

    # Load MyLateTable into database
    await db.build_tables()

asyncio.run(main())
Source code in src/discobase/database.py
async def build_tables(self) -> None:
    """
    Generate all internal metadata and construct tables.

    Calling this manually is useful if e.g. you want
    to load tables *after* calling `login` (or `login_task`, or
    `conn`, same difference.)

    This method is safe to call multiple times.

    Raises:
        NotConnectedError: The guild is not known yet, i.e. startup
            has not finished.

    Example:
        ```py
        import asyncio
        import discobase

        async def main():
            db = discobase.Database("My database")
            db.login_task("My bot token")
            # The guild is only known once startup has finished
            await db.wait_ready()

            @db.table
            class MyLateTable(discobase.Table):
                something: str

            # Load MyLateTable into database
            await db.build_tables()

        asyncio.run(main())
        ```
    """
    if not self.guild:
        self._not_connected()

    # Find (or create) the metadata channel before creating any table.
    self._metadata_channel = await self._metadata_init()
    # One task per registered table, all awaited together below.
    tasks = [
        asyncio.ensure_future(
            TableCursor.create_table(
                table,
                self._metadata_channel,
                self.guild,
            )
        )
        for table in self.tables.values()
    ]
    logger.debug(f"Creating tables with gather(): {tasks}")
    await asyncio.gather(*tasks)

clean() -> None async

Perform a full clean of the database.

This method erases all metadata, all entries, and all tables. After calling this, a server loses any trace of the database ever being there.

Note that this does not clean the existing tables from memory, but instead just marks them all as not ready.

This action is irreversible.

Source code in src/discobase/database.py
async def clean(self) -> None:
    """
    Perform a full clean of the database.

    This method erases all metadata, all entries, and all tables. After
    calling this, a server loses any trace of the database ever being
    there.

    Note that this does *not* clean the existing tables from memory, but
    instead just marks them all as not ready.

    This action is irreversible.

    Raises:
        NotConnectedError: There is no metadata channel yet.
        DatabaseCorruptionError: Propagated from `_find_channel()` when
            a channel referenced by the metadata cannot be resolved.
            Nothing has been deleted at that point.
    """
    if not self._metadata_channel:
        self._not_connected()

    logger.info("Cleaning the database!")

    # Resolve every channel *before* creating any deletion coroutine:
    # if a lookup fails half way through, no coroutine object is left
    # behind un-awaited.
    doomed: list[discord.TextChannel] = []
    for table, cursor in self._database_cursors.items():
        metadata = cursor.metadata
        logger.info(f"Cleaning table {table}")
        doomed.append(self._find_channel(metadata.table_channel))
        doomed.extend(
            self._find_channel(cid)
            for cid in metadata.index_channels.values()
        )

    # Mark every attached table as not ready.
    for schema in self.tables.values():
        schema.__disco_cursor__ = None

    coros: list[Coroutine] = [channel.delete() for channel in doomed]
    logger.debug(f"Gathering deletion coros: {coros}")
    await asyncio.gather(*coros)
    logger.info("Deleting database metadata.")
    self._database_cursors = {}
    await self._metadata_channel.delete()

close() -> None async

Close the bot connection.

Source code in src/discobase/database.py
async def close(self) -> None:
    """
    Shut down the bot connection and mark the database as closed.

    Raises:
        ValueError: The connection is not open. The setup event is
            released first, so nothing stays blocked waiting on it.
    """
    if self.open:
        self.open = False
        await self.bot.close()
        return

    # If something went wrong in startup, for example, then
    # we need to release the setup lock.
    self._setup_event.set()
    raise ValueError(
        "cannot close a connection that is not open",
    )

conn(bot_token: str) async

Connect to the bot under a context manager. This is the recommended method to use for logging in.

Parameters:

Name Type Description Default
bot_token str

Discord API bot token to log in to.

required

Returns:

Name Type Description
AsyncGeneratorContextManager

An asynchronous context manager.

See contextlib.asynccontextmanager for details.

Example
import asyncio
import os

import discobase


async def main():
    db = discobase.Database("test")
    async with db.conn(os.getenv("BOT_TOKEN")):
        ...  # Your database code


asyncio.run(main())
Source code in src/discobase/database.py
@asynccontextmanager
async def conn(self, bot_token: str):
    """
    Connect to the bot under a context manager.
    This is the recommended method to use for logging in.

    On entry, the bot is started with `login_task()` and the body of
    the `async with` only runs once `wait_ready()` has returned. On
    exit, the connection is closed if the database is still marked
    as open.

    Args:
        bot_token: Discord API bot token to log in to.

    Returns:
        AsyncGeneratorContextManager: An asynchronous context manager.
            See `contextlib.asynccontextmanager` for details.

    Raises:
        BaseException: Any startup error re-raised by `wait_ready()`.

    Example:
        ```py
        import asyncio
        import os

        import discobase


        async def main():
            db = discobase.Database("test")
            async with db.conn(os.getenv("BOT_TOKEN")):
                ...  # Your database code


        asyncio.run(main())
        ```
    """
    try:
        # Start the bot in the background, then block until setup
        # has either finished or failed.
        self.login_task(bot_token)
        await self.wait_ready()
        yield
    finally:
        # Runs on normal exit and on errors alike.
        if self.open:  # Something could have gone wrong
            await self.close()

init() -> None async

Initializes the database server.

Generally, you don't want to call this manually, but this is considered to be a public interface.

Source code in src/discobase/database.py
async def init(self) -> None:
    """
    Bring the database server up: load the `cogs` extensions, sync the
    slash commands, find (or create) the guild and build the tables.

    This is considered a public interface, but you generally don't
    want to call it manually.
    """
    logger.info("Initializing the bot.")
    # Load external commands
    loaders: list[asyncio.Task] = []
    for cog in iter_modules(path=["cogs"], prefix="cogs."):
        logger.debug(f"Loading module with cog: {cog}")
        loading = asyncio.create_task(self.bot.load_extension(cog.name))
        loaders.append(loading)

    await asyncio.gather(*loaders)
    logger.info("Syncing slash commands, this might take a minute.")
    await self.bot.tree.sync()
    logger.info("Waiting until bot is logged in.")
    await self.bot.wait_until_ready()
    logger.info("Bot is ready!")

    # First guild the bot is in whose name matches ours, if any.
    existing = next(
        (g for g in self.bot.guilds if g.name == self.name),
        None,
    )
    if existing:
        logger.info("Found an existing guild.")
        self.guild = existing
    else:
        logger.info("No guild found, making one.")
        self.guild = await self.bot.create_guild(name=self.name)

    # Unlock database, but don't wakeup the user.
    self._internal_setup_event.set()
    await self.build_tables()
    # At this point, the database is marked as "ready" to the user.
    self._setup_event.set()

    assert self._metadata_channel is not None
    invite = await self._metadata_channel.create_invite()
    logger.info(f"Invite to server: {invite}")

login(bot_token: str) -> None async

Start running the bot.

Parameters:

Name Type Description Default
bot_token str

Discord API bot token to log in with.

required
Source code in src/discobase/database.py
async def login(self, bot_token: str) -> None:
    """
    Start the bot and wait on it together with `_set_open()`.

    Args:
        bot_token: Discord API bot token to log in with.

    Raises:
        RuntimeError: The connection is already marked as open.
    """
    already_open = self.open
    if already_open:
        raise RuntimeError(
            "connection is already open, did you call login() twice?"
        )

    # We use _set_open() with a gather to keep a finer link
    # between the `open` attribute and whether it's actually
    # running.
    bot_runner = self.bot.start(token=bot_token)
    open_marker = self._set_open()
    await asyncio.gather(bot_runner, open_marker)

login_task(bot_token: str) -> asyncio.Task[None]

Call login() as a free-flying task, instead of blocking the event loop.

Note that this method stores a reference to the created task object, allowing it to be truly "free-flying."

Parameters:

Name Type Description Default
bot_token str

Discord API bot token to log in to.

required

Returns:

Type Description
Task[None]

The created asyncio.Task object. Note that the database will store this internally, so you don't have to worry about losing the reference. By default, this task will never get awaited, so this function will not keep the event loop running. If you want to keep the event loop running, make sure to await the returned task object later.

Example
import asyncio
import os

import discobase


async def main():
    db = discobase.Database("test")
    task = db.login_task("...")
    await db.wait_ready()
    # ...
    await task  # Keep the event loop running

asyncio.run(main())
Source code in src/discobase/database.py
def login_task(self, bot_token: str) -> asyncio.Task[None]:
    """
    Call `login()` as a free-flying task, instead of
    blocking the event loop.

    Note that this method stores a reference to the created
    task object, allowing it to be truly "free-flying."

    This is a plain (non-`async`) method: call it *without* `await`.
    It uses `asyncio.create_task()`, so an event loop must be running.

    Args:
        bot_token: Discord API bot token to log in to.

    Returns:
        asyncio.Task[None]: The created `asyncio.Task` object.
            Note that the database will store this internally, so you
            don't have to worry about losing the reference. By default,
            this task will never get `await`ed, so this function will
            not keep the event loop running. If you want to keep the
            event loop running, make sure to `await` the returned task
            object later.

    Example:
        ```py
        import asyncio
        import os

        import discobase


        async def main():
            db = discobase.Database("test")
            task = db.login_task("...")
            await db.wait_ready()
            # ...
            await task  # Keep the event loop running

        asyncio.run(main())
        ```
    """
    task = asyncio.create_task(self.login(bot_token))
    # Strong reference, so the task stays alive; `on_ready` also uses
    # it to cancel the login when startup fails.
    self._task = task
    return task

table(clas: T) -> T

Mark a Table type as part of a database. This method is meant to be used as a decorator.

Parameters:

Name Type Description Default
clas T

Type object to attach.

required
Example
import discobase

db = discobase.Database("My database")

@db.table
class MySchema(discobase.Table):
    foo: int
    bar: str

# ...

Returns:

Type Description
T

The same object passed to clas -- this is in order to allow use as a decorator.

Source code in src/discobase/database.py
def table(self, clas: T) -> T:
    """
    Mark a `Table` type as part of a database.
    This method is meant to be used as a decorator.

    The class is only modified once every check has passed, so a
    rejected class is left exactly as it was.

    Args:
        clas: Type object to attach.

    Example:
        ```py
        import discobase

        db = discobase.Database("My database")

        @db.table
        class MySchema(discobase.Table):
            foo: int
            bar: str

        # ...
        ```

    Returns:
        Type[Table]: The same object passed to `clas` -- this is in
            order to allow use as a decorator.

    Raises:
        DatabaseTableError: `clas` is not a `Table` subclass, a table
            with the same (lowercased) name is already registered, or
            `clas` is already attached to a database.
    """
    # `issubclass()` raises a TypeError for anything that is not a
    # class, so rule that out first to always report the same error.
    if not (isinstance(clas, type) and issubclass(clas, Table)):
        raise DatabaseTableError(
            f"{clas} is not a subclass of discobase.Table, did you forget it?",  # noqa
        )

    disco_name = clas.__name__.lower()
    if disco_name in self.tables:
        raise DatabaseTableError(f"table {clas.__name__} already exists")

    if clas.__disco_database__ is not None:
        raise DatabaseTableError(
            f"{clas!r} can only be attached to one database"
        )

    clas.__disco_name__ = disco_name
    clas.__disco_database__ = self

    # This is up for criticism -- instead of using Pydantic's
    # `model_fields` attribute, we invent our own `__disco_keys__` instead.
    #
    # Partially, this is due to the fact that we want `__disco_keys__` to
    # be, more or less, stable throughout the codebase.
    #
    # However, I don't think Pydantic would mess with `model_fields`, as
    # that's a public API, and hence why this could possibly be
    # considered as bad design.
    clas.__disco_keys__.update(clas.model_fields)

    self.tables[disco_name] = clas
    return clas

wait_ready() -> None async

Wait until the database is ready.

Source code in src/discobase/database.py
async def wait_ready(self) -> None:
    """
    Block until the database setup event has been set.

    Raises:
        Exception: Whatever is stored in `_on_ready_exc`, re-raised here so
            that it reaches the caller (see #49).
    """
    logger.info("Waiting until the database is ready.")
    await self._setup_event.wait()
    logger.info("Done waiting!")

    # See #49 -- an error raised inside `on_ready` has to surface here.
    failure = self._on_ready_exc
    if not failure:
        return

    logger.error("on_ready() failed, propagating now.")
    raise failure

discobase.table

__all__ = ('Table',) module-attribute

Table

Bases: BaseModel

Source code in src/discobase/table.py
class Table(BaseModel):
    # Base class for table schemas. A subclass is attached to a database by
    # the `table()` decorator; the class-level `__disco_*__` attributes hold
    # that attachment, while `__disco_id__` is tracked per instance.
    __disco_database__: ClassVar[Optional[Database]]
    """Attached `Database` object. Set by the `table()` decorator."""
    __disco_cursor__: ClassVar[Optional[TableCursor]]
    """Internal table cursor, set at initialization time."""
    __disco_keys__: ClassVar[Set[str]]
    """All keys of the table, this may not change once set by `table()`."""
    __disco_name__: ClassVar[str]
    """Internal name of the table. Set by the `table()` decorator."""
    __disco_id__: int = -1
    """Message ID of the record. This is only present if it was saved."""

    def __init__(self, /, **data: Any) -> None:
        """
        Forward `data` to the base-class initializer and mark the new
        entry as not yet written.
        """
        super().__init__(**data)
        # -1 means "not written" -- `save()` replaces it with a message ID.
        self.__disco_id__ = -1

    def __init_subclass__(cls, **kwargs: Unpack[ConfigDict]) -> None:
        """
        Give every subclass its own, unattached set of `__disco_*__` state.
        """
        super().__init_subclass__(**kwargs)
        cls.__disco_database__ = None
        cls.__disco_cursor__ = None
        # A fresh set per subclass, so keys are never shared between tables.
        cls.__disco_keys__ = set()
        cls.__disco_name__ = "_notset"

    @classmethod
    def _ensure_db(cls) -> None:
        """
        Check that this table can be used for database operations.

        Raises:
            DatabaseTableError: No database is attached, or the table has
                no cursor yet.
            NotConnectedError: The attached database is not open.
        """
        if not cls.__disco_database__:
            raise DatabaseTableError(
                f"{cls.__name__} has no attached database, did you forget to call @db.table?"  # noqa
            )

        if not cls.__disco_database__.open:
            raise NotConnectedError(
                "database is not connected! did you forget to open it?"
            )

        if not cls.__disco_cursor__:
            raise DatabaseTableError(
                f"{cls.__name__} is not ready, you might want to add a call to build_tables()",  # noqa
            )

    def save(self) -> asyncio.Task[discord.Message]:
        """
        Save the entry to the database as a new record.

        Example:
            ```py
            import discobase

            db = discobase.Database("My database")

            @db.table
            class User(discobase.Table):
                name: str
                password: str

            # Using top-level await for this example
            await User(name="Peter", password="foobar").save()
            ```

        Returns:
            asyncio.Task[discord.Message]: Task adding the record. When it
            finishes, the ID of the resulting message is stored on the entry.

        Raises:
            DatabaseStorageError: The entry has already been written.
        """
        self._ensure_db()
        assert self.__disco_cursor__

        if self.__disco_id__ != -1:
            raise DatabaseStorageError(
                "this entry has already been written, did you mean to call update()?",  # noqa
            )
        task = free_fly(self.__disco_cursor__.add_record(self))

        def _cb(fut: asyncio.Task[discord.Message]) -> None:
            # Remember which message holds this entry, so that later
            # calls to update() and delete() know it has been written.
            msg = fut.result()
            self.__disco_id__ = msg.id

        task.add_done_callback(_cb)
        return task

    def _ensure_written(self) -> None:
        """
        Raise `DatabaseStorageError` if this entry was never written.
        """
        if self.__disco_id__ == -1:
            raise DatabaseStorageError(
                "this entry has not been written, did you mean to call save()?",  # noqa
            )

    def update(self) -> asyncio.Task[discord.Message]:
        """
        Update the entry in-place.

        Example:
            ```py
            import discobase

            db = discobase.Database("My database")

            @db.table
            class User(discobase.Table):
                name: str
                password: str

            # Using top-level await for this example
            user = await User.find_unique(name="Peter", password="foobar")
            user.password = str(hash(user.password))
            await user.update()
            ```

        Returns:
            asyncio.Task[discord.Message]: Task updating the record.

        Raises:
            DatabaseStorageError: The entry has not been written yet.
        """

        self._ensure_db()
        # `_ensure_written()` is the single place that rejects unsaved
        # entries, so no second inline check is needed here.
        self._ensure_written()
        assert self.__disco_cursor__
        return free_fly(self.__disco_cursor__.update_record(self))

    def commit(self) -> asyncio.Task[discord.Message]:
        """
        Save the current entry, or update it if it already exists in the
        database.
        """
        if self.__disco_id__ == -1:
            return self.save()
        else:
            return self.update()

    def delete(self) -> asyncio.Task[None]:
        """
        Delete the current entry from the database.

        Returns:
            asyncio.Task[None]: Task deleting the record.

        Raises:
            DatabaseStorageError: The entry has not been written yet.
            DatabaseTableError: The table is not attached or not ready.
            NotConnectedError: The attached database is not open.
        """

        # The written-check stays first, so an unsaved entry keeps raising
        # the same error as before; the database check then brings
        # `delete()` in line with `save()` and `update()`.
        self._ensure_written()
        self._ensure_db()
        assert self.__disco_cursor__
        return free_fly(self.__disco_cursor__.delete_record(self))

    @classmethod
    async def find(cls, **kwargs: Any) -> list[Self]:
        """
        Find a list of entries in the database.

        Args:
            **kwargs: Values to search for. These should be keys in the schema.

        Returns:
            list[Table]: The list of objects that match the values in kwargs

        Example:
            ```py
            import discobase

            db = discobase.Database("My database")

            @db.table
            class User(discobase.Table):
                name: str
                password: str

            # Using top-level await for this example
            users = await User.find(password="foobar")
            ```
        """
        cls._ensure_db()
        assert cls.__disco_cursor__
        return await cls.__disco_cursor__.find_records(
            cls,
            kwargs,
        )

    @classmethod
    @overload
    async def find_unique(
        cls,
        *,
        strict: Literal[True] = True,
        **kwargs: Any,
    ) -> Self: ...

    # No default here: the real default is `True`, so this overload only
    # applies when `strict=False` is passed explicitly.
    @classmethod
    @overload
    async def find_unique(
        cls,
        *,
        strict: Literal[False],
        **kwargs: Any,
    ) -> Self | None: ...

    @classmethod
    async def find_unique(
        cls,
        *,
        strict: bool = True,
        **kwargs: Any,
    ) -> Self | None:
        """
        Find a unique entry in the database.

        Args:
            strict: When true, finding no entry or more than one entry is
                an error. When false, `None` is returned if nothing
                matches, and the first match is returned otherwise.
            **kwargs: Values to search for. These should be keys in the schema.

        Returns:
            Table | None: Returns a single object that matches the values in
            kwargs or None if no match is found.

        Raises:
            ValueError: No query values were passed.
            DatabaseLookupError: `strict` is true and the query matched no
                entry, or more than one entry.
        """

        if not kwargs:
            raise ValueError("a query must be passed to find_unique")

        values: list[Self] = await cls.find(**kwargs)

        if not values:
            if strict:
                raise DatabaseLookupError(
                    f"no entry found with query {kwargs}",
                )

            return None

        if strict and len(values) > 1:
            raise DatabaseLookupError(
                "more than one entry was found with find_unique"
            )

        return values[0]

__disco_cursor__: Optional[TableCursor] class-attribute

Internal table cursor, set at initialization time.

__disco_database__: Optional[Database] class-attribute

Attached Database object. Set by the table() decorator.

__disco_id__: int = -1 class-attribute instance-attribute

Message ID of the record. This is only present if it was saved.

__disco_keys__: Set[str] class-attribute

All keys of the table, this may not change once set by table().

__disco_name__: str class-attribute

Internal name of the table. Set by the table() decorator.

__init__(**data: Any) -> None

Source code in src/discobase/table.py
def __init__(self, /, **data: Any) -> None:
    """
    Forward `data` to the base-class initializer and mark the new entry
    as not yet written.
    """
    super().__init__(**data)
    # -1 means "not written" -- `save()` replaces it with a message ID.
    self.__disco_id__ = -1

__init_subclass__(**kwargs: Unpack[ConfigDict]) -> None

Source code in src/discobase/table.py
def __init_subclass__(cls, **kwargs: Unpack[ConfigDict]) -> None:
    """
    Give every subclass its own, unattached set of `__disco_*__` state.
    """
    super().__init_subclass__(**kwargs)
    # No database or cursor yet -- `_ensure_db()` rejects the table
    # while either of these is still unset.
    cls.__disco_database__ = None
    cls.__disco_cursor__ = None
    # A fresh set per subclass, so keys are never shared between tables.
    cls.__disco_keys__ = set()
    # Placeholder; `table()` overwrites it with the lowercased class name.
    cls.__disco_name__ = "_notset"

commit() -> asyncio.Task[discord.Message]

Save the current entry, or update it if it already exists in the database.

Source code in src/discobase/table.py
def commit(self) -> asyncio.Task[discord.Message]:
    """
    Write this entry: `save()` it if it was never written, otherwise
    `update()` the existing record.
    """
    # An ID of -1 marks an entry that has not been written yet.
    already_written = self.__disco_id__ != -1
    writer = self.update if already_written else self.save
    return writer()

delete() -> asyncio.Task[None]

Delete the current entry from the database.

Source code in src/discobase/table.py
def delete(self) -> asyncio.Task[None]:
    """
    Delete the current entry from the database.

    Returns:
        asyncio.Task[None]: Task deleting the record.

    Raises:
        DatabaseStorageError: The entry has not been written yet.
        DatabaseTableError: The table is not attached or not ready.
        NotConnectedError: The attached database is not open.
    """

    # The written-check stays first, so an unsaved entry keeps raising the
    # same error as before; the database check then brings `delete()` in
    # line with `save()` and `update()`.
    self._ensure_written()
    self._ensure_db()
    assert self.__disco_cursor__
    return free_fly(self.__disco_cursor__.delete_record(self))

find(**kwargs: Any) -> list[Self] async classmethod

Find a list of entries in the database.

Parameters:

Name Type Description Default
**kwargs Any

Values to search for. These should be keys in the schema.

{}

Returns:

Type Description
list[Self]

list[Table]: The list of objects that match the values in kwargs

Example:

import discobase
db = discobase.Database("My database")
@db.table
class User(discobase.Table):
    name: str
    password: str
# Using top-level await for this example
users = await User.find(password="foobar")

Source code in src/discobase/table.py
@classmethod
async def find(cls, **kwargs: Any) -> list[Self]:
    """
    Find a list of entries in the database.

    Args:
        **kwargs: Values to search for. These should be keys in the schema.

    Returns:
        list[Table]: The list of objects that match the values in kwargs

    Example:
        ```py
        import discobase

        db = discobase.Database("My database")

        @db.table
        class User(discobase.Table):
            name: str
            password: str

        # Using top-level await for this example
        users = await User.find(password="foobar")
        ```
    """
    cls._ensure_db()
    assert cls.__disco_cursor__
    return await cls.__disco_cursor__.find_records(
        cls,
        kwargs,
    )

find_unique(*, strict: bool = True, **kwargs: Any) -> Self | None async classmethod

Find a unique entry in the database.

Parameters:

Name Type Description Default
**kwargs Any

Values to search for. These should be keys in the schema.

{}

Returns:

Type Description
Self | None

Table | None: Returns a single object that matches the values in

Self | None

kwargs or None if no match is found.

Source code in src/discobase/table.py
@classmethod
async def find_unique(
    cls,
    *,
    strict: bool = True,
    **kwargs: Any,
) -> Self | None:
    """
    Find a unique entry in the database.

    Args:
        strict: When true, finding no entry or more than one entry is an
            error. When false, `None` is returned if nothing matches, and
            the first match is returned otherwise.
        **kwargs: Values to search for. These should be keys in the schema.

    Returns:
        Table | None: Returns a single object that matches the values in
        kwargs or None if no match is found.

    Raises:
        ValueError: No query values were passed.
        DatabaseLookupError: `strict` is true and the query matched no
            entry, or more than one entry.
    """

    if not kwargs:
        raise ValueError("a query must be passed to find_unique")

    values: list[Self] = await cls.find(**kwargs)

    if not values:
        if strict:
            raise DatabaseLookupError(
                f"no entry found with query {kwargs}",
            )

        return None

    if strict and len(values) > 1:
        raise DatabaseLookupError(
            "more than one entry was found with find_unique"
        )

    return values[0]

save() -> asyncio.Task[discord.Message]

Save the entry to the database as a new record.

Example
import discobase

db = discobase.Database("My database")

@db.table
class User(discobase.Table):
    name: str
    password: str

# Using top-level await for this example
await User(name="Peter", password="foobar").save()
Source code in src/discobase/table.py
def save(self) -> asyncio.Task[discord.Message]:
    """
    Write this entry to the database as a brand new record.

    Example:
        ```py
        import discobase

        db = discobase.Database("My database")

        @db.table
        class User(discobase.Table):
            name: str
            password: str

        # Using top-level await for this example
        await User(name="Peter", password="foobar").save()
        ```

    Returns:
        asyncio.Task[discord.Message]: Task adding the record. When it
        finishes, the ID of the resulting message is stored on the entry.

    Raises:
        DatabaseStorageError: The entry has already been written.
    """
    self._ensure_db()
    cursor = self.__disco_cursor__
    assert cursor

    if self.__disco_id__ != -1:
        raise DatabaseStorageError(
            "this entry has already been written, did you mean to call update()?",  # noqa
        )

    def _remember_id(done: asyncio.Task[discord.Message]) -> None:
        # Keep the ID of the message that now holds this entry.
        self.__disco_id__ = done.result().id

    pending = free_fly(cursor.add_record(self))
    pending.add_done_callback(_remember_id)
    return pending

update() -> asyncio.Task[discord.Message]

Update the entry in-place.

Example
import discobase

db = discobase.Database("My database")

@db.table
class User(discobase.Table):
    name: str
    password: str

# Using top-level await for this example
user = await User.find_unique(name="Peter", password="foobar")
user.password = str(hash(user.password))
await user.update()
Source code in src/discobase/table.py
def update(self) -> asyncio.Task[discord.Message]:
    """
    Update the entry in-place.

    Example:
        ```py
        import discobase

        db = discobase.Database("My database")

        @db.table
        class User(discobase.Table):
            name: str
            password: str

        # Using top-level await for this example
        user = await User.find_unique(name="Peter", password="foobar")
        user.password = str(hash(user.password))
        await user.update()
        ```

    Returns:
        asyncio.Task[discord.Message]: Task updating the record.

    Raises:
        DatabaseStorageError: The entry has not been written yet.
    """

    self._ensure_db()
    # `_ensure_written()` is the single place that rejects unsaved
    # entries, so no second inline check is needed here.
    self._ensure_written()
    assert self.__disco_cursor__
    return free_fly(self.__disco_cursor__.update_record(self))

discobase.exceptions

DatabaseCorruptionError

Bases: DiscobaseError

The database was corrupted somehow.

Source code in src/discobase/exceptions.py
class DatabaseCorruptionError(DiscobaseError):
    """
    The database was corrupted somehow.
    """
    # NOTE(review): no raise site is visible next to this definition --
    # confirm which conditions are treated as corruption.

DatabaseLookupError

Bases: DiscobaseError

Something went wrong with an entry lookup.

Source code in src/discobase/exceptions.py
class DatabaseLookupError(DiscobaseError):
    """
    Something went wrong with an entry lookup.

    `Table.find_unique()` raises this in strict mode when the query matches
    no entry, or more than one entry.
    """

DatabaseStorageError

Bases: DiscobaseError

Failed to store something in the database.

Source code in src/discobase/exceptions.py
class DatabaseStorageError(DiscobaseError):
    """
    Failed to store something in the database.

    `Table.save()` raises this for an entry that was already written, and
    `Table.update()` / `Table.delete()` raise it for one that was not.
    """

DatabaseTableError

Bases: DiscobaseError

Something is wrong with a Table type.

Source code in src/discobase/exceptions.py
class DatabaseTableError(DiscobaseError):
    """
    Something is wrong with a `Table` type.

    Raised by `Database.table()` when a type cannot be attached, and by
    `Table` operations when the type has no attached database or is not
    ready yet.
    """

DiscobaseError

Bases: Exception

Base discobase exception class.

Source code in src/discobase/exceptions.py
1
2
3
4
class DiscobaseError(Exception):
    """
    Base discobase exception class.

    The other exceptions in this module derive from it, so catching
    `DiscobaseError` handles all of them.
    """

NotConnectedError

Bases: DiscobaseError

The database is not connected.

Source code in src/discobase/exceptions.py
class NotConnectedError(DiscobaseError):
    """
    The database is not connected.

    Raised by `Table` operations when the attached database is not open.
    """