Examples Catalog
Below is a list of examples for prefect-dbt.
Cli.Commands Module
Execute `dbt debug` with a pre-populated profiles.yml:
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
def trigger_dbt_cli_command_flow():
    """Run ``dbt debug`` through the CLI task and return the task's result."""
    return trigger_dbt_cli_command("dbt debug")


trigger_dbt_cli_command_flow()
Execute `dbt debug` without a pre-populated profiles.yml:
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector


@flow
def trigger_dbt_cli_command_flow():
    """Build a Snowflake-backed dbt CLI profile in code and run ``dbt debug``.

    Returns:
        Whatever ``trigger_dbt_cli_command`` returns for the command.
    """
    credentials = SnowflakeCredentials(
        user="user",
        password="password",
        account="account.region.aws",
        role="role",
    )
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=credentials,
    )
    target_configs = SnowflakeTargetConfigs(connector=connector)
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=target_configs,
    )
    # overwrite_profiles=True is passed together with the profile built above,
    # so the command does not rely on a pre-existing profiles.yml.
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )
    return result


trigger_dbt_cli_command_flow()
from prefect_dbt import DbtCoreOperation
# Load the DbtCoreOperation block stored under the name "BLOCK_NAME".
dbt_op = DbtCoreOperation.load("BLOCK_NAME")
Execute short-lasting dbt debug and list with a custom DbtCliProfile.
from prefect_dbt import DbtCoreOperation, DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake import SnowflakeConnector
# Load the saved connector synchronously: a bare ``await`` is a SyntaxError at
# module level in a regular script, and the rest of this example
# (``dbt_init.run()``) is synchronous as well.
snowflake_connector = SnowflakeConnector.load("snowflake-connector")
target_configs = SnowflakeTargetConfigs(connector=snowflake_connector)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop",
    target="dev",
    target_configs=target_configs,
)
# Run both commands with the profile built above, overwriting any existing
# profile (overwrite_profiles=True).
dbt_init = DbtCoreOperation(
    commands=["dbt debug", "dbt list"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True,
)
dbt_init.run()
Execute a longer-lasting dbt run as a context manager.
# Start the operation, leave room for other work, then wait for it and read
# its result while the operation's context is still open.
with DbtCoreOperation(commands=["dbt run"]) as dbt_run:
    dbt_process = dbt_run.trigger()

    # do other things

    dbt_process.wait_for_completion()
    dbt_output = dbt_process.fetch_result()
Cli.Credentials Module
Load stored dbt CLI profile:
from prefect_dbt.cli import DbtCliProfile
# Load the block stored as "BLOCK_NAME" and call get_profile() on it; note that
# `dbt_cli_profile` ends up holding get_profile()'s return value, not the block.
dbt_cli_profile = DbtCliProfile.load("BLOCK_NAME").get_profile()
Get a dbt Snowflake profile from DbtCliProfile by using SnowflakeTargetConfigs:
from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.configs import SnowflakeTargetConfigs
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector
# Snowflake login details, wrapped by a connector that adds the
# schema/database/warehouse to use.
credentials = SnowflakeCredentials(
    user="user", password="password", account="account.region.aws", role="role"
)
connector = SnowflakeConnector(
    credentials=credentials,
    schema="public",
    database="database",
    warehouse="warehouse",
)

# The target configs are derived from the connector and attached to the
# "dev" target of the "jaffle_shop" profile.
target_configs = SnowflakeTargetConfigs(connector=connector)
dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop", target="dev", target_configs=target_configs
)

profile = dbt_cli_profile.get_profile()
Get a dbt Redshift profile from DbtCliProfile by using generic TargetConfigs:
from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.configs import GlobalConfigs, TargetConfigs
# Connection settings that the generic TargetConfigs has no dedicated field
# for in this example are passed through ``extras``.
target_configs_extras = {
    "host": "hostname.region.redshift.amazonaws.com",
    "user": "username",
    "password": "password1",
    "port": 5439,
    "dbname": "analytics",
}
target_configs = TargetConfigs(
    type="redshift", schema="schema", threads=4, extras=target_configs_extras
)

dbt_cli_profile = DbtCliProfile(
    name="jaffle_shop", target="dev", target_configs=target_configs
)
profile = dbt_cli_profile.get_profile()
Cloud.Credentials Module
Sending queries via the returned metadata client:
from prefect_dbt import DbtCloudCredentials
# Load the credentials block saved as "test-account" and obtain a metadata
# client from it.
credentials_block = DbtCloudCredentials.load("test-account")
metadata_client = credentials_block.get_metadata_client()
# Query text requesting the metrics of job 123 along with their filters and
# the name of the model each metric is attached to.
query = """
{
metrics(jobId: 123) {
uniqueId
name
packageName
tags
label
runId
description
type
sql
timestamp
timeGrains
dimensions
meta
resourceType
filters {
field
operator
value
}
model {
name
}
}
}
"""
# Send the query; an example response is shown in the comment that follows.
metadata_client.query(query)
# Result:
# {
# "data": {
# "metrics": [
# {
# "uniqueId": "metric.tpch.total_revenue",
# "name": "total_revenue",
# "packageName": "tpch",
# "tags": [],
# "label": "Total Revenue ($)",
# "runId": 108952046,
# "description": "",
# "type": "sum",
# "sql": "net_item_sales_amount",
# "timestamp": "order_date",
# "timeGrains": ["day", "week", "month"],
# "dimensions": ["status_code", "priority_code"],
# "meta": {},
# "resourceType": "metric",
# "filters": [],
# "model": { "name": "fct_orders" }
# }
# ]
# }
# }
from prefect_dbt.cloud import DbtCloudCredentials
# Load the DbtCloudCredentials block stored under the name "BLOCK_NAME".
dbt_cloud_credentials = DbtCloudCredentials.load("BLOCK_NAME")
Use DbtCloudCredentials instance to trigger a job run:
import asyncio

from prefect_dbt.cloud import DbtCloudCredentials

credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)


async def trigger_job_run():
    """Open an administrative client from ``credentials`` and trigger job 1."""
    # ``async with`` / ``await`` are only valid inside a coroutine, and the
    # client must come from the ``credentials`` object created above.
    async with credentials.get_administrative_client() as client:
        await client.trigger_job_run(job_id=1)


asyncio.run(trigger_job_run())
Load saved dbt Cloud credentials within a flow:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
@flow
def trigger_dbt_cloud_job_run_flow():
    """Trigger job 1 using the credentials block saved as "my-dbt-credentials"."""
    saved_credentials = DbtCloudCredentials.load("my-dbt-credentials")
    trigger_dbt_cloud_job_run(
        job_id=1,
        dbt_cloud_credentials=saved_credentials,
    )


trigger_dbt_cloud_job_run_flow()
Cloud.Jobs Module
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job
@flow
def run_dbt_cloud_job_flow():
    """Run dbt Cloud job 154217 with the "dbt-token" credentials block.

    Returns:
        The value returned by ``run_dbt_cloud_job``.
    """
    job = DbtCloudJob(
        dbt_cloud_credentials=DbtCloudCredentials.load("dbt-token"),
        job_id=154217,
    )
    return run_dbt_cloud_job(dbt_cloud_job=job)


run_dbt_cloud_job_flow()
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
@flow
def trigger_dbt_cloud_job_run_flow():
    """Trigger job 1 using credentials constructed inline."""
    inline_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    trigger_dbt_cloud_job_run(
        job_id=1,
        dbt_cloud_credentials=inline_credentials,
    )


trigger_dbt_cloud_job_run_flow()
Trigger a dbt Cloud job run with overrides:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run
from prefect_dbt.cloud.models import TriggerJobRunOptions
@flow
def trigger_dbt_cloud_job_run_flow():
    """Trigger job 1 with per-run overrides supplied via TriggerJobRunOptions."""
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=credentials,
        job_id=1,
        options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                "dbt test --fail-fast",
            ],
        ),
    )


# Invoke the flow defined above; calling the task directly with no arguments
# would fail because its required arguments would be missing.
trigger_dbt_cloud_job_run_flow()
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import retry_dbt_cloud_job_run_subset_and_wait_for_completion
@flow
def retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow():
    """Retry run 88640123 using the "MY_BLOCK_NAME" credentials block."""
    retry_dbt_cloud_job_run_subset_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials.load("MY_BLOCK_NAME"),
        run_id=88640123,
    )


retry_dbt_cloud_job_run_subset_and_wait_for_completion_flow()
from prefect_dbt.cloud import DbtCloudJob
# Load the DbtCloudJob block stored under the name "BLOCK_NAME".
dbt_cloud_job = DbtCloudJob.load("BLOCK_NAME")
Triggers a dbt Cloud job, waits for completion, and fetches the results.
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob
@flow
def dbt_cloud_job_flow():
    """Trigger dbt Cloud job 154217, wait for it to finish, and fetch its result.

    Returns:
        The run object returned by ``dbt_cloud_job.trigger()``.
    """
    dbt_cloud_credentials = DbtCloudCredentials.load("dbt-token")
    # Construct the job block directly: ``load`` is called with a saved block
    # name elsewhere in these examples, not with field values.
    dbt_cloud_job = DbtCloudJob(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=154217,
    )

    dbt_cloud_job_run = dbt_cloud_job.trigger()
    dbt_cloud_job_run.wait_for_completion()
    dbt_cloud_job_run.fetch_result()
    return dbt_cloud_job_run


dbt_cloud_job_flow()
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
# Build the credentials first, then drive the trigger-and-wait call to
# completion with asyncio.run.
credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)
asyncio.run(
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=credentials,
        job_id=1,
    )
)
Trigger a dbt Cloud job and wait for completion as a sub-flow:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
@flow
def my_flow():
    """Call the trigger-and-wait flow for job 1 from inside another flow."""
    ...
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=credentials,
        job_id=1,
    )
    ...


my_flow()
Trigger a dbt Cloud job with overrides:
import asyncio
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion
from prefect_dbt.cloud.models import TriggerJobRunOptions
# Trigger job 1 with per-run overrides and wait for it to complete.
asyncio.run(
    trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key="my_api_key",
            account_id=123456789,
        ),
        job_id=1,
        trigger_job_run_options=TriggerJobRunOptions(
            git_branch="staging",
            schema_override="dbt_cloud_pr_123",
            dbt_version_override="0.18.0",
            target_name_override="staging",
            timeout_seconds_override=3000,
            generate_docs_override=True,
            threads_override=8,
            steps_override=[
                "dbt seed",
                "dbt run --fail-fast",
                # The flag is hyphenated, matching the run step above.
                "dbt test --fail-fast",
            ],
        ),
    )
)
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run, get_run_id
@flow
def trigger_run_and_get_id():
    """Trigger job 1 and extract the run ID from the trigger response.

    Returns:
        The value returned by ``get_run_id.submit`` for the triggered run data.
    """
    dbt_cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )

    # Use a concrete job ID and omit the optional run options: the original
    # example referenced ``job_id`` and ``trigger_job_run_options`` without
    # ever defining them.
    triggered_run_data = trigger_dbt_cloud_job_run(
        dbt_cloud_credentials=dbt_cloud_credentials,
        job_id=1,
    )
    run_id = get_run_id.submit(triggered_run_data)
    return run_id


trigger_run_and_get_id()
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
# NOTE(review): the job-details task is exposed as ``get_dbt_cloud_job_info``;
# confirm against the installed prefect-dbt version.
from prefect_dbt.cloud.jobs import get_dbt_cloud_job_info


@flow
def get_job_flow():
    """Fetch the details of dbt Cloud job 42 and return them."""
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    return get_dbt_cloud_job_info(
        dbt_cloud_credentials=credentials,
        job_id=42,
    )


get_job_flow()
Cloud.Runs Module
List artifacts of a dbt Cloud job run:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
# Run-artifact tasks belong to the runs module documented in this section.
from prefect_dbt.cloud.runs import list_dbt_cloud_run_artifacts


@flow
def list_artifacts_flow():
    """List the artifacts of dbt Cloud run 42 and return the listing."""
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    return list_dbt_cloud_run_artifacts(
        dbt_cloud_credentials=credentials,
        run_id=42,
    )


list_artifacts_flow()
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
# NOTE(review): the run-details task is exposed as ``get_dbt_cloud_run_info``
# in the runs module; confirm against the installed prefect-dbt version.
from prefect_dbt.cloud.runs import get_dbt_cloud_run_info


@flow
def get_run_flow():
    """Fetch the details of dbt Cloud run 42 and return them."""
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    return get_dbt_cloud_run_info(
        dbt_cloud_credentials=credentials,
        run_id=42,
    )


get_run_flow()
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact
@flow
def get_artifact_flow():
    """Fetch ``manifest.json`` from dbt Cloud run 42 and return it."""
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    artifact = get_dbt_cloud_run_artifact(
        dbt_cloud_credentials=cloud_credentials,
        run_id=42,
        path="manifest.json",
    )
    return artifact


get_artifact_flow()
Get an artifact of a dbt Cloud job run and write it to a file:
import json

from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
# Imported from the runs module, matching the other artifact example here.
from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact


@flow
def get_artifact_flow():
    """Fetch ``manifest.json`` from dbt Cloud run 42 and write it to disk."""
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    get_run_artifact_result = get_dbt_cloud_run_artifact(
        dbt_cloud_credentials=credentials,
        run_id=42,
        path="manifest.json",
    )

    # Serialize the fetched artifact as JSON into a local file of the same name.
    with open("manifest.json", "w") as file:
        json.dump(get_run_artifact_result, file)


get_artifact_flow()
Cloud.Utils Module
List projects for an account:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint
@flow
def get_projects_flow():
    """GET ``/projects/`` from the administrative API and return its "data" entry."""
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )
    response = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=cloud_credentials,
        path="/projects/",
        http_method="GET",
    )
    return response["data"]


get_projects_flow()
Create a new job:
from prefect import flow
from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint
@flow
def create_job_flow():
    """POST a new job definition to ``/jobs/`` and return the response's "data" entry."""
    cloud_credentials = DbtCloudCredentials(
        api_key="my_api_key",
        account_id=123456789,
    )

    # Request body describing the job to create.
    job_payload = {
        "id": None,
        "account_id": 123456789,
        "project_id": 100,
        "environment_id": 10,
        "name": "Nightly run",
        "dbt_version": None,
        "triggers": {"github_webhook": True, "schedule": True},
        "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
        "settings": {"threads": 4, "target_name": "prod"},
        "state": 1,
        "schedule": {
            "date": {"type": "every_day"},
            "time": {"type": "every_hour", "interval": 1},
        },
    }

    response = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=cloud_credentials,
        path="/jobs/",
        http_method="POST",
        json=job_payload,
    )
    return response["data"]


create_job_flow()