prefect_airbyte.configuration

Tasks for updating and fetching Airbyte configurations

Functions

export_configuration async

Prefect Task that exports an Airbyte configuration via {airbyte_server_host}/api/v1/deployment/export.

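As of prefect-airbyte==0.1.3, the kwargs airbyte_server_host and airbyte_server_port can be replaced by passing an airbyte_server block instance to generate the AirbyteClient. Using the airbyte_server block is preferred: if it is omitted, the task emits a DeprecationWarning, and if it is passed, airbyte_server_host, airbyte_server_port, and airbyte_api_version are ignored. The individual kwargs remain for backwards compatibility.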
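The precedence between the block and the legacy kwargs can be sketched without Prefect or Airbyte installed. In the minimal sketch below, `ServerSettings` and `resolve_server` are hypothetical stand-ins for the AirbyteServer block and the task's argument handling, not the library's API. The fallback values `localhost`, `8000`, and `v1` are the ones the task's source applies to missing legacy kwargs; using them as the stand-in's defaults is an assumption.

```python
import warnings
from dataclasses import dataclass
from typing import Optional


@dataclass
class ServerSettings:
    """Hypothetical stand-in for the AirbyteServer block (not the real class)."""

    server_host: str = "localhost"  # assumed default
    server_port: int = 8000  # assumed default
    api_version: str = "v1"  # assumed default


def resolve_server(
    airbyte_server: Optional[ServerSettings] = None,
    airbyte_server_host: Optional[str] = None,
    airbyte_server_port: Optional[int] = None,
    airbyte_api_version: Optional[str] = None,
) -> ServerSettings:
    """Pick the settings to use: the block wins, legacy kwargs are a fallback."""
    if airbyte_server is not None:
        # Legacy kwargs are ignored whenever a block is supplied.
        return airbyte_server

    # No block: warn, then build settings from whichever kwargs were given.
    warnings.warn(
        "Pass an `airbyte_server` block instead of individual kwargs.",
        DeprecationWarning,
        stacklevel=2,
    )
    return ServerSettings(
        server_host=airbyte_server_host or "localhost",
        server_port=airbyte_server_port or 8000,
        api_version=airbyte_api_version or "v1",
    )


# Preferred style: the block is returned as-is and the extra kwarg is ignored.
block = ServerSettings(server_host="airbyte.internal", server_port=8001)
preferred = resolve_server(airbyte_server=block, airbyte_server_host="ignored")

# Legacy style: still resolves, but emits a DeprecationWarning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy = resolve_server(airbyte_server_host="airbyte.internal")
```

Keeping the block as the single source of connection settings means the host, port, and API version are configured once and reused across tasks, which is presumably why the kwargs are being phased out.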

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| airbyte_server | Optional[AirbyteServer] | An AirbyteServer block for generating an AirbyteClient. | None |
| airbyte_server_host | Optional[str] | Airbyte server host to connect to. | None |
| airbyte_server_port | Optional[int] | Airbyte server port to connect to. | None |
| airbyte_api_version | Optional[str] | Airbyte API version to use. | None |
| timeout | int | Timeout in seconds on the httpx.AsyncClient. | 5 |

Returns:

| Type | Description |
| --- | --- |
| bytes | Bytes containing the Airbyte configuration. |

Examples:

Flow that writes the Airbyte configuration as a gzip file to a filepath:

```python
import gzip

from prefect import flow, task
from prefect_airbyte.configuration import export_configuration
from prefect_airbyte.server import AirbyteServer

@task
def zip_and_write_somewhere(
    airbyte_configuration: bytes,
    somewhere: str = "my_destination.gz",
):
    # Compress the exported configuration and write it to `somewhere`.
    with gzip.open(somewhere, "wb") as f:
        f.write(airbyte_configuration)

@flow
def example_export_configuration_flow():

    # Run other tasks and subflows here

    # Export the configuration using a previously saved AirbyteServer block.
    airbyte_config = export_configuration(
        airbyte_server=AirbyteServer.load("oss-airbyte")
    )

    zip_and_write_somewhere(airbyte_configuration=airbyte_config)

example_export_configuration_flow()
```
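To check that a file written this way can be read back, a round trip through gzip can be sketched with the standard library alone. The payload below is placeholder bytes, not a real Airbyte export, and `write_gzip` and `read_gzip` are hypothetical helper names.

```python
import gzip
import tempfile
from pathlib import Path


def write_gzip(payload: bytes, destination: Path) -> None:
    """Compress `payload` and write it to `destination`."""
    with gzip.open(destination, "wb") as f:
        f.write(payload)


def read_gzip(source: Path) -> bytes:
    """Read `source` and return the decompressed bytes."""
    with gzip.open(source, "rb") as f:
        return f.read()


# Placeholder bytes standing in for the task's return value.
payload = b"placeholder airbyte configuration"

with tempfile.TemporaryDirectory() as tmp_dir:
    destination = Path(tmp_dir) / "my_destination.gz"
    write_gzip(payload, destination)
    restored = read_gzip(destination)
    # Gzip files start with the two magic bytes 0x1f 0x8b.
    magic = destination.read_bytes()[:2]
```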
Source code in prefect_airbyte/configuration.py
@task
async def export_configuration(
    airbyte_server: Optional[AirbyteServer] = None,
    airbyte_server_host: Optional[str] = None,
    airbyte_server_port: Optional[int] = None,
    airbyte_api_version: Optional[str] = None,
    timeout: int = 5,
) -> bytes:

    """
    Prefect Task that exports an Airbyte configuration via
    `{airbyte_server_host}/api/v1/deployment/export`.

    As of `prefect-airbyte==0.1.3`, the kwargs `airbyte_server_host` and
    `airbyte_server_port` can be replaced by passing an `airbyte_server` block
    instance to generate the `AirbyteClient`. Using the `airbyte_server` block is
    preferred, but the individual kwargs remain for backwards compatibility.

    Args:
        airbyte_server: An `AirbyteServer` block for generating an `AirbyteClient`.
        airbyte_server_host: Airbyte server host to connect to.
        airbyte_server_port: Airbyte server port to connect to.
        airbyte_api_version: Airbyte API version to use.
        timeout: Timeout in seconds on the `httpx.AsyncClient`.

    Returns:
        Bytes containing Airbyte configuration

    Examples:

        Flow that writes the Airbyte configuration as a gzip to a filepath:

        ```python
        import gzip

        from prefect import flow, task
        from prefect_airbyte.configuration import export_configuration
        from prefect_airbyte.server import AirbyteServer

        @task
        def zip_and_write_somewhere(
            airbyte_configuration: bytes,
            somewhere: str = "my_destination.gz",
        ):
            # Compress the exported configuration and write it to `somewhere`.
            with gzip.open(somewhere, "wb") as f:
                f.write(airbyte_configuration)

        @flow
        def example_export_configuration_flow():

            # Run other tasks and subflows here

            # Export the configuration using a saved AirbyteServer block.
            airbyte_config = export_configuration(
                airbyte_server=AirbyteServer.load("oss-airbyte")
            )

            zip_and_write_somewhere(airbyte_configuration=airbyte_config)

        example_export_configuration_flow()
        ```
    """
    logger = get_run_logger()

    if not airbyte_server:
        warn(
            "The use of `airbyte_server_host`, `airbyte_server_port`, and "
            "`airbyte_api_version` is deprecated and will be removed in a "
            "future release. Please pass an `airbyte_server` block to this "
            "task instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]):
            airbyte_server = AirbyteServer(
                server_host=airbyte_server_host or "localhost",
                server_port=airbyte_server_port or 8000,
                api_version=airbyte_api_version or "v1",
            )
        else:
            airbyte_server = AirbyteServer()
    else:
        if any([airbyte_server_host, airbyte_server_port, airbyte_api_version]):
            logger.info(
                "Ignoring `airbyte_server_host`, `airbyte_api_version`, "
                "and `airbyte_server_port` because `airbyte_server` block "
                "was passed. Using API URL from `airbyte_server` block: "
                f"{airbyte_server.base_url!r}."
            )

    async with airbyte_server.get_client(
        logger=logger, timeout=timeout
    ) as airbyte_client:

        logger.info("Initiating export of Airbyte configuration")

        return await airbyte_client.export_configuration()
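
The last lines of the task follow a common asyncio pattern: open a client as an async context manager, await one request, and return its result while the context manager closes the client. A minimal sketch of that pattern follows, using a hypothetical `FakeClient` in place of the real AirbyteClient so that it runs without a server. The class, its `closed` attribute, and `export_with` are illustrative names, not part of prefect-airbyte.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for AirbyteClient; makes no network calls."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeClient":
        # A real client would open its HTTP session here.
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs even when the body returns early or raises.
        self.closed = True

    async def export_configuration(self) -> bytes:
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return b"placeholder export"


async def export_with(client: FakeClient) -> bytes:
    # Returning from inside `async with` still triggers __aexit__.
    async with client as opened:
        return await opened.export_configuration()


client = FakeClient()
exported = asyncio.run(export_with(client))
```

Returning from inside the `async with` block keeps the function short while still guaranteeing cleanup, since `__aexit__` runs before the caller receives the bytes.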