prefect_sqlalchemy.credentials

Credential classes used to perform authenticated interactions with SQLAlchemy.

AsyncDriver

Known dialects with their corresponding async drivers.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| POSTGRESQL_ASYNCPG | Enum | postgresql+asyncpg |
| SQLITE_AIOSQLITE | Enum | sqlite+aiosqlite |
| MYSQL_ASYNCMY | Enum | mysql+asyncmy |
| MYSQL_AIOMYSQL | Enum | mysql+aiomysql |

Source code in prefect_sqlalchemy/credentials.py
class AsyncDriver(Enum):
    """
    Known dialects with their corresponding async drivers.

    Attributes:
        POSTGRESQL_ASYNCPG (Enum): [postgresql+asyncpg](https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.asyncpg)

        SQLITE_AIOSQLITE (Enum): [sqlite+aiosqlite](https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#module-sqlalchemy.dialects.sqlite.aiosqlite)

        MYSQL_ASYNCMY (Enum): [mysql+asyncmy](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.asyncmy)
        MYSQL_AIOMYSQL (Enum): [mysql+aiomysql](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.aiomysql)
    """  # noqa

    POSTGRESQL_ASYNCPG = "postgresql+asyncpg"

    SQLITE_AIOSQLITE = "sqlite+aiosqlite"

    MYSQL_ASYNCMY = "mysql+asyncmy"
    MYSQL_AIOMYSQL = "mysql+aiomysql"
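
As a minimal sketch of how these enum values can be used, the snippet below re-declares the enum locally (so it runs without `prefect_sqlalchemy` installed) and checks whether a plain driver string names a known async driver. The helper name `is_async_driver` is hypothetical; the lookup through `_value2member_map_` mirrors the check that `DatabaseCredentials.block_initialization` performs for string drivers.

```python
from enum import Enum


class AsyncDriver(Enum):
    """Local copy of the enum above, so the sketch is self-contained."""

    POSTGRESQL_ASYNCPG = "postgresql+asyncpg"
    SQLITE_AIOSQLITE = "sqlite+aiosqlite"
    MYSQL_ASYNCMY = "mysql+asyncmy"
    MYSQL_AIOMYSQL = "mysql+aiomysql"


def is_async_driver(drivername: str) -> bool:
    """Hypothetical helper: True if the string matches a known async driver."""
    # _value2member_map_ maps each enum value to its member.
    return drivername in AsyncDriver._value2member_map_


print(is_async_driver("postgresql+asyncpg"))   # True
print(is_async_driver("postgresql+psycopg2"))  # False
print(AsyncDriver("sqlite+aiosqlite").name)    # SQLITE_AIOSQLITE
```

Calling the enum with a value, as in the last line, is the standard way to turn a stored string back into a member.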

DatabaseCredentials

Block used to manage authentication with a database.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| driver | Optional[Union[AsyncDriver, SyncDriver, str]] | The driver name, e.g. "postgresql+asyncpg" |
| database | Optional[str] | The name of the database to use. |
| username | Optional[str] | The user name used to authenticate. |
| password | Optional[SecretStr] | The password used to authenticate. |
| host | Optional[str] | The host address of the database. |
| port | Optional[str] | The port to connect to the database. |
| query | Optional[Dict[str, str]] | A dictionary of string keys to string values to be passed to the dialect and/or the DBAPI upon connect. To specify non-string parameters to a Python DBAPI directly, use connect_args. |
| url | Optional[AnyUrl] | Manually create and provide a URL to create the engine. This is useful for external dialects, e.g. Snowflake, because some of the params, such as "warehouse", are not directly supported in the vanilla sqlalchemy.engine.URL.create method. Do not provide this alongside other URL params, as that will raise a ValueError. |
| connect_args | Optional[Dict[str, Any]] | The options which will be passed directly to the DBAPI's connect() method as additional keyword arguments. |
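
To illustrate the difference between `query` and `connect_args`, here is a rough, standard-library-only sketch. It is not how SQLAlchemy renders URLs internally; it only shows that `query` entries are string pairs that end up in the URL's query string, while `connect_args` can hold arbitrary Python values that are handed to the DBAPI's `connect()` call. The option names `sslmode` and `timeout`, the placeholder URL, and the `fake_connect` function are illustrative assumptions.

```python
from urllib.parse import urlencode

# String-only options: these become part of the connection URL.
query = {"sslmode": "require"}

# Arbitrary Python values: these would be passed as keyword arguments
# to the DBAPI's connect() function, never rendered into the URL.
connect_args = {"timeout": 10}

base_url = "postgresql+psycopg2://user@localhost/mydb"  # placeholder URL
rendered = f"{base_url}?{urlencode(query)}"


def fake_connect(**kwargs):
    """Stand-in for a DBAPI connect(); returns the kwargs it received."""
    return kwargs


received = fake_connect(**connect_args)

print(rendered)  # postgresql+psycopg2://user@localhost/mydb?sslmode=require
print(received)  # {'timeout': 10}
```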

Example

Load stored database credentials:

from prefect_sqlalchemy import DatabaseCredentials

database_block = DatabaseCredentials.load("BLOCK_NAME")

Source code in prefect_sqlalchemy/credentials.py
class DatabaseCredentials(Block):
    """
    Block used to manage authentication with a database.

    Attributes:
        driver: The driver name, e.g. "postgresql+asyncpg"
        database: The name of the database to use.
        username: The user name used to authenticate.
        password: The password used to authenticate.
        host: The host address of the database.
        port: The port to connect to the database.
        query: A dictionary of string keys to string values to be passed to
            the dialect and/or the DBAPI upon connect. To specify non-string
            parameters to a Python DBAPI directly, use connect_args.
        url: Manually create and provide a URL to create the engine.
            This is useful for external dialects, e.g. Snowflake, because some
            of the params, such as "warehouse", are not directly supported in
            the vanilla `sqlalchemy.engine.URL.create` method. Do not provide
            this alongside other URL params, as that will raise a `ValueError`.
        connect_args: The options which will be passed directly to the
            DBAPI's connect() method as additional keyword arguments.

    Example:
        Load stored database credentials:
        ```python
        from prefect_sqlalchemy import DatabaseCredentials

        database_block = DatabaseCredentials.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Database Credentials"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/3xLant5G70S4vJpmdWCYmr/8fdb19f15b97c3a07c3af3efde4d28fb/download.svg.png?h=250"  # noqa

    driver: Optional[Union[AsyncDriver, SyncDriver, str]] = None
    username: Optional[str] = None
    password: Optional[SecretStr] = None
    database: Optional[str] = None
    host: Optional[str] = None
    port: Optional[str] = None
    query: Optional[Dict[str, str]] = None
    url: Optional[AnyUrl] = None
    connect_args: Optional[Dict[str, Any]] = None

    def block_initialization(self):
        """
        Validates the URL parameters and renders the connection URL
        used to create the engine.
        """
        if isinstance(self.driver, AsyncDriver):
            drivername = self.driver.value
            self._async_supported = True
        elif isinstance(self.driver, SyncDriver):
            drivername = self.driver.value
            self._async_supported = False
        else:
            drivername = self.driver
            self._async_supported = drivername in AsyncDriver._value2member_map_

        url_params = dict(
            drivername=drivername,
            username=self.username,
            password=self.password.get_secret_value() if self.password else None,
            database=self.database,
            host=self.host,
            port=self.port,
            query=self.query,
        )
        if not self.url:
            required_url_keys = ("drivername", "database")
            if not all(url_params[key] for key in required_url_keys):
                required_url_keys = ("driver", "database")
                raise ValueError(
                    f"If the `url` is not provided, "
                    f"all of these URL params are required: "
                    f"{required_url_keys}"
                )
            self.rendered_url = URL.create(
                **{
                    url_key: url_param
                    for url_key, url_param in url_params.items()
                    if url_param is not None
                }
            )  # from params
        else:
            if any(val for val in url_params.values()):
                raise ValueError(
                    f"The `url` should not be provided "
                    f"alongside any of these URL params: "
                    f"{url_params.keys()}"
                )
            self.rendered_url = make_url(str(self.url))

    def get_engine(self) -> Union["Engine", "AsyncEngine"]:
        """
        Returns an authenticated engine that can be
        used to query from databases.

        Returns:
            The authenticated SQLAlchemy Engine / AsyncEngine.

        Examples:
            Create an asynchronous engine to PostgreSQL using URL params.
            ```python
            from prefect import flow
            from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver

            @flow
            def sqlalchemy_credentials_flow():
                sqlalchemy_credentials = DatabaseCredentials(
                    driver=AsyncDriver.POSTGRESQL_ASYNCPG,
                    username="prefect",
                    password="prefect_password",
                    database="postgres"
                )
                print(sqlalchemy_credentials.get_engine())

            sqlalchemy_credentials_flow()
            ```

            Create a synchronous engine to Snowflake using the `url` kwarg.
            ```python
            from prefect import flow
            from prefect_sqlalchemy import DatabaseCredentials

            @flow
            def sqlalchemy_credentials_flow():
                url = (
                    "snowflake://<user_login_name>:<password>"
                    "@<account_identifier>/<database_name>"
                    "?warehouse=<warehouse_name>"
                )
                sqlalchemy_credentials = DatabaseCredentials(url=url)
                print(sqlalchemy_credentials.get_engine())

            sqlalchemy_credentials_flow()
            ```
        """
        engine_kwargs = dict(
            url=self.rendered_url,
            connect_args=self.connect_args or {},
            poolclass=NullPool,
        )
        if self._async_supported:
            engine = create_async_engine(**engine_kwargs)
        else:
            engine = create_engine(**engine_kwargs)
        return engine

block_initialization

Validates the URL parameters and renders the connection URL used to create the engine.

Source code in prefect_sqlalchemy/credentials.py
def block_initialization(self):
    """
    Validates the URL parameters and renders the connection URL
    used to create the engine.
    """
    if isinstance(self.driver, AsyncDriver):
        drivername = self.driver.value
        self._async_supported = True
    elif isinstance(self.driver, SyncDriver):
        drivername = self.driver.value
        self._async_supported = False
    else:
        drivername = self.driver
        self._async_supported = drivername in AsyncDriver._value2member_map_

    url_params = dict(
        drivername=drivername,
        username=self.username,
        password=self.password.get_secret_value() if self.password else None,
        database=self.database,
        host=self.host,
        port=self.port,
        query=self.query,
    )
    if not self.url:
        required_url_keys = ("drivername", "database")
        if not all(url_params[key] for key in required_url_keys):
            required_url_keys = ("driver", "database")
            raise ValueError(
                f"If the `url` is not provided, "
                f"all of these URL params are required: "
                f"{required_url_keys}"
            )
        self.rendered_url = URL.create(
            **{
                url_key: url_param
                for url_key, url_param in url_params.items()
                if url_param is not None
            }
        )  # from params
    else:
        if any(val for val in url_params.values()):
            raise ValueError(
                f"The `url` should not be provided "
                f"alongside any of these URL params: "
                f"{url_params.keys()}"
            )
        self.rendered_url = make_url(str(self.url))
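
The validation rules in the listing above can be summarized in a small standalone sketch: either a full `url` is given on its own, or the individual parameters are given and must include at least the driver name and the database. This is a simplified illustration, not the library's code; `check_url_inputs` and its return values are hypothetical, and no SQLAlchemy URL object is built.

```python
from typing import Optional


def check_url_inputs(url: Optional[str] = None, **url_params) -> str:
    """Hypothetical helper: report which input style is in use.

    Returns "url" or "params"; raises ValueError for invalid combinations.
    """
    if not url:
        # Without a URL, both the driver name and the database are required.
        required = ("drivername", "database")
        if not all(url_params.get(key) for key in required):
            raise ValueError(f"missing one of the required params: {required}")
        return "params"
    # With a URL, no individual URL param may be set.
    if any(url_params.values()):
        raise ValueError("`url` cannot be combined with individual URL params")
    return "url"


print(check_url_inputs(drivername="sqlite+pysqlite", database="app.db"))  # params
print(check_url_inputs(url="snowflake://user:pw@account/db"))             # url

try:
    check_url_inputs(url="snowflake://user:pw@account/db", database="db")
except ValueError as exc:
    print(f"rejected: {exc}")
```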

get_engine

Returns an authenticated engine that can be used to query from databases.

Returns:

| Type | Description |
| --- | --- |
| Union['Engine', 'AsyncEngine'] | The authenticated SQLAlchemy Engine / AsyncEngine. |

Examples:

Create an asynchronous engine to PostgreSQL using URL params.

from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver

@flow
def sqlalchemy_credentials_flow():
    sqlalchemy_credentials = DatabaseCredentials(
        driver=AsyncDriver.POSTGRESQL_ASYNCPG,
        username="prefect",
        password="prefect_password",
        database="postgres"
    )
    print(sqlalchemy_credentials.get_engine())

sqlalchemy_credentials_flow()

Create a synchronous engine to Snowflake using the url kwarg.

from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials

@flow
def sqlalchemy_credentials_flow():
    url = (
        "snowflake://<user_login_name>:<password>"
        "@<account_identifier>/<database_name>"
        "?warehouse=<warehouse_name>"
    )
    sqlalchemy_credentials = DatabaseCredentials(url=url)
    print(sqlalchemy_credentials.get_engine())

sqlalchemy_credentials_flow()
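
To see where a dialect-specific option such as `warehouse` lives in a URL like the Snowflake one above, the following sketch splits a URL with Python's standard `urllib.parse`. SQLAlchemy uses its own parser (`make_url`), so this is only an approximation for illustration, and the values `my_user`, `my_account`, and so on are placeholders standing in for the `<...>` fields.

```python
from urllib.parse import parse_qs, urlsplit

# Placeholder values standing in for the <...> fields in the example above.
url = "snowflake://my_user:my_password@my_account/my_database?warehouse=my_wh"

parts = urlsplit(url)

print(parts.scheme)            # snowflake
print(parts.username)          # my_user
print(parts.hostname)          # my_account
print(parts.path.lstrip("/"))  # my_database
print(parse_qs(parts.query))   # {'warehouse': ['my_wh']}
```

The `warehouse` value sits in the query string, which is why it fits naturally in a hand-written `url`.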

Source code in prefect_sqlalchemy/credentials.py
def get_engine(self) -> Union["Engine", "AsyncEngine"]:
    """
    Returns an authenticated engine that can be
    used to query from databases.

    Returns:
        The authenticated SQLAlchemy Engine / AsyncEngine.

    Examples:
        Create an asynchronous engine to PostgreSQL using URL params.
        ```python
        from prefect import flow
        from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver

        @flow
        def sqlalchemy_credentials_flow():
            sqlalchemy_credentials = DatabaseCredentials(
                driver=AsyncDriver.POSTGRESQL_ASYNCPG,
                username="prefect",
                password="prefect_password",
                database="postgres"
            )
            print(sqlalchemy_credentials.get_engine())

        sqlalchemy_credentials_flow()
        ```

        Create a synchronous engine to Snowflake using the `url` kwarg.
        ```python
        from prefect import flow
        from prefect_sqlalchemy import DatabaseCredentials

        @flow
        def sqlalchemy_credentials_flow():
            url = (
                "snowflake://<user_login_name>:<password>"
                "@<account_identifier>/<database_name>"
                "?warehouse=<warehouse_name>"
            )
            sqlalchemy_credentials = DatabaseCredentials(url=url)
            print(sqlalchemy_credentials.get_engine())

        sqlalchemy_credentials_flow()
        ```
    """
    engine_kwargs = dict(
        url=self.rendered_url,
        connect_args=self.connect_args or {},
        poolclass=NullPool,
    )
    if self._async_supported:
        engine = create_async_engine(**engine_kwargs)
    else:
        engine = create_engine(**engine_kwargs)
    return engine
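
`get_engine` passes `poolclass=NullPool`, which in SQLAlchemy means connections are not pooled: each checkout opens a new DBAPI connection, and that connection is closed when it is released. The following is only a conceptual sketch of that behavior using the standard library's `sqlite3`. It does not use SQLAlchemy, and `unpooled_connection` and `stats` are hypothetical names.

```python
import sqlite3
from contextlib import contextmanager

stats = {"opened": 0}  # counts how many DBAPI connections were created


@contextmanager
def unpooled_connection(path: str = ":memory:"):
    """Hypothetical helper mimicking NullPool-style checkout behavior."""
    conn = sqlite3.connect(path)  # a new connection on every checkout
    stats["opened"] += 1
    try:
        yield conn
    finally:
        conn.close()  # closed on release instead of being returned to a pool


for _ in range(2):
    with unpooled_connection() as conn:
        print(conn.execute("SELECT 1").fetchone())  # (1,)

print(stats["opened"])  # 2
```

Opening a connection per use costs more than reusing pooled ones, but nothing stays open between uses.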

SyncDriver

Known dialects with their corresponding sync drivers.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| POSTGRESQL_PSYCOPG2 | Enum | postgresql+psycopg2 |
| POSTGRESQL_PG8000 | Enum | postgresql+pg8000 |
| POSTGRESQL_PSYCOPG2CFFI | Enum | postgresql+psycopg2cffi |
| POSTGRESQL_PYPOSTGRESQL | Enum | postgresql+pypostgresql |
| POSTGRESQL_PYGRESQL | Enum | postgresql+pygresql |
| MYSQL_MYSQLDB | Enum | mysql+mysqldb |
| MYSQL_PYMYSQL | Enum | mysql+pymysql |
| MYSQL_MYSQLCONNECTOR | Enum | mysql+mysqlconnector |
| MYSQL_CYMYSQL | Enum | mysql+cymysql |
| MYSQL_OURSQL | Enum | mysql+oursql |
| MYSQL_PYODBC | Enum | mysql+pyodbc |
| SQLITE_PYSQLITE | Enum | sqlite+pysqlite |
| SQLITE_PYSQLCIPHER | Enum | sqlite+pysqlcipher |
| ORACLE_CX_ORACLE | Enum | oracle+cx_oracle |
| MSSQL_PYODBC | Enum | mssql+pyodbc |
| MSSQL_MXODBC | Enum | mssql+mxodbc |
| MSSQL_PYMSSQL | Enum | mssql+pymssql |

Source code in prefect_sqlalchemy/credentials.py
class SyncDriver(Enum):
    """
    Known dialects with their corresponding sync drivers.

    Attributes:
        POSTGRESQL_PSYCOPG2 (Enum): [postgresql+psycopg2](https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2)
        POSTGRESQL_PG8000 (Enum): [postgresql+pg8000](https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.pg8000)
        POSTGRESQL_PSYCOPG2CFFI (Enum): [postgresql+psycopg2cffi](https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg2cffi)
        POSTGRESQL_PYPOSTGRESQL (Enum): [postgresql+pypostgresql](https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.pypostgresql)
        POSTGRESQL_PYGRESQL (Enum): [postgresql+pygresql](https://docs.sqlalchemy.org/en/14/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.pygresql)

        MYSQL_MYSQLDB (Enum): [mysql+mysqldb](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqldb)
        MYSQL_PYMYSQL (Enum): [mysql+pymysql](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.pymysql)
        MYSQL_MYSQLCONNECTOR (Enum): [mysql+mysqlconnector](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqlconnector)
        MYSQL_CYMYSQL (Enum): [mysql+cymysql](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.cymysql)
        MYSQL_OURSQL (Enum): [mysql+oursql](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.oursql)
        MYSQL_PYODBC (Enum): [mysql+pyodbc](https://docs.sqlalchemy.org/en/14/dialects/mysql.html#module-sqlalchemy.dialects.mysql.pyodbc)

        SQLITE_PYSQLITE (Enum): [sqlite+pysqlite](https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#module-sqlalchemy.dialects.sqlite.pysqlite)
        SQLITE_PYSQLCIPHER (Enum): [sqlite+pysqlcipher](https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#module-sqlalchemy.dialects.sqlite.pysqlcipher)

        ORACLE_CX_ORACLE (Enum): [oracle+cx_oracle](https://docs.sqlalchemy.org/en/14/dialects/oracle.html#module-sqlalchemy.dialects.oracle.cx_oracle)

        MSSQL_PYODBC (Enum): [mssql+pyodbc](https://docs.sqlalchemy.org/en/14/dialects/mssql.html#module-sqlalchemy.dialects.mssql.pyodbc)
        MSSQL_MXODBC (Enum): [mssql+mxodbc](https://docs.sqlalchemy.org/en/14/dialects/mssql.html#module-sqlalchemy.dialects.mssql.mxodbc)
        MSSQL_PYMSSQL (Enum): [mssql+pymssql](https://docs.sqlalchemy.org/en/14/dialects/mssql.html#module-sqlalchemy.dialects.mssql.pymssql)
    """  # noqa

    POSTGRESQL_PSYCOPG2 = "postgresql+psycopg2"
    POSTGRESQL_PG8000 = "postgresql+pg8000"
    POSTGRESQL_PSYCOPG2CFFI = "postgresql+psycopg2cffi"
    POSTGRESQL_PYPOSTGRESQL = "postgresql+pypostgresql"
    POSTGRESQL_PYGRESQL = "postgresql+pygresql"

    MYSQL_MYSQLDB = "mysql+mysqldb"
    MYSQL_PYMYSQL = "mysql+pymysql"
    MYSQL_MYSQLCONNECTOR = "mysql+mysqlconnector"
    MYSQL_CYMYSQL = "mysql+cymysql"
    MYSQL_OURSQL = "mysql+oursql"
    MYSQL_PYODBC = "mysql+pyodbc"

    SQLITE_PYSQLITE = "sqlite+pysqlite"
    SQLITE_PYSQLCIPHER = "sqlite+pysqlcipher"

    ORACLE_CX_ORACLE = "oracle+cx_oracle"

    MSSQL_PYODBC = "mssql+pyodbc"
    MSSQL_MXODBC = "mssql+mxodbc"
    MSSQL_PYMSSQL = "mssql+pymssql"
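
Each value follows the `dialect+driver` naming pattern. As a small illustration, the sketch below re-declares a subset of the enum locally (so it runs without `prefect_sqlalchemy` installed), splits each value into its two parts, and groups the drivers by dialect. The helper `split_driver` and the variable `drivers_by_dialect` are hypothetical names introduced for this example.

```python
from collections import defaultdict
from enum import Enum


class SyncDriver(Enum):
    """Abbreviated local copy of the enum above (subset of members)."""

    POSTGRESQL_PSYCOPG2 = "postgresql+psycopg2"
    POSTGRESQL_PG8000 = "postgresql+pg8000"
    MYSQL_PYMYSQL = "mysql+pymysql"
    SQLITE_PYSQLITE = "sqlite+pysqlite"


def split_driver(member: SyncDriver) -> tuple:
    """Hypothetical helper: split "dialect+driver" into its two parts."""
    dialect, _, driver = member.value.partition("+")
    return dialect, driver


drivers_by_dialect = defaultdict(list)
for member in SyncDriver:
    dialect, driver = split_driver(member)
    drivers_by_dialect[dialect].append(driver)

print(split_driver(SyncDriver.MYSQL_PYMYSQL))  # ('mysql', 'pymysql')
print(dict(drivers_by_dialect))
```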