Examples Catalog
Below is a list of examples for prefect-sqlalchemy
.
Credentials Module
Create an asynchronous engine to PostgreSQL using URL params.
from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
@flow
def sqlalchemy_credentials_flow():
sqlalchemy_credentials = DatabaseCredentials(
driver=AsyncDriver.POSTGRESQL_ASYNCPG,
username="prefect",
password="prefect_password",
database="postgres"
)
print(sqlalchemy_credentials.get_engine())
sqlalchemy_credentials_flow()
Create a synchronous engine to Snowflake using the url
kwarg.
from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials, AsyncDriver
@flow
def sqlalchemy_credentials_flow():
url = (
"snowflake://<user_login_name>:<password>"
"@<account_identifier>/<database_name>"
"?warehouse=<warehouse_name>"
)
sqlalchemy_credentials = DatabaseCredentials(url=url)
print(sqlalchemy_credentials.get_engine())
sqlalchemy_credentials_flow()
Database Module
Create a table and insert two rows into it.
from prefect_sqlalchemy import SqlAlchemyConnector
with SqlAlchemyConnector.load("MY_BLOCK") as database:
database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
database.execute_many(
"INSERT INTO customers (name, address) VALUES (:name, :address);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
from prefect_sqlalchemy import SqlAlchemyConnector
with SqlAlchemyConnector.load("MY_BLOCK") as database:
database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
database.execute_many(
"INSERT INTO customers (name, address) VALUES (:name, :address);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
results = database.fetch_many("SELECT * FROM customers", size=2)
print(results)
results = database.fetch_many("SELECT * FROM customers", size=2)
print(results)
from prefect_sqlalchemy import SqlalchemyConnector
sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
engine = sqlalchemy_connector.get_client(client_type="engine")
Create a context managed connection.
from prefect_sqlalchemy import SqlalchemyConnector
sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_client(client_type="connection") as conn:
...
from prefect_sqlalchemy import SqlAlchemyConnector
with SqlAlchemyConnector.load("MY_BLOCK") as database:
database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
database.execute(
"INSERT INTO customers (name, address) VALUES (:name, :address);",
parameters={"name": "Marvin", "address": "Highway 42"},
)
fetch_*
methods return new results.
import asyncio
from prefect_sqlalchemy import SqlAlchemyConnector
async def example_run():
async with SqlAlchemyConnector.load("MY_BLOCK") as database:
results = await database.fetch_one("SELECT * FROM customers")
await database.reset_async_connections()
results = await database.fetch_one("SELECT * FROM customers")
asyncio.run(example_run())
from prefect_sqlalchemy import SqlAlchemyConnector
database_block = SqlAlchemyConnector.load("BLOCK_NAME")
with database_block:
...
Create table named customers and insert values; then fetch the first 10 rows.
from prefect_sqlalchemy import (
SqlAlchemyConnector, SyncDriver, ConnectionComponents
)
with SqlAlchemyConnector(
connection_info=ConnectionComponents(
driver=SyncDriver.SQLITE_PYSQLITE,
database="prefect.db"
)
) as database:
database.execute(
"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);",
)
for i in range(1, 42):
database.execute(
"INSERT INTO customers (name, address) VALUES (:name, :address);",
parameters={"name": "Marvin", "address": f"Highway {i}"},
)
results = database.fetch_many(
"SELECT * FROM customers WHERE name = :name;",
parameters={"name": "Marvin"},
size=10
)
print(results)
from prefect_sqlalchemy import SqlAlchemyConnector
with SqlAlchemyConnector.load("MY_BLOCK") as database:
database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
database.execute_many(
"INSERT INTO customers (name, address) VALUES (:name, :address);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
results = database.fetch_all("SELECT * FROM customers WHERE name = :name", parameters={"name": "Me"})
from prefect_sqlalchemy import SqlAlchemyConnector
sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
with sqlalchemy_connector.get_connection(begin=False) as connection:
connection.execute("SELECT * FROM table LIMIT 1;")
Create an asynchronous connection as a context-managed transacation.
import asyncio
from prefect_sqlalchemy import SqlAlchemyConnector
sqlalchemy_connector = SqlAlchemyConnector.load("BLOCK_NAME")
async with sqlalchemy_connector.get_connection(begin=False) as connection:
asyncio.run(connection.execute("SELECT * FROM table LIMIT 1;"))
fetch_*
methods return new results.
from prefect_sqlalchemy import SqlAlchemyConnector
with SqlAlchemyConnector.load("MY_BLOCK") as database:
results = database.fetch_one("SELECT * FROM customers")
database.reset_connections()
results = database.fetch_one("SELECT * FROM customers")
from prefect_sqlalchemy import SqlAlchemyConnector
with SqlAlchemyConnector.load("MY_BLOCK") as database:
database.execute("CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);")
database.execute_many(
"INSERT INTO customers (name, address) VALUES (:name, :address);",
seq_of_parameters=[
{"name": "Ford", "address": "Highway 42"},
{"name": "Unknown", "address": "Space"},
{"name": "Me", "address": "Myway 88"},
],
)
results = True
while results:
results = database.fetch_one("SELECT * FROM customers")
print(results)
from prefect import flow
from prefect_sqlalchemy import (
SqlAlchemyConnector, ConnectionComponents, AsyncDriver
)
@flow
def sqlalchemy_credentials_flow():
sqlalchemy_credentials = SqlAlchemyConnector(
connection_info=ConnectionComponents(
driver=AsyncDriver.POSTGRESQL_ASYNCPG,
username="prefect",
password="prefect_password",
database="postgres"
)
)
print(sqlalchemy_credentials.get_engine())
sqlalchemy_credentials_flow()
Create a synchronous engine to Snowflake using the url
kwarg.
from prefect import flow
from prefect_sqlalchemy import SqlAlchemyConnector, AsyncDriver
@flow
def sqlalchemy_credentials_flow():
url = (
"snowflake://<user_login_name>:<password>"
"@<account_identifier>/<database_name>"
"?warehouse=<warehouse_name>"
)
sqlalchemy_credentials = SqlAlchemyConnector(url=url)
print(sqlalchemy_credentials.get_engine())
sqlalchemy_credentials_flow()