Skip to content

prefect_airbyte.server

A module for defining OSS Airbyte interactions with Prefect.

Classes

AirbyteServer

Bases: Block

A block representing an Airbyte server for generating AirbyteClient instances.

Attributes:

Name Type Description
username str

Username for Airbyte API.

password SecretStr

Password for Airbyte API.

server_host str

Hostname for Airbyte API.

server_port int

Port for Airbyte API.

api_version str

Version of Airbyte API to use.

use_ssl bool

Whether to use a secure url for calls to the Airbyte API.

Example
from prefect_airbyte.server import AirbyteServer

server = AirbyteServer.load("BLOCK_NAME")
Source code in prefect_airbyte/server.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class AirbyteServer(Block):
    """A block representing an Airbyte server for generating `AirbyteClient` instances.

    Attributes:
        username: Username for Airbyte API.
        password: Password for Airbyte API.
        server_host: Hostname for Airbyte API.
        server_port: Port for Airbyte API.
        api_version: Version of Airbyte API to use.
        use_ssl: Whether to use a secure url for calls to the Airbyte API.

    Example:
        ```python
        from prefect_airbyte.server import AirbyteServer

        server = AirbyteServer.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Airbyte Server"
    _block_type_slug = "airbyte-server"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/7f50097d1915fe75b0ee84c951c742a83d3c53cb-250x250.png"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-airbyte/server/#prefect_airbyte.server.AirbyteServer"  # noqa

    username: str = Field(
        default="airbyte",
        description="Username to authenticate with Airbyte API.",
    )

    password: SecretStr = Field(
        default=SecretStr("password"),
        description="Password to authenticate with Airbyte API.",
    )

    server_host: str = Field(
        default="localhost",
        description="Host address of Airbyte server.",
        example="127.0.0.1",
    )

    server_port: int = Field(
        default=8000,
        description="Port number of Airbyte server.",
    )

    api_version: str = Field(
        default="v1",
        description="Airbyte API version to use.",
        title="API Version",
    )

    use_ssl: bool = Field(
        default=False,
        description="Whether to use SSL when connecting to Airbyte server.",
        title="Use SSL",
    )

    @property
    def base_url(self) -> str:
        """Property containing the base URL for the Airbyte API."""
        protocol = "https" if self.use_ssl else "http"
        return (
            f"{protocol}://{self.server_host}:{self.server_port}/api/{self.api_version}"
        )

    def get_client(self, logger: Logger, timeout: int = 10) -> AirbyteClient:
        """Returns an `AirbyteClient` instance for interacting with the Airbyte API.

        Args:
            logger: Logger instance used to log messages related to API calls.
            timeout: The number of seconds to wait before an API call times out.

        Returns:
            An `AirbyteClient` instance.
        """
        return AirbyteClient(
            logger=logger,
            airbyte_base_url=self.base_url,
            auth=(self.username, self.password.get_secret_value()),
            timeout=timeout,
        )

Attributes

base_url: str property

Property containing the base URL for the Airbyte API.

Functions

get_client

Returns an AirbyteClient instance for interacting with the Airbyte API.

Parameters:

Name Type Description Default
logger Logger

Logger instance used to log messages related to API calls.

required
timeout int

The number of seconds to wait before an API call times out.

10

Returns:

Type Description
AirbyteClient

An AirbyteClient instance.

Source code in prefect_airbyte/server.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def get_client(self, logger: Logger, timeout: int = 10) -> AirbyteClient:
    """Returns an `AirbyteClient` instance for interacting with the Airbyte API.

    Args:
        logger: Logger instance used to log messages related to API calls.
        timeout: The number of seconds to wait before an API call times out.

    Returns:
        An `AirbyteClient` instance.
    """
    return AirbyteClient(
        logger=logger,
        airbyte_base_url=self.base_url,
        auth=(self.username, self.password.get_secret_value()),
        timeout=timeout,
    )