Skip to content

prefect_dbt.cloud.utils

Utilities for common interactions with the dbt Cloud API

Classes

DbtCloudAdministrativeApiCallFailed

Bases: Exception

Raised when a call to dbt Cloud administrative API fails.

Source code in prefect_dbt/cloud/utils.py
44
45
class DbtCloudAdministrativeApiCallFailed(Exception):
    """Raised when a call to dbt Cloud administrative API fails."""

Functions

call_dbt_cloud_administrative_api_endpoint async

Task that calls a specified endpoint in the dbt Cloud administrative API. Use this task if a prebuilt one is not yet available.

Parameters:

Name Type Description Default
dbt_cloud_credentials DbtCloudCredentials

Credentials for authenticating with dbt Cloud.

required
path str

The partial path for the request (e.g. /projects/). Will be appended onto the base URL as determined by the client configuration.

required
http_method str

HTTP method to call on the endpoint.

required
params Optional[Dict[str, Any]]

Query parameters to include in the request.

None
json Optional[Dict[str, Any]]

JSON serializable body to send in the request.

None

Returns:

Type Description
Any

The body of the response. If the body is JSON serializable, then the result of json.loads with the body as the input will be returned. Otherwise, the body will be returned directly.

Examples:

List projects for an account:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

@flow
def get_projects_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/projects/",
        http_method="GET",
    )
    return result["data"]

get_projects_flow()

Create a new job:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint


@flow
def create_job_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    result = call_dbt_cloud_administrative_api_endpoint(
        dbt_cloud_credentials=credentials,
        path="/jobs/",
        http_method="POST",
        json={
            "id": None,
            "account_id": 123456789,
            "project_id": 100,
            "environment_id": 10,
            "name": "Nightly run",
            "dbt_version": None,
            "triggers": {"github_webhook": True, "schedule": True},
            "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
            "settings": {"threads": 4, "target_name": "prod"},
            "state": 1,
            "schedule": {
                "date": {"type": "every_day"},
                "time": {"type": "every_hour", "interval": 1},
            },
        },
    )
    return result["data"]

create_job_flow()

Source code in prefect_dbt/cloud/utils.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@task(
    name="Call dbt Cloud administrative API endpoint",
    description="Calls a dbt Cloud administrative API endpoint",
    retries=3,
    retry_delay_seconds=10,
)
async def call_dbt_cloud_administrative_api_endpoint(
    dbt_cloud_credentials: DbtCloudCredentials,
    path: str,
    http_method: str,
    params: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Task that calls a specified endpoint in the dbt Cloud administrative API. Use this
    task if a prebuilt one is not yet available.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        path: The partial path for the request (e.g. /projects/). Will be appended
            onto the base URL as determined by the client configuration.
        http_method: HTTP method to call on the endpoint.
        params: Query parameters to include in the request.
        json: JSON serializable body to send in the request.

    Returns:
        The body of the response. If the body is JSON serializable, then the result of
            `json.loads` with the body as the input will be returned. Otherwise, the
            body will be returned directly.

    Examples:
        List projects for an account:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

        @flow
        def get_projects_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            result = call_dbt_cloud_administrative_api_endpoint(
                dbt_cloud_credentials=credentials,
                path="/projects/",
                http_method="GET",
            )
            return result["data"]

        get_projects_flow()
        ```

        Create a new job:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint


        @flow
        def create_job_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            result = call_dbt_cloud_administrative_api_endpoint(
                dbt_cloud_credentials=credentials,
                path="/jobs/",
                http_method="POST",
                json={
                    "id": None,
                    "account_id": 123456789,
                    "project_id": 100,
                    "environment_id": 10,
                    "name": "Nightly run",
                    "dbt_version": None,
                    "triggers": {"github_webhook": True, "schedule": True},
                    "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
                    "settings": {"threads": 4, "target_name": "prod"},
                    "state": 1,
                    "schedule": {
                        "date": {"type": "every_day"},
                        "time": {"type": "every_hour", "interval": 1},
                    },
                },
            )
            return result["data"]

        create_job_flow()
        ```
    """  # noqa
    try:

        async with dbt_cloud_credentials.get_administrative_client() as client:
            response = await client.call_endpoint(
                http_method=http_method, path=path, params=params, json=json
            )
    except HTTPStatusError as ex:
        raise DbtCloudAdministrativeApiCallFailed(extract_developer_message(ex)) from ex
    try:
        return response.json()
    except JSONDecodeError:
        return response.text

extract_developer_message

Extracts developer message from a error response from the dbt Cloud administrative API.

Parameters:

Name Type Description Default
ex HTTPStatusError

An HTTPStatusError raised by httpx

required

Returns:

Type Description
Optional[str]

developer_message from dbt Cloud administrative API response or None if a

Optional[str]

developer_message cannot be extracted

Source code in prefect_dbt/cloud/utils.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def extract_developer_message(ex: HTTPStatusError) -> Optional[str]:
    """
    Extracts developer message from a error response from the dbt Cloud
    administrative API.

    Args:
        ex: An HTTPStatusError raised by httpx

    Returns:
        developer_message from dbt Cloud administrative API response or None if a
        developer_message cannot be extracted
    """
    response_payload = ex.response.json()
    status = response_payload.get("status", {})
    return status.get("developer_message")

extract_user_message

Extracts user message from a error response from the dbt Cloud administrative API.

Parameters:

Name Type Description Default
ex HTTPStatusError

An HTTPStatusError raised by httpx

required

Returns:

Type Description
Optional[str]

user_message from dbt Cloud administrative API response or None if a

Optional[str]

user_message cannot be extracted

Source code in prefect_dbt/cloud/utils.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def extract_user_message(ex: HTTPStatusError) -> Optional[str]:
    """
    Extracts user message from a error response from the dbt Cloud administrative API.

    Args:
        ex: An HTTPStatusError raised by httpx

    Returns:
        user_message from dbt Cloud administrative API response or None if a
        user_message cannot be extracted
    """
    response_payload = ex.response.json()
    status = response_payload.get("status", {})
    return status.get("user_message")