Skip to content

prefect_census.client

Module containing client for interacting with the Census API

Classes

CensusClient

Client for interacting with the Census API.

Attributes:

Name Type Description
api_key str

API key to authenticate with the Census API.

Source code in prefect_census/client.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class CensusClient:
    """
    Client for interacting with the Census API.

    Attributes:
        api_key (str): API key to authenticate with the Census API.
    """

    def __init__(self, api_key: str):
        self._closed = False
        self._started = False

        self.client = AsyncClient(
            base_url="https://app.getcensus.com/api/v1",
            headers={
                "Authorization": f"Bearer {api_key}",
                "user-agent": f"prefect-{prefect.__version__}",
            },
        )

    @sync_compatible
    async def call_endpoint(
        self,
        http_method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Response:
        """
        Call an endpoint in the Census API.

        Args:
            http_method: HTTP method to call on the endpoint.
            path: The partial path for request (e.g. /syncs/42). Will be
                appended onto the base URL as determined by the client configuration.
            params: Query parameters to include in the request.
            json: JSON serializable body to send in the request.

        Returns:
            The response from the Census API.

        Example:
            ```python
            from prefect import flow
            from prefect_census import CensusCredentials
            from prefect_census.client import CensusClient

            @flow
            def my_flow(sync_id):
                creds_block = CensusCredentials(api_key="my_api_key")

                client = CensusClient(
                    api_key=creds_block.api_key.get_secret_value()
                )
                response = client.call_endpoint(
                    http_method="GET",
                    path=f"/syncs/{sync_id}"
                )
                return response

            my_flow(42)
            ```
        """

        response = await self.client.request(
            method=http_method, url=path, params=params, json=json
        )
        response.raise_for_status()
        return response

    @sync_compatible
    async def get_run_info(self, run_id: int) -> Response:
        """
        Sends a request to the [get sync id info endpoint](https://docs.getcensus.com/basics/api/syncs#get-syncs-id)

        Args:
            run_id: The ID of the sync run to get details for.

        Returns:
            The response from the Census API.
        """  # noqa
        return await self.call_endpoint(
            http_method="GET",
            path=f"/sync_runs/{run_id}",
        )

    @sync_compatible
    async def trigger_sync_run(
        self, sync_id: int, force_full_sync: bool = False
    ) -> Response:
        """
        Sends a request to the [trigger sync run endpoint](https://docs.getcensus.com/basics/api/sync-runs)
        to initiate a sync run.

        Args:
            sync_id: The ID of the sync to trigger.
            force_full_sync: If the sync should perform a full sync.

        Returns:
            The response from the Census API.
        """  # noqa
        return await self.call_endpoint(
            http_method="POST",
            path=f"/syncs/{sync_id}/trigger",
            params={"force_full_sync": force_full_sync},
        )

    async def __aenter__(self):
        """Async context manager entry method."""
        if self._closed:
            raise RuntimeError(
                "The client cannot be started again after it has been closed."
            )
        if self._started:
            raise RuntimeError("The client cannot be started more than once.")

        self._started = True
        return self

    async def __aexit__(self, *exc):
        """Async context manager exit method."""
        self._closed = True
        await self.client.__aexit__()

Functions

__aenter__ async

Async context manager entry method.

Source code in prefect_census/client.py
116
117
118
119
120
121
122
123
124
125
126
async def __aenter__(self):
    """Async context manager entry method."""
    if self._closed:
        raise RuntimeError(
            "The client cannot be started again after it has been closed."
        )
    if self._started:
        raise RuntimeError("The client cannot be started more than once.")

    self._started = True
    return self
__aexit__ async

Async context manager exit method.

Source code in prefect_census/client.py
128
129
130
131
async def __aexit__(self, *exc):
    """Async context manager exit method."""
    self._closed = True
    await self.client.__aexit__()
call_endpoint async

Call an endpoint in the Census API.

Parameters:

Name Type Description Default
http_method str

HTTP method to call on the endpoint.

required
path str

The partial path for request (e.g. /syncs/42). Will be appended onto the base URL as determined by the client configuration.

required
params Optional[Dict[str, Any]]

Query parameters to include in the request.

None
json Optional[Dict[str, Any]]

JSON serializable body to send in the request.

None

Returns:

Type Description
Response

The response from the Census API.

Example
from prefect import flow
from prefect_census import CensusCredentials
from prefect_census.client import CensusClient

@flow
def my_flow(sync_id):
    creds_block = CensusCredentials(api_key="my_api_key")

    client = CensusClient(
        api_key=creds_block.api_key.get_secret_value()
    )
    response = client.call_endpoint(
        http_method="GET",
        path=f"/syncs/{sync_id}"
    )
    return response

my_flow(42)
Source code in prefect_census/client.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@sync_compatible
async def call_endpoint(
    self,
    http_method: str,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,
) -> Response:
    """
    Call an endpoint in the Census API.

    Args:
        http_method: HTTP method to call on the endpoint.
        path: The partial path for request (e.g. /syncs/42). Will be
            appended onto the base URL as determined by the client configuration.
        params: Query parameters to include in the request.
        json: JSON serializable body to send in the request.

    Returns:
        The response from the Census API.

    Example:
        ```python
        from prefect import flow
        from prefect_census import CensusCredentials
        from prefect_census.client import CensusClient

        @flow
        def my_flow(sync_id):
            creds_block = CensusCredentials(api_key="my_api_key")

            client = CensusClient(
                api_key=creds_block.api_key.get_secret_value()
            )
            response = client.call_endpoint(
                http_method="GET",
                path=f"/syncs/{sync_id}"
            )
            return response

        my_flow(42)
        ```
    """

    response = await self.client.request(
        method=http_method, url=path, params=params, json=json
    )
    response.raise_for_status()
    return response
get_run_info async

Sends a request to the get sync id info endpoint

Parameters:

Name Type Description Default
run_id int

The ID of the sync run to get details for.

required

Returns:

Type Description
Response

The response from the Census API.

Source code in prefect_census/client.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@sync_compatible
async def get_run_info(self, run_id: int) -> Response:
    """
    Sends a request to the [get sync id info endpoint](https://docs.getcensus.com/basics/api/syncs#get-syncs-id)

    Args:
        run_id: The ID of the sync run to get details for.

    Returns:
        The response from the Census API.
    """  # noqa
    return await self.call_endpoint(
        http_method="GET",
        path=f"/sync_runs/{run_id}",
    )
trigger_sync_run async

Sends a request to the trigger sync run endpoint to initiate a sync run.

Parameters:

Name Type Description Default
sync_id int

The ID of the sync to trigger.

required
force_full_sync bool

If the sync should perform a full sync.

False

Returns:

Type Description
Response

The response from the Census API.

Source code in prefect_census/client.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@sync_compatible
async def trigger_sync_run(
    self, sync_id: int, force_full_sync: bool = False
) -> Response:
    """
    Sends a request to the [trigger sync run endpoint](https://docs.getcensus.com/basics/api/sync-runs)
    to initiate a sync run.

    Args:
        sync_id: The ID of the sync to trigger.
        force_full_sync: If the sync should perform a full sync.

    Returns:
        The response from the Census API.
    """  # noqa
    return await self.call_endpoint(
        http_method="POST",
        path=f"/syncs/{sync_id}/trigger",
        params={"force_full_sync": force_full_sync},
    )