Skip to content

PyDrocsid.database.database

DB

Source code in PyDrocsid/database/database.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class DB:
    """
    Wrapper around an async SQLAlchemy engine.

    Sessions are kept as a stack in the ``_sessions`` context variable: :meth:`create_session`
    pushes a new entry, :meth:`close` pops the most recent one and the query helpers always
    operate on the entry on top of the stack.
    """

    def __init__(
        self,
        driver: str,
        host: str,
        port: int,
        database: str,
        username: str,
        password: str,
        pool_recycle: int = 300,
        pool_size: int = 20,
        max_overflow: int = 20,
        echo: bool = False,
    ):
        """
        :param driver: name of the sql connection driver
        :param host: host of the sql server
        :param port: port of the sql server
        :param database: name of the database
        :param username: name of the sql user
        :param password: password of the sql user
        :param pool_recycle: forwarded to the engine as ``pool_recycle``
        :param pool_size: forwarded to the engine as ``pool_size``
        :param max_overflow: forwarded to the engine as ``max_overflow``
        :param echo: whether sql queries should be logged
        """

        self.engine: AsyncEngine = create_async_engine(
            URL.create(
                drivername=driver, username=username, password=password, host=host, port=port, database=database
            ),
            pool_pre_ping=True,
            pool_recycle=pool_recycle,
            pool_size=pool_size,
            max_overflow=max_overflow,
            echo=echo,
        )

    async def create_tables(self) -> None:
        """Create all tables defined in enabled cog packages."""

        # imported locally rather than at module level
        from PyDrocsid.config import get_subclasses_in_enabled_packages

        logger.debug("creating tables")
        tables = [cls.__table__ for cls in get_subclasses_in_enabled_packages(Base)]
        async with self.engine.begin() as conn:
            # create_all is synchronous, so it is executed through run_sync
            await conn.run_sync(partial(Base.metadata.create_all, tables=tables))

    async def add(self, obj: T) -> T:
        """
        Add a new row to the current session

        :param obj: the row to insert
        :return: the same row
        :raises RuntimeError: if no session is available
        """

        self.session.add(obj)
        return obj

    async def delete(self, obj: T) -> T:
        """
        Remove a row using the current session

        :param obj: the row to remove
        :return: the same row
        :raises RuntimeError: if no session is available
        """

        await self.session.delete(obj)
        return obj

    async def exec(self, statement: Executable) -> Any:
        """
        Execute an sql statement in the current session and return the result.

        :param statement: the statement to execute
        :return: whatever the session's ``execute`` returns
        :raises RuntimeError: if no session is available
        """

        return await self.session.execute(statement)

    async def stream(self, statement: Executable) -> AsyncIterator[Any]:
        """
        Execute an sql statement and stream the scalar results.

        :param statement: the statement to execute
        :return: an async iterator over the ``scalars()`` of the streamed result
        :raises RuntimeError: if no session is available
        """

        return cast(AsyncIterator[Any], (await self.session.stream(statement)).scalars())

    async def all(self, statement: Executable) -> list[Any]:
        """Execute an sql statement and collect everything :meth:`stream` yields into a list."""

        return [x async for x in await self.stream(statement)]

    async def first(self, statement: Executable) -> Any | None:
        """Execute an sql statement and return the ``scalar()`` of its result."""

        return (await self.exec(statement)).scalar()

    async def exists(self, statement: Executable, *args: Column[Any], **kwargs: Any) -> bool:
        """Execute an sql statement and return whether it returned at least one row."""

        # ``exists`` here is the module-level helper, not this method
        return cast(bool, await self.first(exists(statement, *args, **kwargs).select()))

    async def count(self, statement: Executable, *args: Column[Any]) -> int:
        """Execute an sql statement and return the number of returned rows."""

        # ``count`` here is the module-level name, not this method
        return cast(int, await self.first(select(count()).select_from(statement, *args)))

    async def get(self, cls: Type[T], *args: Column[Any], **kwargs: Any) -> T | None:
        """Shortcut for first(filter_by(...))"""

        return await self.first(filter_by(cls, *args, **kwargs))

    async def commit(self) -> None:
        """
        Shortcut for :meth:`sqlalchemy.ext.asyncio.AsyncSession.commit`

        Does nothing if there is no current session.
        """

        if sessions := _sessions.get():
            await sessions[-1].session.commit()

    async def close(self) -> None:
        """
        Close the current session

        The most recent session is removed from the stack and closed. Its close event is set
        even if closing the session raises, so tasks blocked in :meth:`wait_for_close_event`
        are always released. Does nothing if there is no current session.
        """

        if sessions := _sessions.get():
            # pop() removes the entry from the stored list in place
            session, close_event = sessions.pop()
            _sessions.set(sessions)
            try:
                await session.close()
            finally:
                close_event.set()

    def create_session(self) -> AsyncSession:
        """
        Create a new async session and store it in the context variable.

        :return: the newly created session
        """

        session = Session(AsyncSession(self.engine, expire_on_commit=False), Event())
        # a new list is built instead of appending to the stored one
        _sessions.set(_sessions.get() + [session])
        return session.session

    @property
    def session(self) -> AsyncSession:
        """
        Get the session object for the current task

        :raises RuntimeError: if no session is available
        """

        if not (sessions := _sessions.get()):
            raise RuntimeError("No session available")

        return sessions[-1].session

    async def wait_for_close_event(self) -> None:
        """
        Wait until the close event of the current session has been set

        Returns immediately if there is no current session.
        """

        if sessions := _sessions.get():
            await sessions[-1].close_event.wait()

__init__

__init__(driver: str, host: str, port: int, database: str, username: str, password: str, pool_recycle: int = 300, pool_size: int = 20, max_overflow: int = 20, echo: bool = False)

Parameters:

  • driver (str) –

    name of the sql connection driver

  • host (str) –

    host of the sql server

  • port (int) –

    port of the sql server

  • database (str) –

    name of the database

  • username (str) –

    name of the sql user

  • password (str) –

    password of the sql user

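  • pool_recycle (int) –

    forwarded to the engine as pool_recycle (default: 300)

  • pool_size (int) –

    forwarded to the engine as pool_size (default: 20)

  • max_overflow (int) –

    forwarded to the engine as max_overflow (default: 20)

  • echo (bool) –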

    whether sql queries should be logged

Source code in PyDrocsid/database/database.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def __init__(
    self,
    driver: str,
    host: str,
    port: int,
    database: str,
    username: str,
    password: str,
    pool_recycle: int = 300,
    pool_size: int = 20,
    max_overflow: int = 20,
    echo: bool = False,
):
    """
    Build the connection url and create the async engine.

    :param driver: name of the sql connection driver
    :param host: host of the sql server
    :param port: port of the sql server
    :param database: name of the database
    :param username: name of the sql user
    :param password: password of the sql user
    :param pool_recycle: forwarded to the engine as ``pool_recycle``
    :param pool_size: forwarded to the engine as ``pool_size``
    :param max_overflow: forwarded to the engine as ``max_overflow``
    :param echo: whether sql queries should be logged
    """

    url = URL.create(
        drivername=driver,
        username=username,
        password=password,
        host=host,
        port=port,
        database=database,
    )
    engine_options = dict(
        pool_pre_ping=True,
        pool_recycle=pool_recycle,
        pool_size=pool_size,
        max_overflow=max_overflow,
        echo=echo,
    )
    self.engine: AsyncEngine = create_async_engine(url, **engine_options)

add async

add(obj: T) -> T

Add a new row to the database

Parameters:

  • obj (T) –

    the row to insert

Returns:

  • T

    the same row

Source code in PyDrocsid/database/database.py
158
159
160
161
162
163
164
165
166
167
async def add(self, obj: T) -> T:
    """
    Register a new row with the current session

    :param obj: the row to insert
    :return: the same row
    """

    current = self.session
    current.add(obj)
    return obj

all async

all(statement: Executable) -> list[Any]

Execute an sql statement and return all results as a list.

Source code in PyDrocsid/database/database.py
190
191
192
193
async def all(self, statement: Executable) -> list[Any]:
    """Run the statement through :meth:`stream` and collect every item into a list."""

    rows = await self.stream(statement)
    return [row async for row in rows]

close async

close() -> None

Close the current session

Source code in PyDrocsid/database/database.py
221
222
223
224
225
226
227
228
async def close(self) -> None:
    """
    Close the current session

    The most recent session is removed from the stack and closed. Its close event is set
    even if closing the session raises, so anything waiting on that event is always
    released. Does nothing if there is no current session.
    """

    if sessions := _sessions.get():
        # pop() removes the entry from the stored list in place
        session, close_event = sessions.pop()
        _sessions.set(sessions)
        try:
            await session.close()
        finally:
            close_event.set()

commit async

commit() -> None

Shortcut for :meth:sqlalchemy.ext.asyncio.AsyncSession.commit

Source code in PyDrocsid/database/database.py
215
216
217
218
219
async def commit(self) -> None:
    """Commit the most recent session, if there is one."""

    sessions = _sessions.get()
    if not sessions:
        return

    await sessions[-1].session.commit()

count async

count(statement: Executable, *args: Column[Any]) -> int

Execute an sql statement and return the number of returned rows.

Source code in PyDrocsid/database/database.py
205
206
207
208
async def count(self, statement: Executable, *args: Column[Any]) -> int:
    """Wrap the statement in a count query and return the resulting number of rows."""

    query = select(count()).select_from(statement, *args)
    return cast(int, await self.first(query))

create_session

create_session() -> AsyncSession

Create a new async session and store it in the context variable.

Source code in PyDrocsid/database/database.py
230
231
232
233
234
235
def create_session(self) -> AsyncSession:
    """
    Open a new async session on the engine and push it onto the context variable's stack.

    :return: the newly created session
    """

    entry = Session(AsyncSession(self.engine, expire_on_commit=False), Event())
    stack = _sessions.get()
    # concatenation builds a new list instead of mutating the stored one
    _sessions.set(stack + [entry])
    return entry.session

create_tables async

create_tables() -> None

Create all tables defined in enabled cog packages.

Source code in PyDrocsid/database/database.py
148
149
150
151
152
153
154
155
156
async def create_tables(self) -> None:
    """Create the tables of every ``Base`` subclass found in enabled packages."""

    from PyDrocsid.config import get_subclasses_in_enabled_packages

    logger.debug("creating tables")

    tables = []
    for model in get_subclasses_in_enabled_packages(Base):
        tables.append(model.__table__)

    async with self.engine.begin() as connection:
        # create_all is synchronous, so it is executed through run_sync
        create_all = partial(Base.metadata.create_all, tables=tables)
        await connection.run_sync(create_all)

delete async

delete(obj: T) -> T

Remove a row from the database

Parameters:

  • obj (T) –

    the row to remove

Returns:

  • T

    the same row

Source code in PyDrocsid/database/database.py
169
170
171
172
173
174
175
176
177
178
async def delete(self, obj: T) -> T:
    """
    Delete a row through the current session

    :param obj: the row to remove
    :return: the same row
    """

    current = self.session
    await current.delete(obj)
    return obj

exec async

exec(statement: Executable) -> Any

Execute an sql statement and return the result.

Source code in PyDrocsid/database/database.py
180
181
182
183
async def exec(self, statement: Executable) -> Any:
    """Run an sql statement in the current session and hand back its result."""

    result = await self.session.execute(statement)
    return result

exists async

exists(statement: Executable, *args: Column[Any], **kwargs: Any) -> bool

Execute an sql statement and return whether it returned at least one row.

Source code in PyDrocsid/database/database.py
200
201
202
203
async def exists(self, statement: Executable, *args: Column[Any], **kwargs: Any) -> bool:
    """Wrap the statement in an exists query and return whether it matched at least one row."""

    query = exists(statement, *args, **kwargs).select()
    return cast(bool, await self.first(query))

first async

first(statement: Executable) -> Any | None

Execute an sql statement and return the first result.

Source code in PyDrocsid/database/database.py
195
196
197
198
async def first(self, statement: Executable) -> Any | None:
    """Run an sql statement and return the ``scalar()`` of its result."""

    result = await self.exec(statement)
    return result.scalar()

get async

get(cls: Type[T], *args: Column[Any], **kwargs: Any) -> T | None

Shortcut for first(filter_by(...))

Source code in PyDrocsid/database/database.py
210
211
212
213
async def get(self, cls: Type[T], *args: Column[Any], **kwargs: Any) -> T | None:
    """Build a ``filter_by`` query for ``cls`` and return what :meth:`first` yields for it."""

    query = filter_by(cls, *args, **kwargs)
    return await self.first(query)

session property

session() -> AsyncSession

Get the session object for the current task

Source code in PyDrocsid/database/database.py
237
238
239
240
241
242
243
244
@property
def session(self) -> AsyncSession:
    """
    Return the most recently created session that has not been closed

    :raises RuntimeError: if no session is available
    """

    sessions = _sessions.get()
    if sessions:
        return sessions[-1].session

    raise RuntimeError("No session available")

stream async

stream(statement: Executable) -> AsyncIterator[Any]

Execute an sql statement and stream the result.

Source code in PyDrocsid/database/database.py
185
186
187
188
async def stream(self, statement: Executable) -> AsyncIterator[Any]:
    """Run an sql statement in streaming mode and return an iterator over its scalars."""

    result = await self.session.stream(statement)
    return cast(AsyncIterator[Any], result.scalars())

delete

delete(table: Any) -> Delete

Shortcut for :meth:sqlalchemy.sql.expression.delete

Source code in PyDrocsid/database/database.py
79
80
81
82
def delete(table: Any) -> Delete:
    """Build a delete statement for ``table`` (thin wrapper around ``sa_delete``)."""

    statement = sa_delete(table)
    return statement

exists

exists(statement: Executable, *entities: Column[Any], **kwargs: Any) -> Exists

Shortcut for :meth:sqlalchemy.sql.expression.exists

Source code in PyDrocsid/database/database.py
73
74
75
76
def exists(statement: Executable, *entities: Column[Any], **kwargs: Any) -> Exists:
    """
    Shortcut for :meth:`sqlalchemy.sql.expression.exists`

    All arguments are forwarded unchanged to ``sa_exists``.
    """

    return sa_exists(statement, *entities, **kwargs)

filter_by

filter_by(cls: Any, *args: Column[Any], **kwargs: Any) -> Select

Shortcut for :meth:sqlalchemy.future.Select.filter_by

Source code in PyDrocsid/database/database.py
67
68
69
70
def filter_by(cls: Any, *args: Column[Any], **kwargs: Any) -> Select:
    """Build ``select(cls, *args)`` and apply ``filter_by(**kwargs)`` to it."""

    query = select(cls, *args)
    return query.filter_by(**kwargs)

get_database

get_database() -> DB

Create a database connection object using the environment variables

Returns:

  • DB

    The DB object

Source code in PyDrocsid/database/database.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def get_database() -> DB:
    """
    Build a :class:`DB` from the module's connection, pool and echo settings

    :return: The DB object
    """

    settings = dict(
        driver=DB_DRIVER,
        host=DB_HOST,
        port=DB_PORT,
        database=DB_DATABASE,
        username=DB_USERNAME,
        password=DB_PASSWORD,
        pool_recycle=POOL_RECYCLE,
        pool_size=POOL_SIZE,
        max_overflow=MAX_OVERFLOW,
        echo=SQL_SHOW_STATEMENTS,
    )
    return DB(**settings)

select

select(entity: Any, *args: Column[Any]) -> Select

Shortcut for :meth:sqlalchemy.future.select

Source code in PyDrocsid/database/database.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def select(entity: Any, *args: Column[Any]) -> Select:
    """
    Build a select statement for ``entity`` with ``selectinload`` options.

    Each extra argument becomes one ``selectinload`` option. A tuple or list argument is
    treated as a path: its first element starts the option and every following element is
    chained onto it with ``.selectinload``.
    """

    def load_path(path: Any) -> Any:
        # chain the elements of one tuple/list argument into a single option
        first, *rest = path
        loader = selectinload(first)
        for step in rest:
            loader = loader.selectinload(step)
        return loader

    if args:
        options = [load_path(arg) if isinstance(arg, (tuple, list)) else selectinload(arg) for arg in args]
        return sa_select(entity).options(*options)

    return sa_select(entity)