Skip to content

Examples Catalog

Below is a list of examples for prefect-snowflake.

Credentials Module

Get Snowflake connection with only block configuration:

from prefect_snowflake import SnowflakeCredentials

# Load the saved credentials block; "BLOCK_NAME" stands in for the name the
# block was saved under.
snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

# No keyword arguments are passed, so only the block's own configuration is used.
connection = snowflake_credentials_block.get_client()

Get Snowflake connection scoped to a specified database:

from prefect_snowflake import SnowflakeCredentials

# Load the saved credentials block; "BLOCK_NAME" stands in for the name the
# block was saved under.
snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

# The extra keyword argument scopes this connection to "my_database".
connection = snowflake_credentials_block.get_client(database="my_database")

Database Module

Repeatedly fetch two rows from the database where address is Highway 42.

from prefect_snowflake.database import SnowflakeConnector

# Statements and data are named up front so the two identical fetch_many
# calls below are easy to compare.
create_table_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
insert_sql = (
    "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);"
)
select_sql = "SELECT * FROM customers WHERE address = %(address)s"
customers = [
    {"name": "Marvin", "address": "Highway 42"},
    {"name": "Ford", "address": "Highway 42"},
    {"name": "Unknown", "address": "Highway 42"},
    {"name": "Me", "address": "Highway 42"},
]
address_filter = {"address": "Highway 42"}

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    connector.execute(create_table_sql)
    connector.execute_many(insert_sql, seq_of_parameters=customers)

    # First call: the first two matching rows.
    first_batch = connector.fetch_many(
        select_sql,
        parameters=address_filter,
        size=2,
    )
    print(first_batch)  # Marvin, Ford

    # Same query and parameters again: the next two matching rows.
    second_batch = connector.fetch_many(
        select_sql,
        parameters=address_filter,
        size=2,
    )
    print(second_batch)  # Unknown, Me
Fetch all rows from the database where address is Highway 42.
from prefect_snowflake.database import SnowflakeConnector

create_table_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
insert_sql = (
    "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);"
)
select_sql = "SELECT * FROM customers WHERE address = %(address)s"
# Three customers share "Highway 42"; "Me" lives elsewhere and is filtered out.
customers = [
    {"name": "Marvin", "address": "Highway 42"},
    {"name": "Ford", "address": "Highway 42"},
    {"name": "Unknown", "address": "Highway 42"},
    {"name": "Me", "address": "Myway 88"},
]

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    connector.execute(create_table_sql)
    connector.execute_many(insert_sql, seq_of_parameters=customers)

    # fetch_all takes no size argument; every matching row comes back at once.
    matching_rows = connector.fetch_all(
        select_sql,
        parameters={"address": "Highway 42"},
    )
    print(matching_rows)  # Marvin, Ford, Unknown
Create table and insert three rows into it.
from prefect_snowflake.database import SnowflakeConnector

create_table_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
insert_sql = (
    "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);"
)
# One dict per row; the keys match the %(name)s / %(address)s placeholders.
new_customers = [
    {"name": "Marvin", "address": "Highway 42"},
    {"name": "Ford", "address": "Highway 42"},
    {"name": "Unknown", "address": "Space"},
]

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    # Make sure the table exists, then insert all three rows in one call.
    connector.execute(create_table_sql)
    connector.execute_many(insert_sql, seq_of_parameters=new_customers)
Reset the cursors so that the next fetch starts again from the first row.
from prefect_snowflake.database import SnowflakeConnector

create_table_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
insert_sql = (
    "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);"
)
select_all_sql = "SELECT * FROM customers"
customers = [
    {"name": "Ford", "address": "Highway 42"},
    {"name": "Unknown", "address": "Space"},
    {"name": "Me", "address": "Myway 88"},
]

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    connector.execute(create_table_sql)
    connector.execute_many(insert_sql, seq_of_parameters=customers)

    first_row = connector.fetch_one(select_all_sql)
    print(first_row)  # Ford

    # Resetting between the two fetches is the point of this example.
    connector.reset_cursors()

    row_after_reset = connector.fetch_one(select_all_sql)
    print(row_after_reset)  # should be Ford again
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

# Build the credentials and connector in code instead of loading a saved
# block, then open a connection from the connector.
credentials = SnowflakeCredentials(
    account="account", user="user", password="password"
)
connector = SnowflakeConnector(
    credentials=credentials,
    database="database",
    warehouse="warehouse",
    schema="schema",
)

# get_connection() is used as a context manager; run queries inside the block.
with connector.get_connection() as connection:
    ...
Load stored Snowflake connector as a context manager:
from prefect_snowflake.database import SnowflakeConnector

# Load the saved connector block and use it as a context manager; work with
# the connector inside the block.
with SnowflakeConnector.load("BLOCK_NAME") as snowflake_connector:
    ...

Insert data into database and fetch results.

from prefect_snowflake.database import SnowflakeConnector

create_table_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
insert_sql = (
    "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);"
)
select_sql = "SELECT * FROM customers WHERE address = %(address)s"
customers = [
    {"name": "Ford", "address": "Highway 42"},
    {"name": "Unknown", "address": "Space"},
    {"name": "Me", "address": "Myway 88"},
]

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    # Insert the sample rows, then read back the ones whose address is "Space".
    connector.execute(create_table_sql)
    connector.execute_many(insert_sql, seq_of_parameters=customers)
    space_rows = connector.fetch_all(
        select_sql,
        parameters={"address": "Space"},
    )
    print(space_rows)
Fetch one row from the database where address is Space.
from prefect_snowflake.database import SnowflakeConnector

create_table_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)
insert_sql = (
    "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);"
)
select_sql = "SELECT * FROM customers WHERE address = %(address)s"
customers = [
    {"name": "Ford", "address": "Highway 42"},
    {"name": "Unknown", "address": "Space"},
    {"name": "Me", "address": "Myway 88"},
]

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    connector.execute(create_table_sql)
    connector.execute_many(insert_sql, seq_of_parameters=customers)

    # fetch_one is called once here, asking for a row whose address is "Space".
    space_row = connector.fetch_one(
        select_sql,
        parameters={"address": "Space"},
    )
    print(space_row)
Create table named customers with two columns, name and address.
from prefect_snowflake.database import SnowflakeConnector

# IF NOT EXISTS makes the statement safe to run when the table is already there.
create_customers_sql = (
    "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
)

with SnowflakeConnector.load("BLOCK_NAME") as connector:
    connector.execute(create_customers_sql)