Skip to content

Examples Catalog

Below is a list of examples for prefect-duckdb.

Database Module

from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    # Set up a one-row table to query.
    setup_statements = (
        "CREATE TABLE test_table (i INTEGER, j STRING)",
        "INSERT INTO test_table VALUES (1, 'one')",
    )
    for statement in setup_statements:
        db.execute(statement)

    # Fetch the query result through fetch_df and show it.
    frame = db.fetch_df("SELECT * FROM test_table")
    print(frame)
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    db.execute("CREATE TABLE test_table (i INTEGER, j STRING)")

    # One inner list per row; each "?" is bound positionally.
    rows_to_insert = [[1, "one"], [2, "two"], [3, "three"]]
    db.execute_many(
        "INSERT INTO test_table VALUES (?, ?)",
        parameters=rows_to_insert,
    )

    # Ask fetch_many for a batch of size 2 and print it.
    batch = db.fetch_many("SELECT * FROM test_table", size=2)
    print(batch)
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    create_sql = "CREATE TABLE test_table (i INTEGER, j STRING)"
    insert_sql = "INSERT INTO test_table VALUES (1, 'one')"
    select_sql = "SELECT * FROM test_table"

    db.execute(create_sql)
    db.execute(insert_sql)

    # Fetch the query result through fetch_arrow and show it.
    arrow_result = db.fetch_arrow(select_sql)
    print(arrow_result)
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    # Set up a one-row table to query.
    for statement in (
        "CREATE TABLE test_table (i INTEGER, j STRING)",
        "INSERT INTO test_table VALUES (1, 'one')",
    ):
        db.execute(statement)

    # Fetch the query result through fetch_one and show it.
    first_row = db.fetch_one("SELECT * FROM test_table")
    print(first_row)
    from prefect_duckdb.database import DuckDBConnector

    # Use the saved connector block as a context manager.
    with DuckDBConnector.load("BLOCK_NAME") as db:
        db.execute("CREATE TABLE test_table (i INTEGER, j STRING)")

        # One inner list per row; each "?" is bound positionally.
        rows_to_insert = [[1, "one"], [2, "two"], [3, "three"]]
        db.execute_many(
            "INSERT INTO test_table VALUES (?, ?)",
            parameters=rows_to_insert,
        )
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    create_sql = "CREATE TABLE test_table (i INTEGER, j STRING)"
    insert_sql = "INSERT INTO test_table VALUES (1, 'one')"

    db.execute(create_sql)
    db.execute(insert_sql)

    # Fetch the query result through fetch_all and show it.
    all_rows = db.fetch_all("SELECT * FROM test_table")
    print(all_rows)
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    # Set up a one-row table to query.
    setup_statements = [
        "CREATE TABLE test_table (i INTEGER, j STRING)",
        "INSERT INTO test_table VALUES (1, 'one')",
    ]
    for statement in setup_statements:
        db.execute(statement)

    # Fetch the query result through fetch_numpy and show it.
    numpy_result = db.fetch_numpy("SELECT * FROM test_table")
    print(numpy_result)
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    # Run the DDL statement through the connector's sql() method.
    create_sql = "CREATE TABLE test_table (i INTEGER, j STRING)"
    db.sql(create_sql)
from prefect_duckdb.database import DuckDBConnector

# Load the saved block first, then enter it as a context manager.
duckdb_connector = DuckDBConnector.load("BLOCK_NAME")
create_table_sql = "CREATE TABLE test_table (i INTEGER, j STRING);"

with duckdb_connector as db:
    db.execute(create_table_sql)
    ...  # placeholder for further statements
from prefect_duckdb.database import DuckDBConnector
from prefect_aws import AwsCredentials

# Load the stored AWS credentials and DuckDB connector blocks.
aws_credentials_block = AwsCredentials.load("BLOCK_NAME")
# `load` is a classmethod, so call it on the class itself rather than on a
# freshly constructed (and immediately discarded) instance.
connector = DuckDBConnector.load("BLOCK_NAME")
connector.get_connection()

# Register an S3 secret built from the AWS credentials block.
connector.create_secret(
    name="test_secret",
    secret_type="S3",
    # AwsCredentials names its fields aws_access_key_id / aws_secret_access_key.
    key_id=aws_credentials_block.aws_access_key_id,
    # NOTE(review): aws_secret_access_key is a SecretStr; it is unwrapped here
    # on the assumption that create_secret expects a plain string - confirm.
    secret=aws_credentials_block.aws_secret_access_key.get_secret_value(),
    region=aws_credentials_block.region_name,
)

# Query a file in the bucket; replace <bucket>/<file> with a real object path.
connector.execute("SELECT count(*) FROM 's3://<bucket>/<file>';")
from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as db:
    # Run a single DDL statement through execute().
    create_sql = "CREATE TABLE test_table (i INTEGER, j STRING)"
    db.execute(create_sql)
# The decorator used below is the lowercase `flow` function, so import that
# name (importing `Flow` left `@flow` undefined).
from prefect import flow
from prefect_duckdb.database import DuckDBConnector, duckdb_query


@flow
def duckdb_query_flow():
    """Load the saved connector block, query ``test_table`` and print the result."""
    duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

    result = duckdb_query("SELECT * FROM test_table", duckdb_connector)
    print(result)


duckdb_query_flow()
Load stored DuckDB connector as a context manager:
from prefect_duckdb.database import DuckDBConnector

# Enter the loaded block with `with` so it is used as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as duckdb_connector:
    ...  # run statements against duckdb_connector here

Insert data into the database and fetch the results:

from prefect_duckdb.database import DuckDBConnector

# Use the saved connector block as a context manager.
with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    # DuckDB does not accept pyformat placeholders such as %(name)s; bind
    # values positionally with "?" and one list per row instead.
    conn.execute_many(
        "INSERT INTO customers (name, address) VALUES (?, ?);",
        parameters=[
            ["Ford", "Highway 42"],
            ["Unknown", "Space"],
            ["Me", "Myway 88"],
        ],
    )
    # The single "?" is bound to the one value in the parameter list.
    results = conn.fetch_all(
        "SELECT * FROM customers WHERE address = ?",
        parameters=["Space"],
    )
    print(results)