Skip to content

Examples Catalog

Below is a list of examples for prefect-dbt.

Cli.Commands Module

Execute dbt debug with a pre-populated profiles.yml.

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_command_flow():
    """Run ``dbt debug`` through the dbt CLI task and return its result."""
    # No profile is passed in, so the command uses the existing profiles.yml.
    return trigger_dbt_cli_command("dbt debug")

trigger_dbt_cli_command_flow()

Execute dbt debug without a pre-populated profiles.yml.

from prefect import flow
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

@flow
def trigger_dbt_cli_command_flow():
    """Run ``dbt debug`` with a profile built in code instead of on disk.

    Builds Snowflake credentials, a connector, target configs, and a
    ``DbtCliProfile``, then passes the profile to the CLI task with
    ``overwrite_profiles=True``.

    Returns:
        Whatever ``trigger_dbt_cli_command`` returns for ``dbt debug``.
    """
    credentials = SnowflakeCredentials(
        user="user",
        password="password",
        account="account.region.aws",
        role="role",
    )
    # The connector wraps the credentials together with the target
    # schema/database/warehouse.
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=credentials,
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=target_configs,
    )
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile
    )
    return result

trigger_dbt_cli_command_flow()
Load a configured block.
from prefect_dbt import DbtCoreOperation

# Load the configured DbtCoreOperation block registered under "BLOCK_NAME".
dbt_op = DbtCoreOperation.load("BLOCK_NAME")

Execute short-lasting dbt debug and list with a custom DbtCliProfile.

from prefect_dbt import DbtCoreOperation, DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake import SnowflakeConnector

# Load the stored connector block. This snippet is plain synchronous,
# top-level code (like the ``dbt_init.run()`` call below), so ``load`` is
# called without ``await``; ``await`` outside an ``async def`` is a
# SyntaxError in a regular script.
snowflake_connector = SnowflakeConnector.load("snowflake-connector")
target_configs = SnowflakeTargetConfigs(connector=snowflake_connector)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop",
    target="dev",
    target_configs=target_configs,
)
# Both commands are listed in one operation; overwrite_profiles=True is passed
# together with the custom profile built above.
dbt_init = DbtCoreOperation(
    commands=["dbt debug", "dbt list"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True
)
dbt_init.run()

Execute a longer-lasting dbt run as a context manager.

# Use the operation as a context manager for the duration of the run.
with DbtCoreOperation(commands=["dbt run"]) as dbt_run:
    # Start the command and keep a handle to the triggered process.
    dbt_process = dbt_run.trigger()
    # do other things
    # Wait for the triggered process, then collect its result.
    dbt_process.wait_for_completion()
    dbt_output = dbt_process.fetch_result()

Cli.Credentials Module

Load stored dbt CLI profile:

from prefect_dbt.cli import DbtCliProfile
# Load the stored block first, then ask it for the profile it describes.
stored_profile_block = DbtCliProfile.load("BLOCK_NAME")
dbt_cli_profile = stored_profile_block.get_profile()

Get a dbt Snowflake profile from DbtCliProfile by using SnowflakeTargetConfigs:

from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

# Build the objects from the inside out:
# credentials -> connector -> target configs -> CLI profile.
credentials = SnowflakeCredentials(
    user="user", password="password", account="account.region.aws", role="role"
)
connector = SnowflakeConnector(
    schema="public", database="database", warehouse="warehouse", credentials=credentials
)
target_configs = SnowflakeTargetConfigs(connector=connector)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop", target="dev", target_configs=target_configs
)
# Ask the assembled block for the resulting profile.
profile = dbt_cli_profile.get_profile()

Get a dbt Redshift profile from DbtCliProfile by using generic TargetConfigs:

from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.configs import GlobalConfigs, TargetConfigs

# Redshift connection settings handed to TargetConfigs through ``extras``.
target_configs_extras = {
    "host": "hostname.region.redshift.amazonaws.com",
    "user": "username",
    "password": "password1",
    "port": 5439,
    "dbname": "analytics",
}
target_configs = TargetConfigs(
    type="redshift", schema="schema", threads=4, extras=target_configs_extras
)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop", target="dev", target_configs=target_configs
)
# Ask the assembled block for the resulting profile.
profile = dbt_cli_profile.get_profile()

Cloud.Credentials Module

Sending queries via the returned metadata client:

from prefect_dbt import DbtCloudCredentials

# Load the stored credentials block and obtain a metadata client from it.
credentials_block = DbtCloudCredentials.load("test-account")
metadata_client = credentials_block.get_metadata_client()
# Query text sent to the metadata client: requests the listed fields of the
# metrics for jobId 123, including the nested ``filters`` and ``model`` fields.
query = """
{
    metrics(jobId: 123) {
        uniqueId
        name
        packageName
        tags
        label
        runId
        description
        type
        sql
        timestamp
        timeGrains
        dimensions
        meta
        resourceType
        filters {
            field
            operator
            value
        }
        model {
            name
        }
    }
}
"""
# Send the query; a sample of the response is shown in the comment that follows.
metadata_client.query(query)
# Result:
# {
#   "data": {
#     "metrics": [
#       {
#         "uniqueId": "metric.tpch.total_revenue",
#         "name": "total_revenue",
#         "packageName": "tpch",
#         "tags": [],
#         "label": "Total Revenue ($)",
#         "runId": 108952046,
#         "description": "",
#         "type": "sum",
#         "sql": "net_item_sales_amount",
#         "timestamp": "order_date",
#         "timeGrains": ["day", "week", "month"],
#         "dimensions": ["status_code", "priority_code"],
#         "meta": {},
#         "resourceType": "metric",
#         "filters": [],
#         "model": { "name": "fct_orders" }
#       }
#     ]
#   }
# }
Load stored dbt Cloud credentials:
from prefect_dbt.cloud import DbtCloudCredentials

# Load the DbtCloudCredentials block registered under "BLOCK_NAME".
dbt_cloud_credentials = DbtCloudCredentials.load("BLOCK_NAME")

Use DbtCloudCredentials instance to trigger a job run:

from prefect_dbt.cloud import DbtCloudCredentials

credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

async with dbt_cloud_credentials.get_administrative_client() as client:
    client.trigger_job_run(job_id=1)

Load saved dbt Cloud credentials within a flow:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run


@flow
def trigger_dbt_cloud_job_run_flow():
    """Load saved dbt Cloud credentials and use them to trigger job 1."""
    saved_credentials = DbtCloudCredentials.load("my-dbt-credentials")
    trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=saved_credentials,
        job_id=1,
    )

trigger_dbt_cloud_job_run_flow()

Cloud.Jobs Module

Run a dbt Cloud job from a DbtCloudJob block:

from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job

@flow
def run_dbt_cloud_job_flow():
    """Run dbt Cloud job 154217 with credentials loaded from "dbt-token".

    Returns:
        Whatever ``run_dbt_cloud_job`` returns for the job block.
    """
    token_credentials = DbtCloudCredentials.load("dbt-token")
    job_block = DbtCloudJob(dbt_cloud_credentials=token_credentials, job_id=154217)
    return run_dbt_cloud_job(dbt_cloud_job=job_block)

run_dbt_cloud_job_flow()
Trigger a dbt Cloud job run:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run


@flow
def trigger_dbt_cloud_job_run_flow():
    """Trigger dbt Cloud job 1 using credentials constructed inline."""
    inline_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=inline_credentials,
        job_id=1,
    )


trigger_dbt_cloud_job_run_flow()

Trigger a dbt Cloud job run with overrides:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
from prefect_dbt.cloud.models import TriggerJobRunOptions


@flow
def trigger_dbt_cloud_job_run_flow():
    """Trigger dbt Cloud job 1 with per-run overrides.

    The overrides (branch, schema, dbt version, target, timeout, docs
    generation, threads, and steps) are passed to the trigger task as a
    ``TriggerJobRunOptions`` instance via ``options``.
    """
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=credentials,
        job_id=1,
        options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                "dbt test --fail-fast",
            ],
        ),
    )


# Invoke the flow defined above (not the task, which requires arguments).
trigger_dbt_cloud_job_run_flow()
Retry a subset of models in a dbt Cloud job run and wait for completion:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import retry_dbt_cloud_job_run_subset_and_wait_for_completion

@flow
def retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow():
    """Retry run 88640123 with credentials loaded from "MY_BLOCK_NAME"."""
    stored_credentials = DbtCloudCredentials.load("MY_BLOCK_NAME")
    retry_dbt_cloud_job_run_subset_and_wait_for_completion(
        run_id=88640123,
        dbt_cloud_credentials=stored_credentials,
    )

retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow()
Load a configured dbt Cloud job block.
from prefect_dbt.cloud import DbtCloudJob

# Load the configured DbtCloudJob block registered under "BLOCK_NAME".
dbt_cloud_job = DbtCloudJob.load("BLOCK_NAME")

Trigger a dbt Cloud job, wait for completion, and fetch the results:

from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob

@flow
def dbt_cloud_job_flow():
    """Trigger dbt Cloud job 154217, wait for it, and fetch its result.

    Returns:
        The job-run object returned by ``DbtCloudJob.trigger()``.
    """
    dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
    # Construct the job block directly: ``load`` takes a saved block name,
    # whereas credentials and job_id are constructor fields.
    dbt_cloud_job = DbtCloudJob(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=154217
    )
    dbt_cloud_job_run = dbt_cloud_job.trigger()
    dbt_cloud_job_run.wait_for_completion()
    dbt_cloud_job_run.fetch_result()
    return dbt_cloud_job_run

dbt_cloud_job_flow()
Trigger a dbt Cloud job and wait for completion as a standalone flow:
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

# Build the credentials first, then drive the flow to completion with
# asyncio.run.
standalone_credentials = DbtCloudCredentials(
    api_key="my_api_key",
    account_id=123456789,
)
asyncio.run(
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=standalone_credentials,
        job_id=1,
    )
)

Trigger a dbt Cloud job and wait for completion as a sub-flow:

from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

@flow
def my_flow():
    """Call the trigger-and-wait flow for job 1 as a sub-flow of this flow."""
    ...
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=cloud_credentials,
        job_id=1,
    )
    ...

my_flow()

Trigger a dbt Cloud job with overrides:

import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
from prefect_dbt.cloud.models import TriggerJobRunOptions

# Trigger job 1 with per-run overrides and wait for it to complete.
asyncio.run(
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key="my_api_key",
            account_id=123456789
        ),
        job_id=1,
        trigger_job_run_options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                # The dbt flag is spelled ``--fail-fast`` (hyphenated).
                "dbt test --fail-fast",
            ],
        ),
    )
)
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run, get_run_id


@flow
def trigger_run_and_get_id(job_id=1, trigger_job_run_options=None):
    """Trigger a dbt Cloud job run and extract its run ID.

    Args:
        job_id: ID of the dbt Cloud job to trigger.
        trigger_job_run_options: Optional per-run overrides forwarded to the
            trigger task as ``options``; ``None`` sends no overrides.

    Returns:
        The result of ``get_run_id.submit(...)`` for the triggered run.
    """
    dbt_cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )

    triggered_run_data = trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=job_id,
        options=trigger_job_run_options,
    )
    # Pass the trigger response through the get_run_id task to obtain the ID.
    run_id = get_run_id.submit(triggered_run_data)
    return run_id

trigger_run_and_get_id()
Get status of a dbt Cloud job:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import get_dbt_cloud_job_info

@flow
def get_job_flow():
    """Fetch information about dbt Cloud job 42.

    Returns:
        Whatever the ``get_dbt_cloud_job_info`` task returns for the job.
    """
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    # The job-info task in ``prefect_dbt.cloud.jobs`` is named
    # ``get_dbt_cloud_job_info``.
    return get_dbt_cloud_job_info(
        dbt_cloud_credentials=credentials,
        job_id=42
    )

get_job_flow()

Cloud.Runs Module

List artifacts of a dbt Cloud job run:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import list_dbt_cloud_run_artifacts

@flow
def list_artifacts_flow():
    """List the artifacts of dbt Cloud job run 42.

    Returns:
        Whatever the ``list_dbt_cloud_run_artifacts`` task returns for the run.
    """
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    # Run-artifact tasks are imported from ``prefect_dbt.cloud.runs``.
    return list_dbt_cloud_run_artifacts(
        dbt_cloud_credentials=credentials,
        run_id=42
    )

list_artifacts_flow()
Get status of a dbt Cloud job run:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_info

@flow
def get_run_flow():
    """Fetch information about dbt Cloud job run 42.

    Returns:
        Whatever the ``get_dbt_cloud_run_info`` task returns for the run.
    """
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    # The run-info task is ``get_dbt_cloud_run_info`` in
    # ``prefect_dbt.cloud.runs``.
    return get_dbt_cloud_run_info(
        dbt_cloud_credentials=credentials,
        run_id=42
    )

get_run_flow()
Get an artifact of a dbt Cloud job run:
from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact

@flow
def get_artifact_flow():
    """Fetch the ``manifest.json`` artifact of dbt Cloud job run 42.

    Returns:
        Whatever ``get_dbt_cloud_run_artifact`` returns for that artifact.
    """
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    artifact = get_dbt_cloud_run_artifact(
        dbt_cloud_credentials=cloud_credentials,
        run_id=42,
        path="manifest.json",
    )
    return artifact

get_artifact_flow()

Get an artifact of a dbt Cloud job run and write it to a file:

import json

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact

@flow
def get_artifact_flow():
    """Fetch ``manifest.json`` for dbt Cloud job run 42 and save it locally.

    The artifact returned by the task is serialized with ``json.dump`` into
    a ``manifest.json`` file in the current working directory.
    """
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    get_run_artifact_result = get_dbt_cloud_run_artifact(
        dbt_cloud_credentials=credentials,
        run_id=42,
        path="manifest.json"
    )

    # Explicit encoding keeps the written file independent of the platform's
    # default locale encoding.
    with open("manifest.json", "w", encoding="utf-8") as file:
        json.dump(get_run_artifact_result, file)

get_artifact_flow()

Cloud.Utils Module

List projects for an account:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

@flow
def get_projects_flow():
    """GET ``/projects/`` from the administrative API and return its data.

    Returns:
        The ``"data"`` entry of the endpoint's response.
    """
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    response = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=cloud_credentials,
        path="/projects/",
        http_method="GET",
    )
    return response["data"]

get_projects_flow()

Create a new job:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint


@flow
def create_job_flow():
    """POST a new job definition to ``/jobs/`` and return the response data.

    Returns:
        The ``"data"`` entry of the endpoint's response.
    """
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )

    # Job definition sent as the JSON request body.
    job_definition = {
        "id": None,
        "account_id": 123456789,
        "project_id": 100,
        "environment_id": 10,
        "name": "Nightly run",
        "dbt_version": None,
        "triggers": {"github_webhook": True, "schedule": True},
        "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
        "settings": {"threads": 4, "target_name": "prod"},
        "state": 1,
        "schedule": {
            "date": {"type": "every_day"},
            "time": {"type": "every_hour", "interval": 1},
        },
    }

    response = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=cloud_credentials,
        path="/jobs/",
        http_method="POST",
        json=job_definition,
    )
    return response["data"]

create_job_flow()