Skip to content

prefect_hightouch.syncs

flows

This is a module containing flows used for interacting with syncs.

trigger_sync_run_and_wait_for_completion async

Flow that triggers a sync run and waits for the triggered run to complete.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
sync_id str

Sync ID used in formatting the endpoint URL.

required
full_resync bool

Whether to resync all the rows in the query (i.e. ignoring previously synced rows).

False
max_wait_seconds int

Maximum number of seconds to wait for the entire flow to complete.

900
poll_frequency_seconds int

Number of seconds to wait in between checks for run completion.

10

Returns:

Type Description
Sync
  • id: str
  • slug: str
  • workspace_id: str
  • created_at: str
  • updated_at: str
  • destination_id: str
  • model_id: str
  • configuration: Dict
  • schedule: Dict
  • status: "models.SyncStatus"
  • disabled: bool
  • last_run_at: str
  • referenced_columns: List[str]
  • primary_key: str

Examples:

Trigger a Hightouch sync run and wait for completion as a stand alone flow.

import asyncio

from prefect_hightouch import HightouchCredentials
from prefect_hightouch.syncs import trigger_sync_run_and_wait_for_completion

asyncio.run(
    trigger_sync_run_and_wait_for_completion(
        hightouch_credentials=HightouchCredentials(
            token="1abc0d23-1234-1a2b-abc3-12ab456c7d8e"
        ),
        sync_id=12345,
        full_resync=True,
        max_wait_seconds=1800,
        poll_frequency_seconds=5,
    )
)

Trigger a Hightouch sync run and wait for completion as a subflow.

from prefect import flow

from prefect_hightouch import HightouchCredentials
from prefect_hightouch.syncs import trigger_sync_run_and_wait_for_completion

@flow
def sync_flow():
    hightouch_credentials = HightouchCredentials.load("hightouch-token")
    sync_metadata = trigger_sync_run_and_wait_for_completion(
        hightouch_credentials=hightouch_credentials,
        sync_id=12345,
        full_resync=True,
        max_wait_seconds=1800,
        poll_frequency_seconds=10,
    )
    return sync_metadata

sync_flow()

Source code in prefect_hightouch/syncs/flows.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@flow
async def trigger_sync_run_and_wait_for_completion(
    hightouch_credentials: HightouchCredentials,
    sync_id: str,
    full_resync: bool = False,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
) -> api_models.sync.Sync:
    """
    Flow that triggers a sync run and waits for the triggered run to complete.

    The flow submits the `trigger_run` task, logs where the run can be viewed,
    then delegates polling to the `wait_for_sync_run_completion` flow.

    Args:
        hightouch_credentials: Credentials to use for authentication with Hightouch.
        sync_id: Sync ID used in formatting the endpoint URL.
        full_resync: Whether to resync all the rows in the query
            (i.e. ignoring previously synced rows).
        max_wait_seconds: Maximum number of seconds to wait for the entire
            flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Returns:
        - `id`: `str`<br>
            - `slug`: `str`<br>
            - `workspace_id`: `str`<br>
            - `created_at`: `str`<br>
            - `updated_at`: `str`<br>
            - `destination_id`: `str`<br>
            - `model_id`: `str`<br>
            - `configuration`: `Dict`<br>
            - `schedule`: `Dict`<br>
            - `status`: `"models.SyncStatus"`<br>
            - `disabled`: `bool`<br>
            - `last_run_at`: `str`<br>
            - `referenced_columns`: `List[str]`<br>
            - `primary_key`: `str`<br>

    Raises:
        HightouchSyncRunError: If the sync finishes with a status other than
            `SUCCESS`; when `TERMINAL_STATUS_EXCEPTIONS` has an entry for the
            status, that exception type is raised instead.

    Examples:
        Trigger a Hightouch sync run and wait for completion as a stand alone flow.
        ```python
        import asyncio

        from prefect_hightouch import HightouchCredentials
        from prefect_hightouch.syncs import trigger_sync_run_and_wait_for_completion

        asyncio.run(
            trigger_sync_run_and_wait_for_completion(
                hightouch_credentials=HightouchCredentials(
                    token="1abc0d23-1234-1a2b-abc3-12ab456c7d8e"
                ),
                sync_id="12345",
                full_resync=True,
                max_wait_seconds=1800,
                poll_frequency_seconds=5,
            )
        )
        ```

        Trigger a Hightouch sync run and wait for completion as a subflow.
        ```python
        from prefect import flow

        from prefect_hightouch import HightouchCredentials
        from prefect_hightouch.syncs import trigger_sync_run_and_wait_for_completion

        @flow
        def sync_flow():
            hightouch_credentials = HightouchCredentials.load("hightouch-token")
            sync_metadata = trigger_sync_run_and_wait_for_completion(
                hightouch_credentials=hightouch_credentials,
                sync_id="12345",
                full_resync=True,
                max_wait_seconds=1800,
                poll_frequency_seconds=10,
            )
            return sync_metadata

        sync_flow()
        ```
    """
    logger = get_run_logger()

    json_body = api_models.trigger_run_input.TriggerRunInput(full_resync=full_resync)
    sync_run_future = await trigger_run.submit(
        hightouch_credentials=hightouch_credentials,
        sync_id=sync_id,
        json_body=json_body,
    )
    sync_run = await sync_run_future.result()
    # Sync runs are browsed under `/syncs/<sync_id>/runs/<run_id>` in the
    # Hightouch app; the path is logged separately from the base URL because
    # the workspace segment in between is not known here.
    logger.info(
        "Started sync %s run %s; open %s and append %s to view results on webpage.",
        repr(sync_id),
        repr(sync_run.id),
        "https://app.hightouch.com/",
        f"/syncs/{sync_id}/runs/{sync_run.id}",
    )

    # NOTE(review): the wait flow is keyed on the sync ID only, not on
    # `sync_run.id`; confirm the reported status always reflects the run
    # that was just triggered.
    sync_status, sync_metadata = await wait_for_sync_run_completion(
        hightouch_credentials=hightouch_credentials,
        sync_id=sync_id,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=poll_frequency_seconds,
    )

    if sync_status == api_models.sync.SyncStatus.SUCCESS:
        return sync_metadata

    # Any other terminal status is an error; fall back to the generic
    # exception when the status has no dedicated mapping.
    exception_type = TERMINAL_STATUS_EXCEPTIONS.get(sync_status, HightouchSyncRunError)
    raise exception_type(
        f"Sync ({sync_metadata.slug!r}, ID {sync_id!r}) "
        f"was unsuccessful with {sync_status.value!r} status"
    )

wait_for_sync_run_completion async

Flow that waits for the triggered sync run to complete.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
sync_id str

Sync ID used in formatting the endpoint URL.

required
max_wait_seconds int

Maximum number of seconds to wait for the entire flow to complete.

900
poll_frequency_seconds int

Number of seconds to wait in between checks for run completion.

10

Returns:

Type Description
SyncStatus
  • value
Sync
  • id: str
  • slug: str
  • workspace_id: str
  • created_at: str
  • updated_at: str
  • destination_id: str
  • model_id: str
  • configuration: Dict
  • schedule: Dict
  • status: "api_models.SyncStatus"
  • disabled: bool
  • last_run_at: str
  • referenced_columns: List[str]
  • primary_key: str

Examples:

Wait for completion as a subflow.

from prefect import flow

from prefect_hightouch import HightouchCredentials
from prefect_hightouch.syncs import wait_for_sync_run_completion

@flow
def wait_flow():
    hightouch_credentials = HightouchCredentials.load("hightouch-token")
    sync_status, sync_metadata = wait_for_sync_run_completion(
        hightouch_credentials=hightouch_credentials,
        sync_id=12345,
        max_wait_seconds=1800,
        poll_frequency_seconds=20,
    )
    return sync_metadata

wait_flow()

Source code in prefect_hightouch/syncs/flows.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
@flow
async def wait_for_sync_run_completion(
    hightouch_credentials: HightouchCredentials,
    sync_id: str,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
) -> Tuple[api_models.sync.SyncStatus, api_models.sync.Sync]:
    """
    Flow that waits for the triggered sync run to complete.

    The sync is fetched with the `get_sync` task until its status is one of
    the keys of `TERMINAL_STATUS_EXCEPTIONS`. Elapsed time is tracked as the
    sum of the sleep intervals between polls, so time spent inside the
    `get_sync` calls themselves is not counted against `max_wait_seconds`.

    Args:
        hightouch_credentials: Credentials to use for authentication with Hightouch.
        sync_id: Sync ID used in formatting the endpoint URL.
        max_wait_seconds: Maximum number of seconds to wait for the
            entire flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Returns:
        - `value`
        - `id`: `str`<br>
            - `slug`: `str`<br>
            - `workspace_id`: `str`<br>
            - `created_at`: `str`<br>
            - `updated_at`: `str`<br>
            - `destination_id`: `str`<br>
            - `model_id`: `str`<br>
            - `configuration`: `Dict`<br>
            - `schedule`: `Dict`<br>
            - `status`: `"api_models.SyncStatus"`<br>
            - `disabled`: `bool`<br>
            - `last_run_at`: `str`<br>
            - `referenced_columns`: `List[str]`<br>
            - `primary_key`: `str`<br>

    Raises:
        HightouchSyncRunTimedOut: If no terminal status is observed before the
            accumulated wait exceeds `max_wait_seconds`.

    Examples:
        Wait for completion as a subflow.
        ```python
        from prefect import flow

        from prefect_hightouch import HightouchCredentials
        from prefect_hightouch.syncs import wait_for_sync_run_completion

        @flow
        def wait_flow():
            hightouch_credentials = HightouchCredentials.load("hightouch-token")
            sync_status, sync_metadata = wait_for_sync_run_completion(
                hightouch_credentials=hightouch_credentials,
                sync_id="12345",
                max_wait_seconds=1800,
                poll_frequency_seconds=20,
            )
            return sync_metadata

        wait_flow()
        ```
    """
    logger = get_run_logger()
    seconds_waited_for_run_completion = 0
    wait_for = []
    # Bound up front so the timeout message below can still be built when the
    # loop body never runs (e.g. a negative `max_wait_seconds`).
    sync_slug = None

    while seconds_waited_for_run_completion <= max_wait_seconds:
        sync_future = await get_sync.submit(
            hightouch_credentials=hightouch_credentials,
            sync_id=sync_id,
            wait_for=wait_for,
        )
        # Chain each poll on the previous one so the task runs stay ordered.
        wait_for = [sync_future]

        sync_metadata = await sync_future.result()
        sync_slug = sync_metadata.slug
        sync_status = sync_metadata.status
        if sync_status in TERMINAL_STATUS_EXCEPTIONS:
            return sync_status, sync_metadata

        logger.info(
            "Waiting on sync (%s, ID %s) with sync status %s for %s seconds",
            repr(sync_slug),
            repr(sync_id),
            repr(sync_status.value),
            poll_frequency_seconds,
        )
        await asyncio.sleep(poll_frequency_seconds)
        seconds_waited_for_run_completion += poll_frequency_seconds

    raise HightouchSyncRunTimedOut(
        f"Max wait time of {max_wait_seconds} seconds exceeded while waiting "
        f"for sync ({sync_slug!r}, ID {sync_id!r})"
    )

generated

This is a module containing tasks, auto-generated from the Hightouch REST schema, used for interacting with syncs.

get_sync async

Retrieve sync from sync ID.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
sync_id float

Sync ID used in formatting the endpoint URL.

required

Returns:

Type Description
Sync
  • id: str
  • slug: str
  • workspace_id: str
  • created_at: str
  • updated_at: str
  • destination_id: str
  • model_id: str
  • configuration: Dict
  • schedule: Dict
  • status: "models.SyncStatus"
  • disabled: bool
  • last_run_at: str
  • referenced_columns: List[str]
  • primary_key: str

API Endpoint:

/syncs/{sync_id}

API Responses:

Response Description
200 Ok.
401 Unauthorized.
404 Not found.
Source code in prefect_hightouch/syncs/generated.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# The function below is a documented placeholder: its body is `...` and, per
# the trailing comment, only the decorated function gets run. The parameters
# listed in the docstring are accepted through `*args, **kwargs`.
# NOTE(review): presumably `_update_kwargs_and_execute` forwards them to
# `_get_sync_endpoint` -- confirm against the decorator's definition.
@task
@_update_kwargs_and_execute(_get_sync_endpoint)
async def get_sync(*args, **kwargs) -> api_models.sync.Sync:
    """
    Retrieve sync from sync ID.

    Args:
        hightouch_credentials (HightouchCredentials):
            Credentials to use for authentication with Hightouch.
        sync_id (float):
            Sync ID used in formatting the endpoint URL.

    Returns:
        - `id`: `str`<br>
            - `slug`: `str`<br>
            - `workspace_id`: `str`<br>
            - `created_at`: `str`<br>
            - `updated_at`: `str`<br>
            - `destination_id`: `str`<br>
            - `model_id`: `str`<br>
            - `configuration`: `Dict`<br>
            - `schedule`: `Dict`<br>
            - `status`: `"models.SyncStatus"`<br>
            - `disabled`: `bool`<br>
            - `last_run_at`: `str`<br>
            - `referenced_columns`: `List[str]`<br>
            - `primary_key`: `str`<br>

    <h4>API Endpoint:</h4>
    `/syncs/{sync_id}`

    <h4>API Responses:</h4>
    | Response | Description |
    | --- | --- |
    | 200 | Ok. |
    | 401 | Unauthorized. |
    | 404 | Not found. |
    """  # noqa
    ...  # pragma: no cover because only the decorated function gets run

list_sync async

List all the syncs in the current workspace.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
slug Optional[str]

Filter based on slug.

required
model_id Optional[float]

Filter based on modelId.

required
after Optional[datetime.datetime]

Select syncs that were run after given time.

required
before Optional[datetime.datetime]

Select syncs that were run before given time.

required
limit Optional[float]

Limit the number of objects returned. Default is 100.

required
order_by Optional[models.list_sync_order_by.ListSyncOrderBy]

Specify the order.

required

Returns:

Type Description
List[Sync]
  • data: List

API Endpoint:

/syncs

API Responses:

Response Description
200 Ok.
400 Bad request.
401 Unauthorized.
422 Validation Failed.
Source code in prefect_hightouch/syncs/generated.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# The function below is a documented placeholder: its body is `...` and, per
# the trailing comment, only the decorated function gets run. The parameters
# listed in the docstring are accepted through `*args, **kwargs`.
# NOTE(review): presumably `_update_kwargs_and_execute` forwards them to
# `_list_sync_endpoint` -- confirm against the decorator's definition.
@task
@_update_kwargs_and_execute(_list_sync_endpoint)
async def list_sync(*args, **kwargs) -> typing.List[api_models.sync.Sync]:
    """
    List all the syncs in the current workspace.

    Args:
        hightouch_credentials (HightouchCredentials):
            Credentials to use for authentication with Hightouch.
        slug (Optional[str]):
            Filter based on slug.
        model_id (Optional[float]):
            Filter based on modelId.
        after (Optional[datetime.datetime]):
            Select syncs that were run after given time.
        before (Optional[datetime.datetime]):
            Select syncs that were run before given time.
        limit (Optional[float]):
            Limit the number of objects returned. Default is 100.
        order_by (Optional[models.list_sync_order_by.ListSyncOrderBy]):
            Specify the order.

    Returns:
        - `data`: `List`<br>

    <h4>API Endpoint:</h4>
    `/syncs`

    <h4>API Responses:</h4>
    | Response | Description |
    | --- | --- |
    | 200 | Ok. |
    | 400 | Bad request. |
    | 401 | Unauthorized. |
    | 422 | Validation Failed. |
    """  # noqa
    ...  # pragma: no cover because only the decorated function gets run

list_sync_runs async

List all sync runs under a sync.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
sync_id float

Sync ID used in formatting the endpoint URL.

required
run_id Optional[float]

Query for specific run id.

required
limit Optional[float]

Limit the number of objects returned. Default is 5.

required
offset Optional[float]

Offset into the results (for pagination).

required
after Optional[datetime.datetime]

Select sync runs that are started after given timestamp.

required
before Optional[datetime.datetime]

Select sync runs that are started before certain timestamp.

required
within Optional[float]

Select sync runs that are started within last given minutes.

required
order_by Optional[models.list_sync_runs_order_by.ListSyncRunsOrderBy]

Specify the order.

required

Returns:

Type Description
List[SyncRun]
  • data: List

API Endpoint:

/syncs/{sync_id}/runs

API Responses:

Response Description
200 Ok.
400 Bad request.
401 Unauthorized.
422 Validation Failed.
Source code in prefect_hightouch/syncs/generated.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# The function below is a documented placeholder: its body is `...` and, per
# the trailing comment, only the decorated function gets run. The parameters
# listed in the docstring are accepted through `*args, **kwargs`.
# NOTE(review): presumably `_update_kwargs_and_execute` forwards them to
# `_list_sync_runs_endpoint` -- confirm against the decorator's definition.
@task
@_update_kwargs_and_execute(_list_sync_runs_endpoint)
async def list_sync_runs(*args, **kwargs) -> typing.List[api_models.sync_run.SyncRun]:
    """
    List all sync runs under a sync.

    Args:
        hightouch_credentials (HightouchCredentials):
            Credentials to use for authentication with Hightouch.
        sync_id (float):
            Sync ID used in formatting the endpoint URL.
        run_id (Optional[float]):
            Query for specific run id.
        limit (Optional[float]):
            Limit the number of objects returned. Default is 5.
        offset (Optional[float]):
            Offset into the results (for pagination).
        after (Optional[datetime.datetime]):
            Select sync runs that are started after given timestamp.
        before (Optional[datetime.datetime]):
            Select sync runs that are started before certain timestamp.
        within (Optional[float]):
            Select sync runs that are started within last given minutes.
        order_by (Optional[models.list_sync_runs_order_by.ListSyncRunsOrderBy]):
            Specify the order.

    Returns:
        - `data`: `List`<br>

    <h4>API Endpoint:</h4>
    `/syncs/{sync_id}/runs`

    <h4>API Responses:</h4>
    | Response | Description |
    | --- | --- |
    | 200 | Ok. |
    | 400 | Bad request. |
    | 401 | Unauthorized. |
    | 422 | Validation Failed. |
    """  # noqa
    ...  # pragma: no cover because only the decorated function gets run

trigger_run async

Trigger a new run for the given sync. If a run is already in progress, this queues a sync run that will get executed immediately after the current run completes.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
sync_id str

Sync ID used in formatting the endpoint URL.

required
json_body TriggerRunInput

The input of a trigger action to run syncs.

required

Returns:

Type Description
TriggerRunOutput
  • id: str

API Endpoint:

/syncs/{sync_id}/trigger

API Responses:

Response Description
200 Ok.
400 Bad request.
401 Unauthorized.
422 Validation Failed.
Source code in prefect_hightouch/syncs/generated.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# The function below is a documented placeholder: its body is `...` and, per
# the trailing comment, only the decorated function gets run. The parameters
# listed in the docstring are accepted through `*args, **kwargs`.
# NOTE(review): presumably `_update_kwargs_and_execute` forwards them to
# `_trigger_run_endpoint` -- confirm against the decorator's definition.
@task
@_update_kwargs_and_execute(_trigger_run_endpoint)
async def trigger_run(
    *args, **kwargs
) -> api_models.trigger_run_output.TriggerRunOutput:
    """
    Trigger a new run for the given sync.  If a run is already in progress, this
    queues a sync run that will get executed immediately after the current run
    completes.

    Args:
        hightouch_credentials (HightouchCredentials):
            Credentials to use for authentication with Hightouch.
        sync_id (str):
            Sync ID used in formatting the endpoint URL.
        json_body (models.trigger_run_input.TriggerRunInput):
            The input of a trigger action to run syncs.

    Returns:
        - `id`: `str`<br>

    <h4>API Endpoint:</h4>
    `/syncs/{sync_id}/trigger`

    <h4>API Responses:</h4>
    | Response | Description |
    | --- | --- |
    | 200 | Ok. |
    | 400 | Bad request. |
    | 401 | Unauthorized. |
    | 422 | Validation Failed. |
    """  # noqa
    ...  # pragma: no cover because only the decorated function gets run

trigger_run_custom async

Trigger a new run globally based on sync ID or sync slug. If a run is already in progress, this queues a sync run that will get executed immediately after the current run completes.

Parameters:

Name Type Description Default
hightouch_credentials HightouchCredentials

Credentials to use for authentication with Hightouch.

required
json_body TriggerRunCustomInput

The input of a trigger action to run syncs based on sync ID, slug or other filters.

required

Returns:

Type Description
TriggerRunOutput
  • id: str
  • message: str
  • details: Dict

API Endpoint:

/syncs/trigger

API Responses:

Response Description
200 Ok.
400 Bad request.
401 Unauthorized.
422 Validation Failed.
Source code in prefect_hightouch/syncs/generated.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
# The function below is a documented placeholder: its body is `...` and, per
# the trailing comment, only the decorated function gets run. The parameters
# listed in the docstring are accepted through `*args, **kwargs`.
# NOTE(review): presumably `_update_kwargs_and_execute` forwards them to
# `_trigger_run_custom_endpoint` -- confirm against the decorator's definition.
@task
@_update_kwargs_and_execute(_trigger_run_custom_endpoint)
async def trigger_run_custom(
    *args, **kwargs
) -> api_models.trigger_run_output.TriggerRunOutput:
    """
    Trigger a new run globally based on sync ID or sync slug.  If a run is
    already in progress, this queues a sync run that will get executed
    immediately after the current run completes.

    Args:
        hightouch_credentials (HightouchCredentials):
            Credentials to use for authentication with Hightouch.
        json_body (models.trigger_run_custom_input.TriggerRunCustomInput):
            The input of a trigger action to run syncs based on sync ID, slug or
            other filters.

    Returns:
        - `id`: `str`<br>
            - `message`: `str`<br>
            - `details`: `Dict`<br>

    <h4>API Endpoint:</h4>
    `/syncs/trigger`

    <h4>API Responses:</h4>
    | Response | Description |
    | --- | --- |
    | 200 | Ok. |
    | 400 | Bad request. |
    | 401 | Unauthorized. |
    | 422 | Validation Failed. |
    """  # noqa
    ...  # pragma: no cover because only the decorated function gets run