Skip to content

prefect_hex.project

This module contains tasks and flows for interacting with Hex projects.

cancel_run async

Cancel a project run.

Parameters:

Name Type Description Default
project_id str

Project ID associated with the run to cancel.

required
run_id str

Run ID of the run to cancel.

required
hex_credentials HexCredentials

Credentials to use for authentication with Hex.

required
Source code in prefect_hex/project.py
@task
async def cancel_run(
    project_id: str,
    run_id: str,
    hex_credentials: HexCredentials,
) -> None:  # pragma: no cover
    """
    Cancel a single run of a Hex project.

    Sends a DELETE request to the run's endpoint and passes the response to
    `_unpack_contents`. Nothing is returned to the caller.

    Args:
        project_id:
            ID of the project that owns the run to cancel.
        run_id:
            ID of the run to cancel.
        hex_credentials:
            Credentials used to authenticate against Hex.
    """  # noqa
    cancel_response = await execute_endpoint.fn(
        f"/project/{project_id}/run/{run_id}",  # noqa
        hex_credentials,
        http_method=HTTPMethod.DELETE,
    )

    # The unpacked contents are discarded; the call is made only so that
    # errors returned by the API are handled.
    _unpack_contents(cancel_response)

get_project_runs async

Get the status of the API-triggered runs of a project.

Parameters:

Name Type Description Default
project_id str

Project ID to get runs for.

required
hex_credentials HexCredentials

Credentials to use for authentication with Hex.

required
limit Optional[prefect_hex.models.project.PageSize]

Number of results to fetch per page for paginated requests.

None
offset Optional[prefect_hex.models.project.Offset]

Offset for paginated requests.

None
status_filter Optional[prefect_hex.models.project.ProjectRunStatus]

Current status of a project run.

None

Returns:

Type Description
ProjectRunsResponsePayload

Details of all the retrieved runs.

Source code in prefect_hex/project.py
@task
async def get_project_runs(
    project_id: str,
    hex_credentials: HexCredentials,
    limit: Optional[models.PageSize] = None,
    offset: Optional[models.Offset] = None,
    status_filter: Optional[models.ProjectRunStatus] = None,
) -> models.ProjectRunsResponsePayload:  # pragma: no cover
    """
    List the API-triggered runs of a project together with their status.

    Args:
        project_id:
            ID of the project whose runs are listed.
        hex_credentials:
            Credentials used to authenticate against Hex.
        limit:
            Page size for paginated requests.
        offset:
            Offset for paginated requests.
        status_filter:
            When given, its `.value` is sent as the `statusFilter` query
            parameter; otherwise `None` is sent for that key.

    Returns:
        The unpacked response parsed into a `ProjectRunsResponsePayload`.
    """  # noqa
    # Keys are always present; the filter is only dereferenced when supplied.
    query = {"limit": limit, "offset": offset, "statusFilter": None}
    if status_filter is not None:
        query["statusFilter"] = status_filter.value

    runs_response = await execute_endpoint.fn(
        f"/project/{project_id}/runs",  # noqa
        hex_credentials,
        http_method=HTTPMethod.GET,
        params=query,
    )

    payload = _unpack_contents(runs_response)
    return models.ProjectRunsResponsePayload.parse_obj(payload)

get_run_status async

Get the status of a project run.

Parameters:

Name Type Description Default
project_id str

Project ID associated with the run to get the status of.

required
run_id str

Run ID of the run to get the status of.

required
hex_credentials HexCredentials

Credentials to use for authentication with Hex.

required

Returns:

Type Description
ProjectStatusResponsePayload

Information about the requested run.

Source code in prefect_hex/project.py
@task
async def get_run_status(
    project_id: str,
    run_id: str,
    hex_credentials: HexCredentials,
) -> models.ProjectStatusResponsePayload:  # pragma: no cover
    """
    Fetch the current status of one project run.

    Args:
        project_id:
            ID of the project that owns the run.
        run_id:
            ID of the run to look up.
        hex_credentials:
            Credentials used to authenticate against Hex.

    Returns:
        The unpacked response parsed into a `ProjectStatusResponsePayload`.
    """  # noqa
    status_response = await execute_endpoint.fn(
        f"/project/{project_id}/run/{run_id}",  # noqa
        hex_credentials,
        http_method=HTTPMethod.GET,
    )

    payload = _unpack_contents(status_response)
    return models.ProjectStatusResponsePayload.parse_obj(payload)

run_project async

Trigger a run of the latest published version of a project.

Parameters:

Name Type Description Default
project_id str

Project ID to run.

required
hex_credentials HexCredentials

Credentials to use for authentication with Hex.

required
input_params Optional[Dict]

Optional input parameter value map for this project run, e.g.

{"text_input_1": "Hello World", "numeric_input_1": 123}

None
dry_run bool

If specified, perform a dry run without actually executing the project.

False
update_cache bool

When true, this run will update the cached state of the published app with the latest run results. Additionally, any SQL cells that have caching enabled will be re-executed as part of this run. Note that this cannot be set to true if custom input parameters are provided.

False

Returns:

Type Description
ProjectRunResponsePayload

Information about the triggered project run.

Source code in prefect_hex/project.py
@task
async def run_project(
    project_id: str,
    hex_credentials: HexCredentials,
    input_params: Optional[Dict] = None,
    dry_run: bool = False,
    update_cache: bool = False,
) -> models.ProjectRunResponsePayload:  # pragma: no cover
    """
    Start a run of the latest published version of a project.

    Args:
        project_id:
            ID of the project to run.
        hex_credentials:
            Credentials used to authenticate against Hex.
        input_params:
            Optional map of input parameter values for this run, e.g.
            ```
            {"text_input_1": "Hello World", "numeric_input_1": 123}
            ```
        dry_run:
            If specified, perform a dry run without actually executing
            the project.
        update_cache:
            When true, this run updates the cached state of the published
            app with the latest run results, and any SQL cells that have
            caching enabled are re-executed as part of this run. This
            cannot be set to true if custom input parameters are provided.

    Returns:
        The unpacked response parsed into a `ProjectRunResponsePayload`.
    """  # noqa
    endpoint = f"/project/{project_id}/run"  # noqa

    # Serialized with by_alias=True before being sent as the JSON body.
    request_body = models.RunProjectRequestBody(
        dryRun=dry_run,
        inputParams=input_params,
        updateCache=update_cache,
    )

    run_response = await execute_endpoint.fn(
        endpoint,
        hex_credentials,
        http_method=HTTPMethod.POST,
        json=request_body.dict(by_alias=True),
    )

    payload = _unpack_contents(run_response)
    return models.ProjectRunResponsePayload.parse_obj(payload)

trigger_project_run_and_wait_for_completion async

Flow that triggers a project run and waits for the triggered run to complete.

Parameters:

Name Type Description Default
project_id str

Project ID to run.

required
hex_credentials HexCredentials

Credentials to use for authentication with Hex.

required
input_params Optional[Dict]

Optional input parameter value map for this project run, e.g.

{"text_input_1": "Hello World", "numeric_input_1": 123}

None
update_cache bool

When true, this run will update the cached state of the published app with the latest run results. Additionally, any SQL cells that have caching enabled will be re-executed as part of this run. Note that this cannot be set to true if custom input parameters are provided.

False
max_wait_seconds int

Maximum number of seconds to wait for the entire flow to complete.

900
poll_frequency_seconds int

Number of seconds to wait in between checks for run completion.

10

Returns:

Type Description
ProjectStatusResponsePayload

Information about the triggered project run.

Examples:

Trigger a Hex project run and wait for completion as a stand-alone flow.

import asyncio
from prefect_hex import HexCredentials
from prefect_hex.project import trigger_project_run_and_wait_for_completion

asyncio.run(
    trigger_project_run_and_wait_for_completion(
        hex_credentials=HexCredentials(
            token="1abc0d23-1234-1a2b-abc3-12ab456c7d8e"
        ),
        project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
        max_wait_seconds=1800,
        poll_frequency_seconds=5,
    )
)

Trigger a Hex project run and wait for completion as a subflow.

from prefect import flow
from prefect_hex import HexCredentials
from prefect_hex.project import trigger_project_run_and_wait_for_completion

@flow
def trigger_project_run_and_wait_for_completion_flow(project_id: str):
    hex_credentials = HexCredentials.load("hex-token")
    project_metadata = trigger_project_run_and_wait_for_completion(
        project_id=project_id,
        hex_credentials=hex_credentials
    )
    return project_metadata

trigger_project_run_and_wait_for_completion_flow(
    project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3"
)

Source code in prefect_hex/project.py
@flow
async def trigger_project_run_and_wait_for_completion(
    project_id: str,
    hex_credentials: HexCredentials,
    input_params: Optional[Dict] = None,
    update_cache: bool = False,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
) -> models.ProjectStatusResponsePayload:
    """
    Flow that triggers a project run and waits for the triggered run to complete.

    Args:
        project_id:
            Project ID to run.
        hex_credentials:
            Credentials to use for authentication with Hex.
        input_params:
            Optional input parameter value map for this project run, e.g.
            ```
            {"text_input_1": "Hello World", "numeric_input_1": 123}
            ```
        update_cache:
            When true, this run will update the cached state of the published app
            with the latest run results. Additionally, any SQL cells
            that have caching enabled will be re-executed as part of
            this run. Note that this cannot be set to true if custom
            input parameters are provided.
        max_wait_seconds: Maximum number of seconds to wait for the entire
            flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Returns:
        The status metadata of the run, as reported by the last status check,
        when the run finished with `completed` status.

    Raises:
        HexProjectRunTimedOut: Propagated from `wait_for_project_run_completion`
            when the run does not reach a terminal status in time.
        HexProjectRunError: Or the exception type that
            `TERMINAL_STATUS_EXCEPTIONS` maps the final status to, when the
            run ends in any status other than `completed`.

    Examples:
        Trigger a Hex project run and wait for completion as a stand-alone flow.
        ```python
        import asyncio
        from prefect_hex import HexCredentials
        from prefect_hex.project import trigger_project_run_and_wait_for_completion

        asyncio.run(
            trigger_project_run_and_wait_for_completion(
                hex_credentials=HexCredentials(
                    token="1abc0d23-1234-1a2b-abc3-12ab456c7d8e"
                ),
                project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
                max_wait_seconds=1800,
                poll_frequency_seconds=5,
            )
        )
        ```

        Trigger a Hex project run and wait for completion as a subflow.
        ```python
        from prefect import flow
        from prefect_hex import HexCredentials
        from prefect_hex.project import trigger_project_run_and_wait_for_completion

        @flow
        def trigger_project_run_and_wait_for_completion_flow(project_id: str):
            hex_credentials = HexCredentials.load("hex-token")
            project_metadata = trigger_project_run_and_wait_for_completion(
                project_id=project_id,
                hex_credentials=hex_credentials
            )
            return project_metadata

        trigger_project_run_and_wait_for_completion_flow(
            project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3"
        )
        ```
    """
    logger = get_run_logger()

    # Kick off the run as a task and resolve it to obtain the run ID.
    project_run_future = await run_project.submit(
        project_id=project_id,
        hex_credentials=hex_credentials,
        input_params=input_params,
        update_cache=update_cache,
    )
    project_run = await project_run_future.result()
    run_id = project_run.run_id

    logger.info(
        "Started project %s run %s; visit %s to view the run.",
        repr(project_id),
        repr(run_id),
        str(project_run.run_status_url),
    )

    # Poll until the run reaches a terminal status (or the wait times out).
    project_status, project_metadata = await wait_for_project_run_completion(
        project_id=project_id,
        run_id=run_id,
        hex_credentials=hex_credentials,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=poll_frequency_seconds,
    )

    if project_status == models.ProjectRunStatus.completed:
        return project_metadata

    # Any other terminal status is a failure; fall back to the generic
    # error type when the status has no dedicated exception mapped.
    error_type = TERMINAL_STATUS_EXCEPTIONS.get(project_status, HexProjectRunError)
    raise error_type(
        f"Project {project_id!r} run {run_id!r} "
        f"was unsuccessful with {project_status.value!r} status"
    )

wait_for_project_run_completion async

Flow that waits for the triggered project run to complete.

Parameters:

Name Type Description Default
project_id str

Project ID to watch.

required
run_id str

Run ID to wait for.

required
hex_credentials HexCredentials

Credentials to use for authentication with Hex.

required
max_wait_seconds int

Maximum number of seconds to wait for the entire flow to complete.

900
poll_frequency_seconds int

Number of seconds to wait in between checks for run completion.

10

Returns:

Type Description
Tuple[prefect_hex.models.project.ProjectRunStatus, prefect_hex.models.project.ProjectStatusResponsePayload]

The status of the project run and the metadata associated with the run.

Examples:

Wait for completion of a project run as a subflow.

from prefect import flow
from prefect_hex import HexCredentials
from prefect_hex.project import wait_for_project_run_completion

@flow
def wait_for_project_run_completion_flow(project_id: str, run_id: str):
    hex_credentials = HexCredentials.load("hex-token")
    project_status, project_metadata = wait_for_project_run_completion(
        project_id=project_id,
        run_id=run_id,
        hex_credentials=hex_credentials
    )
    return project_status, project_metadata

wait_for_project_run_completion_flow(
    project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
    run_id="654321c6-b67c-1234-1b2c-66e4ad07b9f3",
)

Source code in prefect_hex/project.py
@flow
async def wait_for_project_run_completion(
    project_id: str,
    run_id: str,
    hex_credentials: HexCredentials,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
) -> Tuple[models.ProjectRunStatus, models.ProjectStatusResponsePayload]:
    """
    Flow that waits for the triggered project run to complete.

    The run status is polled via `get_run_status` until it is one of the
    statuses listed in `TERMINAL_STATUS_EXCEPTIONS`. Only the time spent
    sleeping between polls is counted towards `max_wait_seconds`; the
    duration of the status requests themselves is not.

    Args:
        project_id:
            Project ID to watch.
        run_id:
            Run ID to wait for.
        hex_credentials:
            Credentials to use for authentication with Hex.
        max_wait_seconds: Maximum number of seconds to wait for the entire
            flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Returns:
        The status of the project run and the metadata associated with the run.

    Raises:
        HexProjectRunTimedOut: If no terminal status is observed before the
            accumulated wait exceeds `max_wait_seconds`.

    Examples:
        Wait for completion of a project run as a subflow.
        ```python
        from prefect import flow
        from prefect_hex import HexCredentials
        from prefect_hex.project import wait_for_project_run_completion

        @flow
        def wait_for_project_run_completion_flow(project_id: str, run_id: str):
            hex_credentials = HexCredentials.load("hex-token")
            project_status, project_metadata = wait_for_project_run_completion(
                project_id=project_id,
                run_id=run_id,
                hex_credentials=hex_credentials
            )
            return project_status, project_metadata

        wait_for_project_run_completion_flow(
            project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
            run_id="654321c6-b67c-1234-1b2c-66e4ad07b9f3",
        )
        ```
    """
    logger = get_run_logger()
    seconds_waited_for_run_completion = 0
    wait_for = []

    while seconds_waited_for_run_completion <= max_wait_seconds:
        project_future = await get_run_status.submit(
            project_id=project_id,
            run_id=run_id,
            hex_credentials=hex_credentials,
            wait_for=wait_for,
        )
        # Chain each status check on the previous one.
        wait_for = [project_future]

        project_metadata = await project_future.result()
        project_status = project_metadata.status
        if project_status in TERMINAL_STATUS_EXCEPTIONS:
            return project_status, project_metadata

        logger.debug(
            "Waiting on project %s run %s with status %s for %s seconds",
            repr(project_id),
            repr(run_id),
            repr(project_status.value),
            poll_frequency_seconds,
        )
        await asyncio.sleep(poll_frequency_seconds)
        seconds_waited_for_run_completion += poll_frequency_seconds

    raise HexProjectRunTimedOut(
        f"Max wait time of {max_wait_seconds} seconds exceeded while waiting "
        f"for project {project_id!r} run {run_id!r}"
    )