Skip to content

prefect_sqlalchemy.database

Tasks for querying a database with SQLAlchemy

sqlalchemy_execute async

Executes a SQL DDL or DML statement; useful for creating tables and inserting rows since this task does not return any objects.

Parameters:

Name Type Description Default
statement str

The statement to execute against the database.

required
sqlalchemy_credentials 'DatabaseCredentials'

The credentials to use to authenticate.

required
params Optional[Union[Tuple[Any], Dict[str, Any]]]

The params to replace the placeholders in the query.

None

Examples:

Create table named customers and insert values.

from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
from prefect_sqlalchemy.database import sqlalchemy_execute
from prefect import flow

@flow
def sqlalchemy_execute_flow():
    sqlalchemy_credentials = DatabaseCredentials(
        driver=AsyncDriver.SQLITE_AIOSQLITE,
        database="prefect.db",
    )
    sqlalchemy_execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);",
        sqlalchemy_credentials,
    )
    sqlalchemy_execute(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        sqlalchemy_credentials,
        params={"name": "Marvin", "address": "Highway 42"}
    )

sqlalchemy_execute_flow()

Source code in prefect_sqlalchemy/database.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@task
async def sqlalchemy_execute(
    statement: str,
    sqlalchemy_credentials: "DatabaseCredentials",
    params: Optional[Union[Tuple[Any], Dict[str, Any]]] = None,
):
    """
    Executes a SQL DDL or DML statement; useful for creating tables and inserting rows
    since this task does not return any objects.

    Args:
        statement: The statement to execute against the database.
        sqlalchemy_credentials: The credentials to use to authenticate.
        params: The params to replace the placeholders in the query.

    Examples:
        Create table named customers and insert values.
        ```python
        from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
        from prefect_sqlalchemy.database import sqlalchemy_execute
        from prefect import flow

        @flow
        def sqlalchemy_execute_flow():
            sqlalchemy_credentials = DatabaseCredentials(
                driver=AsyncDriver.SQLITE_AIOSQLITE,
                database="prefect.db",
            )
            sqlalchemy_execute(
                "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);",
                sqlalchemy_credentials,
            )
            sqlalchemy_execute(
                "INSERT INTO customers (name, address) VALUES (:name, :address);",
                sqlalchemy_credentials,
                params={"name": "Marvin", "address": "Highway 42"}
            )

        sqlalchemy_execute_flow()
        ```
    """
    # do not return anything or else results in the error:
    # This result object does not return rows. It has been closed automatically
    engine = sqlalchemy_credentials.get_engine()
    async_supported = sqlalchemy_credentials._async_supported
    async with _connect(engine, async_supported) as connection:
        await _execute(connection, statement, params, async_supported)

sqlalchemy_query async

Executes a SQL query; useful for querying data from existing tables.

Parameters:

Name Type Description Default
query str

The query to execute against the database.

required
sqlalchemy_credentials 'DatabaseCredentials'

The credentials to use to authenticate.

required
params Optional[Union[Tuple[Any], Dict[str, Any]]]

The params to replace the placeholders in the query.

None
limit Optional[int]

The number of rows to fetch. Note, this parameter is executed on the client side, i.e. passed to fetchmany. To limit on the server side, add the LIMIT clause, or the dialect's equivalent clause, like TOP, to the query.

None

Returns:

Type Description
List[Tuple[Any]]

The fetched results.

Examples:

Query postgres table with the ID value parameterized.

from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
from prefect_sqlalchemy.database import sqlalchemy_query
from prefect import flow

@flow
def sqlalchemy_query_flow():
    sqlalchemy_credentials = DatabaseCredentials(
        driver=AsyncDriver.SQLITE_AIOSQLITE,
        database="prefect.db",
    )
    result = sqlalchemy_query(
        "SELECT * FROM customers WHERE name = :name;",
        sqlalchemy_credentials,
        params={"name": "Marvin"},
    )
    return result

sqlalchemy_query_flow()

Source code in prefect_sqlalchemy/database.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
@task
async def sqlalchemy_query(
    query: str,
    sqlalchemy_credentials: "DatabaseCredentials",
    params: Optional[Union[Tuple[Any], Dict[str, Any]]] = None,
    limit: Optional[int] = None,
) -> List[Tuple[Any]]:
    """
    Executes a SQL query; useful for querying data from existing tables.

    Args:
        query: The query to execute against the database.
        sqlalchemy_credentials: The credentials to use to authenticate.
        params: The params to replace the placeholders in the query.
        limit: The number of rows to fetch. Note, this parameter is
            executed on the client side, i.e. passed to `fetchmany`.
            To limit on the server side, add the `LIMIT` clause, or
            the dialect's equivalent clause, like `TOP`, to the query.

    Returns:
        The fetched results.

    Examples:
        Query postgres table with the ID value parameterized.
        ```python
        from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
        from prefect_sqlalchemy.database import sqlalchemy_query
        from prefect import flow

        @flow
        def sqlalchemy_query_flow():
            sqlalchemy_credentials = DatabaseCredentials(
                driver=AsyncDriver.SQLITE_AIOSQLITE,
                database="prefect.db",
            )
            result = sqlalchemy_query(
                "SELECT * FROM customers WHERE name = :name;",
                sqlalchemy_credentials,
                params={"name": "Marvin"},
            )
            return result

        sqlalchemy_query_flow()
        ```
    """
    engine = sqlalchemy_credentials.get_engine()
    async_supported = sqlalchemy_credentials._async_supported
    async with _connect(engine, async_supported) as connection:
        result = await _execute(connection, query, params, async_supported)
        # some databases, like sqlite, require a connection still open to fetch!
        rows = result.fetchall() if limit is None else result.fetchmany(limit)
    return rows