Skip to content

prefect_census.runs

Module containing tasks and flows for interacting with Census sync runs

Classes

CensusGetSyncRunInfoFailed

Bases: RuntimeError

Raised when retrieving information about a Census sync run fails.

Source code in prefect_census/runs.py
25
26
class CensusGetSyncRunInfoFailed(RuntimeError):
    """Raised when retrieving information about a Census sync run fails."""

CensusSyncRunCancelled

Bases: Exception

Raised when a triggered sync run is cancelled

Source code in prefect_census/runs.py
29
30
class CensusSyncRunCancelled(Exception):
    """Signals that a triggered Census sync run ended up cancelled."""

CensusSyncRunFailed

Bases: RuntimeError

Raised when a Census sync run fails.

Source code in prefect_census/runs.py
14
15
class CensusSyncRunFailed(RuntimeError):
    """
    Raised when a Census sync run fails.

    Failures to *retrieve* sync run information are reported separately via
    `CensusGetSyncRunInfoFailed`.
    """

CensusSyncRunStatus

Bases: Enum

Census sync statuses.

Source code in prefect_census/runs.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class CensusSyncRunStatus(Enum):
    """Statuses a Census sync run can report."""

    CANCELLED = "cancelled"
    WORKING = "working"
    FAILED = "failed"
    COMPLETED = "completed"
    SKIPPED = "skipped"
    QUEUED = "queued"

    @classmethod
    def is_terminal_status_code(cls, status_code: str) -> bool:
        """
        Tell whether a raw status string marks the end of a sync run.

        Args:
            status_code: The raw status string of a sync run.

        Returns:
            True for "cancelled", "failed", "completed" and "skipped";
            False for anything else (including "working" and "queued").
        """
        # Built inside the method: a tuple assigned in an Enum class body
        # would itself become an enum member.
        finished = (cls.CANCELLED, cls.FAILED, cls.COMPLETED, cls.SKIPPED)
        return status_code in tuple(member.value for member in finished)

Functions

is_terminal_status_code classmethod

Returns True if a status code is terminal for a sync run. Returns False otherwise.

Source code in prefect_census/runs.py
43
44
45
46
47
48
49
50
51
52
53
54
@classmethod
def is_terminal_status_code(cls, status_code: str) -> bool:
    """
    Tell whether a raw status string marks the end of a sync run.

    Args:
        status_code: The raw status string of a sync run.

    Returns:
        True for "cancelled", "failed", "completed" and "skipped";
        False for anything else.
    """
    finished = (cls.CANCELLED, cls.FAILED, cls.COMPLETED, cls.SKIPPED)
    return status_code in tuple(member.value for member in finished)

CensusSyncRunTimeout

Bases: RuntimeError

Raised when a triggered sync run does not complete within the configured maximum wait time (max_wait_seconds).

Source code in prefect_census/runs.py
18
19
20
21
22
class CensusSyncRunTimeout(RuntimeError):
    """
    Raised when a triggered sync run does not complete within the configured
    maximum wait time (`max_wait_seconds`).
    """

Functions

get_census_sync_run_info async

A task to retrieve information about a Census sync run.

Parameters:

Name Type Description Default
credentials CensusCredentials

Credentials for authenticating with Census.

required
run_id int

The ID of the sync run to retrieve.

required

Returns:

Type Description
Dict[str, Any]

The run data returned by the Census API as dict with the following shape:

{
    "id": 94,
    "sync_id": 52,
    "source_record_count": 1,
    "records_processed": 1,
    "records_updated": 1,
    "records_failed": 0,
    "records_invalid": 0,
    "created_at": "2021-10-20T02:51:07.546Z",
    "updated_at": "2021-10-20T02:52:29.236Z",
    "completed_at": "2021-10-20T02:52:29.234Z",
    "scheduled_execution_time": null,
    "error_code": null,
    "error_message": null,
    "error_detail": null,
    "status": "completed",
    "canceled": false,
    "full_sync": true,
    "sync_trigger_reason": {
        "ui_tag": "Manual",
        "ui_detail": "Manually triggered by test@getcensus.com"
    }
}

Example

Get Census sync run info:

import asyncio

from prefect import flow

from prefect_census import CensusCredentials
from prefect_census.runs import get_census_sync_run_info

# `get_census_sync_run_info` is an async task, so it is awaited from an
# async flow, which is then driven by `asyncio.run`.
@flow
async def get_sync_run_info_flow():
    credentials = CensusCredentials(api_key="my_api_key")

    return await get_census_sync_run_info(
        credentials=credentials,
        run_id=42
    )

asyncio.run(get_sync_run_info_flow())

Source code in prefect_census/runs.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@task(
    name="Get Census sync run details",
    description="Retrieves details of the Census sync run with the given run_id.",
    retries=3,
    retry_delay_seconds=10,
)
async def get_census_sync_run_info(
    credentials: CensusCredentials, run_id: int
) -> Dict[str, Any]:
    """
    A task to retrieve information about a Census sync run.

    Args:
        credentials: Credentials for authenticating with Census.
        run_id: The ID of the sync run to retrieve.

    Raises:
        CensusGetSyncRunInfoFailed: When the Census client call raises an
            `HTTPStatusError`; the original error is chained as the cause.

    Returns:
        The run data returned by the Census API as dict with the following shape:
            ```
            {
                "id": 94,
                "sync_id": 52,
                "source_record_count": 1,
                "records_processed": 1,
                "records_updated": 1,
                "records_failed": 0,
                "records_invalid": 0,
                "created_at": "2021-10-20T02:51:07.546Z",
                "updated_at": "2021-10-20T02:52:29.236Z",
                "completed_at": "2021-10-20T02:52:29.234Z",
                "scheduled_execution_time": null,
                "error_code": null,
                "error_message": null,
                "error_detail": null,
                "status": "completed",
                "canceled": false,
                "full_sync": true,
                "sync_trigger_reason": {
                    "ui_tag": "Manual",
                    "ui_detail": "Manually triggered by test@getcensus.com"
                }
            }
            ```


    Example:
        Get Census sync run info:
        ```python
        import asyncio

        from prefect import flow

        from prefect_census import CensusCredentials
        from prefect_census.runs import get_census_sync_run_info

        @flow
        async def get_sync_run_info_flow():
            credentials = CensusCredentials(api_key="my_api_key")

            return await get_census_sync_run_info(
                credentials=credentials,
                run_id=42
            )

        asyncio.run(get_sync_run_info_flow())
        ```
    """  # noqa
    try:
        async with credentials.get_client() as client:
            response = await client.get_run_info(run_id)
    except HTTPStatusError as e:
        # Re-raise as a module-specific error carrying a user-readable message,
        # keeping the HTTP error chained for debugging.
        raise CensusGetSyncRunInfoFailed(extract_user_message(e)) from e

    # The run payload is nested under the "data" key of the JSON response.
    return response.json()["data"]

wait_census_sync_completion async

Wait for the given Census sync run to finish running.

Parameters:

Name Type Description Default
run_id int

The ID of the sync run to wait for.

required
credentials CensusCredentials

Credentials for authenticating with Census.

required
max_wait_seconds int

Maximum number of seconds to wait for sync to complete.

60
poll_frequency_seconds int

Number of seconds to wait in between checks for run completion.

5

Raises:

Type Description
CensusSyncRunTimeout

When the elapsed wait time exceeds max_wait_seconds.

Returns:

Name Type Description
run_status CensusSyncRunStatus

An enum representing the final Census sync run status.

run_data Dict[str, Any]

A dictionary containing information about the run after completion in the following shape:

{
    "id": 94,
    "sync_id": 52,
    "source_record_count": 1,
    "records_processed": 1,
    "records_updated": 1,
    "records_failed": 0,
    "records_invalid": 0,
    "created_at": "2021-10-20T02:51:07.546Z",
    "updated_at": "2021-10-20T02:52:29.236Z",
    "completed_at": "2021-10-20T02:52:29.234Z",
    "scheduled_execution_time": null,
    "error_code": null,
    "error_message": null,
    "error_detail": null,
    "status": "completed",
    "canceled": false,
    "full_sync": true,
    "sync_trigger_reason": {
        "ui_tag": "Manual",
        "ui_detail": "Manually triggered by test@getcensus.com"
    }
}

Source code in prefect_census/runs.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@flow(
    name="Wait for Census sync run",
    description="Waits for the Census sync run to finish running.",
)
async def wait_census_sync_completion(
    run_id: int,
    credentials: CensusCredentials,
    max_wait_seconds: int = 60,
    poll_frequency_seconds: int = 5,
) -> Tuple[CensusSyncRunStatus, Dict[str, Any]]:
    """
    Wait for the given Census sync run to finish running.

    Args:
        run_id: The ID of the sync run to wait for.
        credentials: Credentials for authenticating with Census.
        max_wait_seconds: Maximum number of seconds to wait for sync to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Raises:
        CensusSyncRunTimeout: When the elapsed wait time exceeds `max_wait_seconds`.
            Elapsed time is tracked as the sum of the sleeps between polls;
            time spent in the API calls themselves is not counted.

    Returns:
        run_status: An enum representing the final Census sync run status.
        run_data: A dictionary containing information about the run after completion
            in the following shape:
            ```
            {
                "id": 94,
                "sync_id": 52,
                "source_record_count": 1,
                "records_processed": 1,
                "records_updated": 1,
                "records_failed": 0,
                "records_invalid": 0,
                "created_at": "2021-10-20T02:51:07.546Z",
                "updated_at": "2021-10-20T02:52:29.236Z",
                "completed_at": "2021-10-20T02:52:29.234Z",
                "scheduled_execution_time": null,
                "error_code": null,
                "error_message": null,
                "error_detail": null,
                "status": "completed",
                "canceled": false,
                "full_sync": true,
                "sync_trigger_reason": {
                    "ui_tag": "Manual",
                    "ui_detail": "Manually triggered by test@getcensus.com"
                }
            }
            ```

    """
    logger = get_run_logger()
    seconds_waited_for_run_completion = 0
    wait_for = []
    while seconds_waited_for_run_completion <= max_wait_seconds:
        run_data_future = await get_census_sync_run_info.submit(
            credentials=credentials,
            run_id=run_id,
            wait_for=wait_for,
        )
        run_data = await run_data_future.result()
        run_status = run_data.get("status")

        if CensusSyncRunStatus.is_terminal_status_code(run_status):
            return CensusSyncRunStatus(run_status), run_data

        # If the next poll would already be past the deadline, stop now instead
        # of sleeping one more interval only to raise the timeout afterwards.
        next_poll_at = seconds_waited_for_run_completion + poll_frequency_seconds
        if next_poll_at > max_wait_seconds:
            break

        # A missing or unrecognised status must not crash the wait loop just
        # because it cannot be mapped to an enum name for logging.
        try:
            status_label = CensusSyncRunStatus(run_status).name
        except ValueError:
            status_label = run_status

        # Chain the next poll after this one so the task runs stay ordered.
        wait_for = [run_data_future]
        logger.info(
            "Census sync run with ID %i has status %s. Waiting for %i seconds.",
            run_id,
            status_label,
            poll_frequency_seconds,
        )
        await asyncio.sleep(poll_frequency_seconds)
        seconds_waited_for_run_completion += poll_frequency_seconds

    raise CensusSyncRunTimeout(
        f"Max wait time of {max_wait_seconds} seconds exceeded while waiting "
        f"for sync run with ID {run_id}"
    )