prefect_hex.project
This is a module containing tasks for interacting with Hex projects
cancel_run
async
Cancel a project run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Project ID associated with the run to cancel. |
required |
run_id |
str |
Run ID of the run to cancel. |
required |
hex_credentials |
HexCredentials |
Credentials to use for authentication with Hex. |
required |
Source code in prefect_hex/project.py
@task
async def cancel_run(
project_id: str,
run_id: str,
hex_credentials: HexCredentials,
) -> None: # pragma: no cover
"""
Cancel a project run.
Args:
project_id:
Project ID associated with the run to cancel.
run_id:
Run ID of the run to cancel.
hex_credentials:
Credentials to use for authentication with Hex.
""" # noqa
endpoint = f"/project/{project_id}/run/{run_id}" # noqa
response = await execute_endpoint.fn(
endpoint,
hex_credentials,
http_method=HTTPMethod.DELETE,
)
# Handles any errors returned by the API
_unpack_contents(response)
get_project_runs
async
Get the status of the API-triggered runs of a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Project ID to get runs for. |
required |
hex_credentials |
HexCredentials |
Credentials to use for authentication with Hex. |
required |
limit |
Optional[prefect_hex.models.project.PageSize] |
Number of results to fetch per page for paginated requests. |
None |
offset |
Optional[prefect_hex.models.project.Offset] |
Offset for paginated requests. |
None |
status_filter |
Optional[prefect_hex.models.project.ProjectRunStatus] |
Current status of a project run. |
None |
Returns:
Type | Description |
---|---|
ProjectRunsResponsePayload |
Details of all the retrieved runs. |
Source code in prefect_hex/project.py
@task
async def get_project_runs(
project_id: str,
hex_credentials: HexCredentials,
limit: Optional[models.PageSize] = None,
offset: Optional[models.Offset] = None,
status_filter: Optional[models.ProjectRunStatus] = None,
) -> models.ProjectRunsResponsePayload: # pragma: no cover
"""
Get the status of the API-triggered runs of a project.
Args:
project_id:
Project ID to get runs for.
hex_credentials:
Credentials to use for authentication with Hex.
limit:
Number of results to fetch per page for paginated requests.
offset:
Offset for paginated requests.
status_filter:
Current status of a project run.
Returns:
Details of all the retrieved runs.
""" # noqa
endpoint = f"/project/{project_id}/runs" # noqa
params = {
"limit": limit,
"offset": offset,
"statusFilter": status_filter.value if status_filter is not None else None,
}
response = await execute_endpoint.fn(
endpoint,
hex_credentials,
http_method=HTTPMethod.GET,
params=params,
)
contents = _unpack_contents(response)
return models.ProjectRunsResponsePayload.parse_obj(contents)
get_run_status
async
Get the status of a project run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Project ID associated with the run to get the status of. |
required |
run_id |
str |
Run ID of the run to get the status of. |
required |
hex_credentials |
HexCredentials |
Credentials to use for authentication with Hex. |
required |
Returns:
Type | Description |
---|---|
ProjectStatusResponsePayload |
Information about the requested run. |
Source code in prefect_hex/project.py
@task
async def get_run_status(
project_id: str,
run_id: str,
hex_credentials: HexCredentials,
) -> models.ProjectStatusResponsePayload: # pragma: no cover
"""
Get the status of a project run.
Args:
project_id:
Project ID associated with the run to get the status of.
run_id:
Run ID of the run to get the status of.
hex_credentials:
Credentials to use for authentication with Hex.
Returns:
Information about the requested run.
""" # noqa
endpoint = f"/project/{project_id}/run/{run_id}" # noqa
response = await execute_endpoint.fn(
endpoint,
hex_credentials,
http_method=HTTPMethod.GET,
)
contents = _unpack_contents(response)
return models.ProjectStatusResponsePayload.parse_obj(contents)
run_project
async
Trigger a run of the latest published version of a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Project ID to run. |
required |
hex_credentials |
HexCredentials |
Credentials to use for authentication with Hex. |
required |
input_params |
Optional[Dict] |
Optional input parameter value map for this project run, e.g.
|
None |
dry_run |
bool |
If specified, perform a dry run without actually executing the project. |
False |
update_cache |
bool |
When true, this run will update the cached state of the published app with the latest run results. Additionally, any SQL cells that have caching enabled will be re-executed as part of this run. Note that this cannot be set to true if custom input parameters are provided. |
False |
Returns:
Type | Description |
---|---|
ProjectRunResponsePayload |
Information about the triggered project run. |
Source code in prefect_hex/project.py
@task
async def run_project(
project_id: str,
hex_credentials: HexCredentials,
input_params: Optional[Dict] = None,
dry_run: bool = False,
update_cache: bool = False,
) -> models.ProjectRunResponsePayload: # pragma: no cover
"""
Trigger a run of the latest published version of a project.
Args:
project_id:
Project ID to run.
hex_credentials:
Credentials to use for authentication with Hex.
input_params:
Optional input parameter value map for this project run, e.g.
```
{"text_input_1": "Hello World", "numeric_input_1": 123}
```
dry_run:
If specified, perform a dry run without actually executing the project.
update_cache:
When true, this run will update the cached state of the published app
with the latest run results. Additionally, any SQL cells
that have caching enabled will be re-executed as part of
this run. Note that this cannot be set to true if custom
input parameters are provided.
Returns:
Information about the triggered project run.
""" # noqa
endpoint = f"/project/{project_id}/run" # noqa
response = await execute_endpoint.fn(
endpoint,
hex_credentials,
http_method=HTTPMethod.POST,
json=models.RunProjectRequestBody(
dryRun=dry_run, inputParams=input_params, updateCache=update_cache
).dict(by_alias=True),
)
contents = _unpack_contents(response)
return models.ProjectRunResponsePayload.parse_obj(contents)
trigger_project_run_and_wait_for_completion
async
Flow that triggers a project run and waits for the triggered run to complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Project ID to run. |
required |
hex_credentials |
HexCredentials |
Credentials to use for authentication with Hex. |
required |
input_params |
Optional[Dict] |
Optional input parameter value map for this project run, e.g.
|
None |
update_cache |
bool |
When true, this run will update the cached state of the published app with the latest run results. Additionally, any SQL cells that have caching enabled will be re-executed as part of this run. Note that this cannot be set to true if custom input parameters are provided. |
False |
max_wait_seconds |
int |
Maximum number of seconds to wait for the entire flow to complete. |
900 |
poll_frequency_seconds |
int |
Number of seconds to wait in between checks for run completion. |
10 |
Returns:
Type | Description |
---|---|
ProjectRunResponsePayload |
Information about the triggered project run. |
Examples:
Trigger a Hex project run and wait for completion as a stand-alone flow.
import asyncio
from prefect_hex import HexCredentials
from prefect_hex.project import trigger_project_run_and_wait_for_completion
asyncio.run(
trigger_sync_run_and_wait_for_completion(
hex_credentials=HexCredentials(
token="1abc0d23-1234-1a2b-abc3-12ab456c7d8e"
),
project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
max_wait_seconds=1800,
poll_frequency_seconds=5,
)
)
Trigger a Hex project run and wait for completion as a subflow.
from prefect import flow
from prefect_hex import HexCredentials
from prefect_hex.project import trigger_project_run_and_wait_for_completion
@flow
def trigger_project_run_and_wait_for_completion_flow(project_id: str):
hex_credentials = HexCredentials.load("hex-token")
project_metadata = trigger_project_run_and_wait_for_completion(
project_id=project_id,
hex_credentials=hex_credentials
)
return project_metadata
trigger_project_run_and_wait_for_completion_flow(
project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3"
)
Source code in prefect_hex/project.py
@flow
async def trigger_project_run_and_wait_for_completion(
project_id: str,
hex_credentials: HexCredentials,
input_params: Optional[Dict] = None,
update_cache: bool = False,
max_wait_seconds: int = 900,
poll_frequency_seconds: int = 10,
) -> models.ProjectRunResponsePayload:
"""
Flow that triggers a project run and waits for the triggered run to complete.
Args:
project_id:
Project ID to run.
hex_credentials:
Credentials to use for authentication with Hex.
input_params:
Optional input parameter value map for this project run, e.g.
```
{"text_input_1": "Hello World", "numeric_input_1": 123}
```
update_cache:
When true, this run will update the cached state of the published app
with the latest run results. Additionally, any SQL cells
that have caching enabled will be re-executed as part of
this run. Note that this cannot be set to true if custom
input parameters are provided.
max_wait_seconds: Maximum number of seconds to wait for the entire
flow to complete.
poll_frequency_seconds: Number of seconds to wait in between checks for
run completion.
Returns:
Information about the triggered project run.
Examples:
Trigger a Hex project run and wait for completion as a stand-alone flow.
```python
import asyncio
from prefect_hex import HexCredentials
from prefect_hex.project import trigger_project_run_and_wait_for_completion
asyncio.run(
trigger_sync_run_and_wait_for_completion(
hex_credentials=HexCredentials(
token="1abc0d23-1234-1a2b-abc3-12ab456c7d8e"
),
project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
max_wait_seconds=1800,
poll_frequency_seconds=5,
)
)
```
Trigger a Hex project run and wait for completion as a subflow.
```python
from prefect import flow
from prefect_hex import HexCredentials
from prefect_hex.project import trigger_project_run_and_wait_for_completion
@flow
def trigger_project_run_and_wait_for_completion_flow(project_id: str):
hex_credentials = HexCredentials.load("hex-token")
project_metadata = trigger_project_run_and_wait_for_completion(
project_id=project_id,
hex_credentials=hex_credentials
)
return project_metadata
trigger_project_run_and_wait_for_completion_flow(
project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3"
)
```
"""
logger = get_run_logger()
project_run_future = await run_project.submit(
project_id=project_id,
hex_credentials=hex_credentials,
input_params=input_params,
update_cache=update_cache,
)
project_run = await project_run_future.result()
run_id = project_run.run_id
logger.info(
"Started project %s run %s; visit %s to view the run.",
repr(project_id),
repr(run_id),
str(project_run.run_status_url),
)
project_status, project_metadata = await wait_for_project_run_completion(
project_id=project_id,
run_id=run_id,
hex_credentials=hex_credentials,
max_wait_seconds=max_wait_seconds,
poll_frequency_seconds=poll_frequency_seconds,
)
if project_status == models.ProjectRunStatus.completed:
return project_metadata
else:
raise TERMINAL_STATUS_EXCEPTIONS.get(project_status, HexProjectRunError)(
f"Project {project_id!r} run {run_id!r} "
f"was unsuccessful with {project_status.value!r} status"
)
wait_for_project_run_completion
async
Flow that waits for the triggered project run to complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
str |
Project ID to watch. |
required |
run_id |
str |
Run ID to wait for. |
required |
hex_credentials |
HexCredentials |
Credentials to use for authentication with Hex. |
required |
max_wait_seconds |
int |
Maximum number of seconds to wait for the entire flow to complete. |
900 |
poll_frequency_seconds |
int |
Number of seconds to wait in between checks for run completion. |
10 |
Returns:
Type | Description |
---|---|
Tuple[prefect_hex.models.project.ProjectRunStatus, prefect_hex.models.project.ProjectStatusResponsePayload] |
The status of the project run and the metadata associated with the run. |
Examples:
Wait for completion of a project run as a subflow.
from prefect import flow
from prefect_hex import HexCredentials
from prefect_hex.project import wait_for_project_run_completion
@flow
def wait_for_project_run_completion_flow(project_id: str, run_id: str):
hex_credentials = HexCredentials.load("hex-token")
project_status, project_metadata = wait_for_project_run_completion(
project_id=project_id,
run_id=run_id,
hex_credentials=hex_credentials
)
return project_status, project_metadata
wait_for_project_run_completion_flow(
project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
run_id="654321c6-b67c-1234-1b2c-66e4ad07b9f3",
)
Source code in prefect_hex/project.py
@flow
async def wait_for_project_run_completion(
project_id: str,
run_id: str,
hex_credentials: HexCredentials,
max_wait_seconds: int = 900,
poll_frequency_seconds: int = 10,
) -> Tuple[models.ProjectRunStatus, models.ProjectStatusResponsePayload]:
"""
Flow that waits for the triggered project run to complete.
Args:
project_id:
Project ID to watch.
run_id:
Run ID to wait for.
hex_credentials:
Credentials to use for authentication with Hex.
max_wait_seconds: Maximum number of seconds to wait for the entire
flow to complete.
poll_frequency_seconds: Number of seconds to wait in between checks for
run completion.
Returns:
The status of the project run and the metadata associated with the run.
Examples:
Wait for completion of a project run as a subflow.
```python
from prefect import flow
from prefect_hex import HexCredentials
from prefect_hex.project import wait_for_project_run_completion
@flow
def wait_for_project_run_completion_flow(project_id: str, run_id: str):
hex_credentials = HexCredentials.load("hex-token")
project_status, project_metadata = wait_for_project_run_completion(
project_id=project_id,
run_id=run_id,
hex_credentials=hex_credentials
)
return project_status, project_metadata
wait_for_project_run_completion_flow(
project_id="012345c6-b67c-1234-1b2c-66e4ad07b9f3",
run_id="654321c6-b67c-1234-1b2c-66e4ad07b9f3",
)
```
"""
logger = get_run_logger()
seconds_waited_for_run_completion = 0
wait_for = []
while seconds_waited_for_run_completion <= max_wait_seconds:
project_future = await get_run_status.submit(
project_id=project_id,
run_id=run_id,
hex_credentials=hex_credentials,
wait_for=wait_for,
)
wait_for = [project_future]
project_metadata = await project_future.result()
project_status = project_metadata.status
if project_status in TERMINAL_STATUS_EXCEPTIONS.keys():
return project_status, project_metadata
logger.debug(
"Waiting on project %s run %s with sync status %s for %s seconds",
repr(project_id),
repr(run_id),
repr(project_status.value),
poll_frequency_seconds,
)
await asyncio.sleep(poll_frequency_seconds)
seconds_waited_for_run_completion += poll_frequency_seconds
raise HexProjectRunTimedOut(
f"Max wait time of {max_wait_seconds} seconds exceeded while waiting "
f"for project {project_id!r} run {run_id!r}"
)