Examples Catalog
Below is a list of examples for prefect-duckdb.
Database Module
from prefect_duckdb.database import DuckDBConnector

# Example: create a table, insert one row, then read it back with fetch_df.
with DuckDBConnector.load("BLOCK_NAME") as connector:
    # Run the setup statements in order: DDL first, then the insert.
    for statement in (
        "CREATE TABLE test_table (i INTEGER, j STRING)",
        "INSERT INTO test_table VALUES (1, 'one')",
    ):
        connector.execute(statement)
    frame = connector.fetch_df("SELECT * FROM test_table")
    print(frame)
from prefect_duckdb.database import DuckDBConnector

# Example: bulk-insert three rows with execute_many, then call fetch_many
# with size=2.
rows_to_insert = [[1, "one"], [2, "two"], [3, "three"]]

with DuckDBConnector.load("BLOCK_NAME") as connector:
    connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    # One parameter list per row; each list fills the two `?` placeholders.
    connector.execute_many(
        "INSERT INTO test_table VALUES (?, ?)", parameters=rows_to_insert
    )
    batch = connector.fetch_many("SELECT * FROM test_table", size=2)
    print(batch)
from prefect_duckdb.database import DuckDBConnector

# Example: create and populate a table, then query it with fetch_arrow.
select_all = "SELECT * FROM test_table"

with DuckDBConnector.load("BLOCK_NAME") as connector:
    connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    connector.execute("INSERT INTO test_table VALUES (1, 'one')")
    arrow_result = connector.fetch_arrow(select_all)
    print(arrow_result)
from prefect_duckdb.database import DuckDBConnector

# Example: create and populate a table, then query it with fetch_one.
with DuckDBConnector.load("BLOCK_NAME") as db:
    db.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    db.execute("INSERT INTO test_table VALUES (1, 'one')")
    first_row = db.fetch_one("SELECT * FROM test_table")
    print(first_row)
from prefect_duckdb.database import DuckDBConnector

# Example: insert several rows with a single execute_many call.
insert_sql = "INSERT INTO test_table VALUES (?, ?)"
row_values = [[1, "one"], [2, "two"], [3, "three"]]

with DuckDBConnector.load("BLOCK_NAME") as connector:
    connector.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    # Each inner list supplies the values for one row's `?` placeholders.
    connector.execute_many(insert_sql, parameters=row_values)
from prefect_duckdb.database import DuckDBConnector

# Example: create and populate a table, then query it with fetch_all.
with DuckDBConnector.load("BLOCK_NAME") as db:
    db.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    db.execute("INSERT INTO test_table VALUES (1, 'one')")
    all_rows = db.fetch_all("SELECT * FROM test_table")
    print(all_rows)
from prefect_duckdb.database import DuckDBConnector

# Example: create and populate a table, then query it with fetch_numpy.
setup_statements = (
    "CREATE TABLE test_table (i INTEGER, j STRING)",
    "INSERT INTO test_table VALUES (1, 'one')",
)

with DuckDBConnector.load("BLOCK_NAME") as connector:
    for statement in setup_statements:
        connector.execute(statement)
    numpy_result = connector.fetch_numpy("SELECT * FROM test_table")
    print(numpy_result)
from prefect_duckdb.database import DuckDBConnector

# Example: run a statement through the connector's sql() method.
create_table_sql = "CREATE TABLE test_table (i INTEGER, j STRING)"

with DuckDBConnector.load("BLOCK_NAME") as connector:
    connector.sql(create_table_sql)
from prefect_duckdb.database import DuckDBConnector

# Example: load the block first, then use the loaded object as a context
# manager.
duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

with duckdb_connector as connection:
    create_table_sql = "CREATE TABLE test_table (i INTEGER, j STRING);"
    connection.execute(create_table_sql)
    ...  # further statements go here
from prefect_aws import AwsCredentials
from prefect_duckdb.database import DuckDBConnector

# Example: register S3 credentials as a DuckDB secret, then query a file in S3.
aws_credentials_block = AwsCredentials.load("BLOCK_NAME")

# `load` is called on the class, matching every other example in this catalog;
# building a throwaway instance first (`DuckDBConnector().load(...)`) is
# unnecessary.
connector = DuckDBConnector.load("BLOCK_NAME")
connector.get_connection()

# NOTE(review): confirm these attribute names against the installed
# prefect-aws version; the AwsCredentials fields may instead be named
# `aws_access_key_id` / `aws_secret_access_key`.
connector.create_secret(
    name="test_secret",
    secret_type="S3",
    key_id=aws_credentials_block.access_key,
    secret=aws_credentials_block.secret_access_key,
    region=aws_credentials_block.region_name,
)
connector.execute("SELECT count(*) FROM 's3://<bucket>/<file>';")
from prefect_duckdb.database import DuckDBConnector

# Example: run a single statement with execute().
create_table_sql = "CREATE TABLE test_table (i INTEGER, j STRING)"

with DuckDBConnector.load("BLOCK_NAME") as connector:
    connector.execute(create_table_sql)
# The decorator used below is the lowercase `flow`; importing `Flow` left
# `@flow` undefined (NameError).
from prefect import flow
from prefect_duckdb.database import DuckDBConnector, duckdb_query


@flow
def duckdb_query_flow():
    """Load the saved connector block, run a query with duckdb_query, and print the result."""
    duckdb_connector = DuckDBConnector.load("BLOCK_NAME")
    result = duckdb_query("SELECT * FROM test_table", duckdb_connector)
    print(result)


if __name__ == "__main__":
    duckdb_query_flow()
from prefect_duckdb.database import DuckDBConnector

# Load the saved connector block by name. A plain assignment takes no
# trailing colon (the original line was a SyntaxError).
duckdb_connector = DuckDBConnector.load("BLOCK_NAME")
Insert data into database and fetch results.
from prefect_duckdb.database import DuckDBConnector

# Example: create a table, bulk-insert rows, then fetch the rows matching a
# filter.
with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    # Values are bound through `?` placeholders, as in the other execute_many
    # examples in this catalog; the psycopg-style `%(name)s` form is not
    # DuckDB syntax. Each inner list is one row, in (name, address) order.
    conn.execute_many(
        "INSERT INTO customers (name, address) VALUES (?, ?);",
        parameters=[
            ["Ford", "Highway 42"],
            ["Unknown", "Space"],
            ["Me", "Myway 88"],
        ],
    )
    results = conn.fetch_all(
        "SELECT * FROM customers WHERE address = ?",
        parameters=["Space"],
    )
    print(results)