Skip to content

prefect_airbyte.client

Client for interacting with Airbyte instance

Classes

AirbyteClient

Client class used to call API endpoints on an Airbyte server.

This client currently supports username/password authentication as set in auth.

For more info, see the Airbyte docs.

Attributes:

Name Type Description
airbyte_base_url str

Base API endpoint URL for Airbyte.

auth

Username and password for Airbyte API.

logger

A logger instance used by the client to log messages related to API calls.

timeout

The number of seconds to wait before an API call times out.

Source code in prefect_airbyte/client.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class AirbyteClient:
    """
    Client class used to call API endpoints on an Airbyte server.

    This client currently supports username/password authentication as set in `auth`.

    For more info, see the [Airbyte docs](https://docs.airbyte.io/api-documentation).

    Attributes:
        airbyte_base_url (str): Base API endpoint URL for Airbyte.
        auth: Username and password for Airbyte API.
        logger: A logger instance used by the client to log messages related to
            API calls.
        timeout: The number of seconds to wait before an API call times out.
    """

    def __init__(
        self,
        logger: logging.Logger,
        airbyte_base_url: str = "http://localhost:8000/api/v1",
        auth: Tuple[str, str] = ("airbyte", "password"),
        timeout: int = 5,
    ):
        # Lifecycle flags enforcing single use as an async context manager.
        self._closed = False
        self._started = False

        self.airbyte_base_url = airbyte_base_url
        self.auth = auth
        self.logger = logger
        self.timeout = timeout
        # One shared HTTP client is used by every API method on this instance.
        self._client = httpx.AsyncClient(
            base_url=self.airbyte_base_url, auth=self.auth, timeout=self.timeout
        )

    async def check_health_status(self, client: httpx.AsyncClient) -> bool:
        """
        Checks the health status of an Airbyte instance.

        Args:
            client: `httpx.AsyncClient` used to interact with the Airbyte API.

        Returns:
            True if the server reports itself as healthy.

        Raises:
            AirbyteServerNotHealthyException: If the health endpoint responds
                with an HTTP error status, or its payload reports an unhealthy
                (or missing) status.
        """
        get_connection_url = self.airbyte_base_url + "/health/"
        try:
            response = await client.get(get_connection_url)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            raise err.AirbyteServerNotHealthyException() from e

        # Decode the body once instead of re-parsing it for every lookup.
        health_payload = response.json()
        self.logger.debug("Health check response: %s", health_payload)

        # The payload is expected to carry either an `available` or a `db`
        # flag (presumably depending on the Airbyte version - confirm). A
        # missing flag is treated as unhealthy rather than raising KeyError.
        key = "available" if "available" in health_payload else "db"
        health_status = health_payload.get(key)
        if not health_status:
            raise err.AirbyteServerNotHealthyException(
                f"Airbyte Server health status: {health_status}"
            )
        return True

    async def export_configuration(
        self,
    ) -> bytes:
        """
        Triggers an export of Airbyte configuration.

        **Note**: As of Airbyte v0.40.7-alpha, this endpoint no longer exists.

        Returns:
            Gzipped Airbyte configuration data.

        Raises:
            AirbyteExportConfigurationFailed: If the export endpoint responds
                with any HTTP error status.
        """
        warn(
            "As of Airbyte v0.40.7-alpha, the Airbyte API no longer supports "
            "exporting configurations. See the Octavia CLI docs for more info.",
            DeprecationWarning,
            stacklevel=2,
        )

        get_connection_url = self.airbyte_base_url + "/deployment/export/"

        try:
            response = await self._client.post(get_connection_url)
            response.raise_for_status()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                self.logger.error(
                    "If you are using Airbyte v0.40.7-alpha, there is no longer "
                    "an API endpoint for exporting configurations."
                )
            # Raise for every error status so callers never get None back
            # instead of the promised bytes.
            raise err.AirbyteExportConfigurationFailed() from e

        self.logger.debug("Export configuration response: %s", response)
        return response.content

    async def get_connection_status(self, connection_id: str) -> str:
        """
        Gets the status of a defined Airbyte connection.

        Args:
            connection_id: ID of an existing Airbyte connection.

        Returns:
            The status of the defined Airbyte connection.

        Raises:
            ConnectionNotFoundException: If the server responds with HTTP 404.
            AirbyteServerNotHealthyException: If the server responds with any
                other HTTP error status.
        """
        get_connection_url = self.airbyte_base_url + "/connections/get/"

        try:
            response = await self._client.post(
                get_connection_url, json={"connectionId": connection_id}
            )

            response.raise_for_status()

            connection_status = response.json()["status"]
            return connection_status
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                raise err.ConnectionNotFoundException(
                    f"Connection {connection_id} not found, please double "
                    "check the connection_id."
                ) from e
            raise err.AirbyteServerNotHealthyException() from e

    async def trigger_manual_sync_connection(
        self, connection_id: str
    ) -> Tuple[str, str]:
        """
        Start a manual sync run for an Airbyte connection.

        Args:
            connection_id: Identifier of the connection whose sync should run.

        Returns:
            job_id: Identifier of the sync job the server created.
            created_at: The job's `createdAt` value as returned by the API.

        Raises:
            ConnectionNotFoundException: If the server answers with HTTP 404.
            AirbyteServerNotHealthyException: For any other HTTP error status.
        """
        sync_url = self.airbyte_base_url + "/connections/sync/"
        payload = {"connectionId": connection_id}

        try:
            response = await self._client.post(sync_url, json=payload)
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code != 404:
                raise err.AirbyteServerNotHealthyException() from exc
            raise err.ConnectionNotFoundException(
                f"Connection {connection_id} not found, please double "
                f"check the connection_id."
            ) from exc

        # Only the job's identifier and creation time are surfaced to callers.
        created_job = response.json()["job"]
        return created_job["id"], created_job["createdAt"]

    async def get_job_status(self, job_id: str) -> Tuple[str, int, int]:
        """
        Gets the status of an Airbyte connection sync job.

        **Note**: Deprecated in favor of `AirbyteClient.get_job_info`.

        Args:
            job_id: ID of the Airbyte job to check.

        Returns:
            job_status: The current status of the job.
            job_created_at: The job's `createdAt` value as returned by the API.
            job_updated_at: The job's `updatedAt` value as returned by the API.

        Raises:
            JobNotFoundException: If the server responds with HTTP 404.
            AirbyteServerNotHealthyException: If the server responds with any
                other HTTP error status.
        """
        warn(
            "`AirbyteClient.get_job_status` is deprecated and will be removed in "
            "a future release. If you are using this client method directly, please "
            "use the `AirbyteClient.get_job_info` method instead. If you are "
            "seeing this warning while using the `trigger_sync` task, please "
            "define an `AirbyteConnection` and use `run_connection_sync` instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        get_connection_url = self.airbyte_base_url + "/jobs/get/"
        try:
            response = await self._client.post(get_connection_url, json={"id": job_id})
            response.raise_for_status()

            job = response.json()["job"]
            job_status = job["status"]
            job_created_at = job["createdAt"]
            job_updated_at = job["updatedAt"]
            return job_status, job_created_at, job_updated_at
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                raise err.JobNotFoundException(f"Job {job_id} not found.") from e
            raise err.AirbyteServerNotHealthyException() from e

    async def get_job_info(self, job_id: str) -> Dict[str, Any]:
        """
        Fetch the complete API payload describing an Airbyte job.

        Args:
            job_id: Identifier of the Airbyte job to look up.

        Returns:
            The decoded JSON body returned by the `/jobs/get/` endpoint.

        Raises:
            JobNotFoundException: If the server answers with HTTP 404.
            AirbyteServerNotHealthyException: For any other HTTP error status.
        """
        jobs_get_url = self.airbyte_base_url + "/jobs/get/"
        try:
            response = await self._client.post(jobs_get_url, json={"id": job_id})
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 404:
                raise err.JobNotFoundException(f"Job {job_id} not found.") from exc
            raise err.AirbyteServerNotHealthyException() from exc

        return response.json()

    async def create_client(self) -> httpx.AsyncClient:
        """Convenience method returning the underlying `httpx.AsyncClient`.

        No new client is created: the same instance built in `__init__` is
        returned on every call, so anything done to it (e.g. closing it)
        affects every other method of this `AirbyteClient`.

        To be removed in favor of using the entire `AirbyteClient` class
        as a context manager.
        """
        warn(
            "Use of this method will be removed in a future release - "
            "please use the `AirbyteClient` class as a context manager.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._client

    async def __aenter__(self):
        """
        Context manager entry point.

        Verifies the server is healthy before handing back the client.

        Raises:
            RuntimeError: If the client was already started or closed.
            AirbyteServerNotHealthyException: If the health check fails; the
                underlying HTTP client is closed before the error propagates.
        """
        if self._closed:
            raise RuntimeError(
                "The client cannot be started again after it has been closed."
            )
        if self._started:
            raise RuntimeError("The client cannot be started more than once.")

        self._started = True

        try:
            await self.check_health_status(self._client)
        except BaseException:
            # `__aexit__` is not invoked when `__aenter__` raises, so release
            # the HTTP client here to avoid leaking its connections.
            self._closed = True
            await self._client.aclose()
            raise

        return self

    async def __aexit__(self, *exc):
        """
        Context manager exit point.

        Closes the underlying HTTP client. The exception details in `exc` are
        ignored and nothing truthy is returned, so any exception raised inside
        the `async with` body propagates.
        """
        self._closed = True
        # `aclose()` is the public, idempotent shutdown API; unlike calling the
        # dunder `__aexit__`, it is safe even if the client was never entered
        # or was already closed via `create_client()`.
        await self._client.aclose()

Functions

__aenter__ async

Context manager entry point.

Source code in prefect_airbyte/client.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
async def __aenter__(self):
    """
    Context manager entry point.

    Verifies the server is healthy before handing back the client.

    Raises:
        RuntimeError: If the client was already started or closed.
        AirbyteServerNotHealthyException: If the health check fails; the
            underlying HTTP client is closed before the error propagates.
    """
    if self._closed:
        raise RuntimeError(
            "The client cannot be started again after it has been closed."
        )
    if self._started:
        raise RuntimeError("The client cannot be started more than once.")

    self._started = True

    try:
        await self.check_health_status(self._client)
    except BaseException:
        # `__aexit__` is not invoked when `__aenter__` raises, so release
        # the HTTP client here to avoid leaking its connections.
        self._closed = True
        await self._client.aclose()
        raise

    return self
__aexit__ async

Context manager exit point.

Source code in prefect_airbyte/client.py
258
259
260
261
262
async def __aexit__(self, *exc):
    """
    Context manager exit point.

    Closes the underlying HTTP client. The exception details in `exc` are
    ignored and nothing truthy is returned, so any exception raised inside
    the `async with` body propagates.
    """
    self._closed = True
    # `aclose()` is the public, idempotent shutdown API; unlike calling the
    # dunder `__aexit__`, it is safe even if the client was never entered
    # or was already closed via `create_client()`.
    await self._client.aclose()
check_health_status async

Checks the health status of an Airbyte instance.

Parameters:

Name Type Description Default
client AsyncClient

httpx.AsyncClient used to interact with the Airbyte API.

required

Returns:

Type Description
bool

True if the server is healthy. If it is not, an AirbyteServerNotHealthyException is raised instead of returning False.

Source code in prefect_airbyte/client.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
async def check_health_status(self, client: httpx.AsyncClient) -> bool:
    """
    Checks the health status of an Airbyte instance.

    Args:
        client: `httpx.AsyncClient` used to interact with the Airbyte API.

    Returns:
        True if the server reports itself as healthy.

    Raises:
        AirbyteServerNotHealthyException: If the health endpoint responds
            with an HTTP error status, or its payload reports an unhealthy
            (or missing) status.
    """
    get_connection_url = self.airbyte_base_url + "/health/"
    try:
        response = await client.get(get_connection_url)
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        raise err.AirbyteServerNotHealthyException() from e

    # Decode the body once instead of re-parsing it for every lookup.
    health_payload = response.json()
    self.logger.debug("Health check response: %s", health_payload)

    # The payload is expected to carry either an `available` or a `db`
    # flag (presumably depending on the Airbyte version - confirm). A
    # missing flag is treated as unhealthy rather than raising KeyError.
    key = "available" if "available" in health_payload else "db"
    health_status = health_payload.get(key)
    if not health_status:
        raise err.AirbyteServerNotHealthyException(
            f"Airbyte Server health status: {health_status}"
        )
    return True
create_client async

Convenience method that returns the client's existing httpx.AsyncClient; no new client is created.

To be removed in favor of using the entire AirbyteClient class as a context manager.

Source code in prefect_airbyte/client.py
229
230
231
232
233
234
235
236
237
238
239
240
241
async def create_client(self) -> httpx.AsyncClient:
    """Convenience method returning the underlying `httpx.AsyncClient`.

    No new client is created: the same instance built in `__init__` is
    returned on every call, so anything done to it (e.g. closing it)
    affects every other method of this `AirbyteClient`.

    To be removed in favor of using the entire `AirbyteClient` class
    as a context manager.
    """
    warn(
        "Use of this method will be removed in a future release - "
        "please use the `AirbyteClient` class as a context manager.",
        DeprecationWarning,
        stacklevel=2,
    )
    return self._client
export_configuration async

Triggers an export of Airbyte configuration.

Note: As of Airbyte v0.40.7-alpha, this endpoint no longer exists.

Returns:

Type Description
bytes

Gzipped Airbyte configuration data.

Source code in prefect_airbyte/client.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
async def export_configuration(
    self,
) -> bytes:
    """
    Triggers an export of Airbyte configuration.

    **Note**: As of Airbyte v0.40.7-alpha, this endpoint no longer exists.

    Returns:
        Gzipped Airbyte configuration data.

    Raises:
        AirbyteExportConfigurationFailed: If the export endpoint responds
            with any HTTP error status.
    """
    warn(
        "As of Airbyte v0.40.7-alpha, the Airbyte API no longer supports "
        "exporting configurations. See the Octavia CLI docs for more info.",
        DeprecationWarning,
        stacklevel=2,
    )

    get_connection_url = self.airbyte_base_url + "/deployment/export/"

    try:
        response = await self._client.post(get_connection_url)
        response.raise_for_status()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            self.logger.error(
                "If you are using Airbyte v0.40.7-alpha, there is no longer "
                "an API endpoint for exporting configurations."
            )
        # Raise for every error status so callers never get None back
        # instead of the promised bytes.
        raise err.AirbyteExportConfigurationFailed() from e

    self.logger.debug("Export configuration response: %s", response)
    return response.content
get_connection_status async

Gets the status of a defined Airbyte connection.

Parameters:

Name Type Description Default
connection_id str

ID of an existing Airbyte connection.

required

Returns:

Type Description
str

The status of the defined Airbyte connection.

Source code in prefect_airbyte/client.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
async def get_connection_status(self, connection_id: str) -> str:
    """
    Gets the status of a defined Airbyte connection.

    Args:
        connection_id: ID of an existing Airbyte connection.

    Returns:
        The status of the defined Airbyte connection.

    Raises:
        ConnectionNotFoundException: If the server responds with HTTP 404.
        AirbyteServerNotHealthyException: If the server responds with any
            other HTTP error status.
    """
    get_connection_url = self.airbyte_base_url + "/connections/get/"

    try:
        response = await self._client.post(
            get_connection_url, json={"connectionId": connection_id}
        )

        response.raise_for_status()

        connection_status = response.json()["status"]
        return connection_status
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise err.ConnectionNotFoundException(
                f"Connection {connection_id} not found, please double "
                "check the connection_id."
            ) from e
        raise err.AirbyteServerNotHealthyException() from e
get_job_info async

Gets the full API response for a given Airbyte Job ID.

Parameters:

Name Type Description Default
job_id str

The ID of the Airbyte job to retrieve information on.

required

Returns:

Type Description
Dict[str, Any]

Dict of the full API response for the given job ID.

Source code in prefect_airbyte/client.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
async def get_job_info(self, job_id: str) -> Dict[str, Any]:
    """
    Fetch the complete API payload describing an Airbyte job.

    Args:
        job_id: Identifier of the Airbyte job to look up.

    Returns:
        The decoded JSON body returned by the `/jobs/get/` endpoint.

    Raises:
        JobNotFoundException: If the server answers with HTTP 404.
        AirbyteServerNotHealthyException: For any other HTTP error status.
    """
    jobs_get_url = self.airbyte_base_url + "/jobs/get/"
    try:
        response = await self._client.post(jobs_get_url, json={"id": job_id})
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 404:
            raise err.JobNotFoundException(f"Job {job_id} not found.") from exc
        raise err.AirbyteServerNotHealthyException() from exc

    return response.json()
get_job_status async

Gets the status of an Airbyte connection sync job.

Note: Deprecated in favor of AirbyteClient.get_job_info.

Parameters:

Name Type Description Default
job_id str

ID of the Airbyte job to check.

required

Returns:

Name Type Description
job_status str

The current status of the job.

job_created_at int

Datetime string of when the job was created.

job_updated_at int

Datetime string of when the job was last updated.

Source code in prefect_airbyte/client.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
async def get_job_status(self, job_id: str) -> Tuple[str, int, int]:
    """
    Gets the status of an Airbyte connection sync job.

    **Note**: Deprecated in favor of `AirbyteClient.get_job_info`.

    Args:
        job_id: ID of the Airbyte job to check.

    Returns:
        job_status: The current status of the job.
        job_created_at: The job's `createdAt` value as returned by the API.
        job_updated_at: The job's `updatedAt` value as returned by the API.

    Raises:
        JobNotFoundException: If the server responds with HTTP 404.
        AirbyteServerNotHealthyException: If the server responds with any
            other HTTP error status.
    """
    warn(
        "`AirbyteClient.get_job_status` is deprecated and will be removed in "
        "a future release. If you are using this client method directly, please "
        "use the `AirbyteClient.get_job_info` method instead. If you are "
        "seeing this warning while using the `trigger_sync` task, please "
        "define an `AirbyteConnection` and use `run_connection_sync` instead.",
        DeprecationWarning,
        stacklevel=2,
    )

    get_connection_url = self.airbyte_base_url + "/jobs/get/"
    try:
        response = await self._client.post(get_connection_url, json={"id": job_id})
        response.raise_for_status()

        job = response.json()["job"]
        job_status = job["status"]
        job_created_at = job["createdAt"]
        job_updated_at = job["updatedAt"]
        return job_status, job_created_at, job_updated_at
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            raise err.JobNotFoundException(f"Job {job_id} not found.") from e
        raise err.AirbyteServerNotHealthyException() from e
trigger_manual_sync_connection async

Triggers a manual sync of the connection.

Parameters:

Name Type Description Default
connection_id str

ID of connection to sync.

required

Returns:

Name Type Description
job_id str

ID of the job that was triggered.

created_at str

Datetime string of when the job was created.

Source code in prefect_airbyte/client.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
async def trigger_manual_sync_connection(
    self, connection_id: str
) -> Tuple[str, str]:
    """
    Start a manual sync run for an Airbyte connection.

    Args:
        connection_id: Identifier of the connection whose sync should run.

    Returns:
        job_id: Identifier of the sync job the server created.
        created_at: The job's `createdAt` value as returned by the API.

    Raises:
        ConnectionNotFoundException: If the server answers with HTTP 404.
        AirbyteServerNotHealthyException: For any other HTTP error status.
    """
    sync_url = self.airbyte_base_url + "/connections/sync/"
    payload = {"connectionId": connection_id}

    try:
        response = await self._client.post(sync_url, json=payload)
        response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code != 404:
            raise err.AirbyteServerNotHealthyException() from exc
        raise err.ConnectionNotFoundException(
            f"Connection {connection_id} not found, please double "
            f"check the connection_id."
        ) from exc

    # Only the job's identifier and creation time are surfaced to callers.
    created_job = response.json()["job"]
    return created_job["id"], created_job["createdAt"]