prefect-dbt


prefect-dbt is a collection of Prefect integrations for working with dbt in your Prefect flows.

Getting Started

Trigger a dbt Cloud job and wait for completion

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.jobs import trigger_dbt_cloud_job_run_and_wait_for_completion

@flow
def run_dbt_job_flow():
    run_result = trigger_dbt_cloud_job_run_and_wait_for_completion(
        dbt_cloud_credentials=DbtCloudCredentials(
            api_key="my_api_key",
            account_id=123456789
        ),
        job_id=1
    )
    return run_result

run_dbt_job_flow()
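
The "wait for completion" part of the flow above amounts to polling a run's status until it reaches a terminal state. The sketch below illustrates that general pattern with the standard library only; it is not prefect-dbt's implementation. `wait_for_completion`, the status names, and the fake status source are all hypothetical stand-ins.

```python
import time
from typing import Callable, Iterator


def wait_for_completion(
    get_status: Callable[[], str],
    poll_seconds: float = 0.01,
    max_wait_seconds: float = 1.0,
) -> str:
    """Poll get_status until it reports a terminal state or the time limit passes."""
    terminal_states = {"success", "failed", "cancelled"}  # illustrative names only
    deadline = time.monotonic() + max_wait_seconds
    while True:
        status = get_status()
        if status in terminal_states:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still {status!r} after {max_wait_seconds} seconds")
        time.sleep(poll_seconds)


# Stand-in for a real status lookup: yields a fixed sequence of states
fake_statuses: Iterator[str] = iter(["queued", "running", "running", "success"])
final_status = wait_for_completion(lambda: next(fake_statuses))
print(final_status)
```

In a real flow the status lookup would be a call to dbt Cloud, and the poll interval and time limit would be far longer than the values used here.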

Execute a dbt CLI command

from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_command_flow() -> str:
    result = trigger_dbt_cli_command("dbt debug")
    return result # Returns the last line in the CLI output

trigger_dbt_cli_command_flow()
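
Because `trigger_dbt_cli_command` takes the command as a single string, it can help to assemble that string from parts. The sketch below uses only the standard library. `build_dbt_command` is a hypothetical helper, not part of prefect-dbt, and `--select` and `--target` are used here as examples of ordinary dbt CLI options.

```python
import shlex
from typing import Optional, Sequence


def build_dbt_command(
    subcommand: str,
    select: Optional[Sequence[str]] = None,
    target: Optional[str] = None,
) -> str:
    """Assemble a dbt command string (hypothetical helper, not part of prefect-dbt)."""
    parts = ["dbt", subcommand]
    if select:
        parts.append("--select")
        parts.extend(select)
    if target:
        parts.extend(["--target", target])
    # shlex.quote protects any argument containing spaces or shell metacharacters
    return " ".join(shlex.quote(part) for part in parts)


command = build_dbt_command("run", select=["staging", "marts"], target="dev")
print(command)
```

The resulting string could then be passed to `trigger_dbt_cli_command` in place of the literal `"dbt debug"`.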

Execute a dbt CLI command without a pre-populated profiles.yml

from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_dbt.cli.configs import SnowflakeTargetConfigs

@flow
def trigger_dbt_cli_command_flow():
    connector = SnowflakeConnector(
        schema="public",
        database="database",
        warehouse="warehouse",
        credentials=SnowflakeCredentials(
            user="user",
            password="password",
            account="account.region.aws",
            role="role",
        ),
    )
    target_configs = SnowflakeTargetConfigs(
        connector=connector
    )
    dbt_cli_profile = DbtCliProfile(
        name="jaffle_shop",
        target="dev",
        target_configs=target_configs,
    )
    result = trigger_dbt_cli_command(
        "dbt debug",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile
    )
    return result

trigger_dbt_cli_command_flow()

Execute multiple dbt CLI commands idempotently without a pre-populated profiles.yml

from prefect import flow

from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_commands_flow():
    dbt_cli_profile = DbtCliProfile.load("MY_BLOCK_NAME")

    trigger_kwargs = dict(
        profiles_dir=".",
        overwrite_profiles=True,
        dbt_cli_profile=dbt_cli_profile,
    )

    trigger_dbt_cli_command(
        "dbt deps",
        **trigger_kwargs
    )

    result = trigger_dbt_cli_command(
        "dbt debug",
        **trigger_kwargs
    )

    return result


trigger_dbt_cli_commands_flow()
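
Why `overwrite_profiles=True` makes repeated runs safe can be illustrated without dbt at all. The sketch below shows the general pattern under assumptions and is not prefect-dbt's actual implementation: `write_profiles` is a hypothetical helper that rewrites profiles.yml on every call, so running it once or many times leaves the same file on disk.

```python
import tempfile
from pathlib import Path


def write_profiles(profiles_dir: Path, contents: str, overwrite: bool) -> Path:
    """Write profiles.yml into profiles_dir (hypothetical helper for illustration)."""
    profiles_path = profiles_dir / "profiles.yml"
    if profiles_path.exists() and not overwrite:
        raise FileExistsError(f"{profiles_path} exists and overwrite is False")
    profiles_dir.mkdir(parents=True, exist_ok=True)
    profiles_path.write_text(contents)
    return profiles_path


# Placeholder contents; a real profile would also hold the adapter settings
contents = "jaffle_shop:\n  target: dev\n"

with tempfile.TemporaryDirectory() as tmp:
    profiles_dir = Path(tmp)
    first = write_profiles(profiles_dir, contents, overwrite=True)
    second = write_profiles(profiles_dir, contents, overwrite=True)
    # Both calls leave the same file with the same contents
    same_result = first == second and second.read_text() == contents
    print(same_result)
```

With `overwrite=False`, the second call would raise instead, which is why a flow that always supplies its own profile sets the flag on every command.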

Resources

If you need help getting started with or using dbt, please consult the dbt documentation.

Installation

Install prefect-dbt with pip:

pip install prefect-dbt

Some dbt CLI profiles require an additional adapter package; for example, for Databricks:

pip install dbt-databricks

prefect-dbt requires Python 3.7 or later.
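
A quick way to confirm the environment is ready is to check the interpreter version and whether the package can be found. The sketch below uses only the standard library; `check_environment` is a hypothetical helper, and the module name `prefect_dbt` is taken from the imports used elsewhere on this page.

```python
import importlib.util
import sys


def check_environment(modules=("prefect_dbt",)):
    """Report whether the Python version and the given top-level modules look usable."""
    report = {"python_ok": sys.version_info >= (3, 7)}
    for name in modules:
        # find_spec returns None when a top-level module cannot be found
        report[name] = importlib.util.find_spec(name) is not None
    return report


print(check_environment())
```

A `False` entry for `prefect_dbt` suggests the package is not installed in the active environment.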

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Saving dbt Cloud credentials to a block

Note: to use the load method on a block, you must already have a block document saved, either through code or through the UI.

  1. Head over to your dbt Cloud profile.
  2. Log in to your dbt Cloud account.
  3. Scroll down to "API" or click "API Access" on the sidebar.
  4. Copy the API Key.
  5. Create a short script, replacing the placeholders (or do so in the UI).
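from prefect_dbt.cloud import DbtCloudCredentials

# Replace the placeholders with your own values before running.
# The account ID is numeric, for example 123456789.
DbtCloudCredentials(
    api_key="API_KEY_PLACEHOLDER",
    account_id="ACCOUNT_ID_PLACEHOLDER"
).save("BLOCK_NAME_PLACEHOLDER")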

Congrats! You can now easily load the saved block, which holds your credentials:

from prefect_dbt.cloud import DbtCloudCredentials
DbtCloudCredentials.load("BLOCK_NAME_PLACEHOLDER")
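
To keep the API key out of the script itself, the values can be read from the environment before the block is saved. This is a minimal sketch under assumptions: `read_dbt_cloud_settings` is a hypothetical helper, and the variable names `DBT_CLOUD_API_KEY` and `DBT_CLOUD_ACCOUNT_ID` are illustrative choices, not names that prefect-dbt defines.

```python
import os


def read_dbt_cloud_settings(environ=None):
    """Read dbt Cloud settings from environment variables (names are assumptions)."""
    environ = os.environ if environ is None else environ
    api_key = environ.get("DBT_CLOUD_API_KEY")
    account_id = environ.get("DBT_CLOUD_ACCOUNT_ID")
    if not api_key or not account_id:
        raise RuntimeError("Set DBT_CLOUD_API_KEY and DBT_CLOUD_ACCOUNT_ID first")
    # Environment values are strings, so convert the numeric account ID
    return {"api_key": api_key, "account_id": int(account_id)}


# Example with an explicit mapping so the sketch runs without real credentials
settings = read_dbt_cloud_settings(
    {"DBT_CLOUD_API_KEY": "API_KEY_PLACEHOLDER", "DBT_CLOUD_ACCOUNT_ID": "123456789"}
)
print(settings["account_id"])

# With prefect-dbt installed, the settings could then be saved as a block:
# from prefect_dbt.cloud import DbtCloudCredentials
# DbtCloudCredentials(**settings).save("BLOCK_NAME_PLACEHOLDER")
```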

To view and edit the blocks in the Prefect UI, register the block types first:

prefect block register -m prefect_dbt

Feedback

If you encounter any bugs while using prefect-dbt, feel free to open an issue in the prefect-dbt repository.

If you have any questions or issues while using prefect-dbt, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-dbt for updates too!

Contributing

If you'd like to fix an issue or add a feature in prefect-dbt, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
    pip install -e ".[dev]"
    
  4. Make desired changes
  5. Add tests
  6. Add an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
    pre-commit install
    
  8. git commit, git push, and create a pull request