Skip to content

prefect_firebolt.database

Module for interacting with Firebolt databases

Classes

FireboltDatabase

Bases: Block

Connects to a Firebolt database.

Provide either engine_name or engine_url. Providing both will result in an error. If neither engine_name nor engine_url is provided, the default engine for the configured database will be used.

Attributes:

Name Type Description
credentials FireboltCredentials

Credentials to use to connect to the Firebase database.

database_name FireboltCredentials

The name of the database to connect to.

engine_name Optional[str]

Name of the engine to connect to.

engine_url Optional[str]

The engine endpoint to use.

additional_parameters Optional[Dict]

Additional configuration to pass to the Firebolt connection.

Source code in prefect_firebolt/database.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class FireboltDatabase(Block):
    """
    Connects to a Firebolt database.

    Provide either `engine_name` or `engine_url`. Providing both will result in an
    error. If neither `engine_name` nor `engine_url` is provided, the default engine
    for the configured database will be used.

    Attributes:
        credentials: Credentials to use to connect to the Firebase database.
        database_name: The name of the database to connect to.
        engine_name: Name of the engine to connect to.
        engine_url: The engine endpoint to use.
        additional_parameters: Additional configuration to pass to the Firebolt
            connection.
    """

    _block_type_name = "Firebolt Database"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/df83dc3c5789050a0b6b146a3780e5982df693f7-300x300.png"  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-firebolt/database/#prefect_firebolt.database.FireboltDatabase"  # noqa
    _description = "Connects to a Firebolt database."

    credentials: FireboltCredentials
    database: str = Field(
        default=...,
        title="Database Name",
        description="The name of the database to connect to.",
    )
    engine_name: Optional[str] = Field(
        default=None,
        description="Name of the engine to connect to. May not be used with "
        "engine_url. If neither engine_name nor engine_url is provided, the "
        "default engine for the configured database will be used.",
    )
    engine_url: Optional[str] = Field(
        default=None,
        title="Engine URL",
        description="The engine endpoint to use. May not be used with engine_name. "
        "If neither engine_name nor engine_url is provided, the "
        "default engine for the configured database will be used.",
    )
    additional_parameters: Optional[Dict] = Field(
        default_factory=dict,
        description="Additional configuration to pass to the Firebolt connection.",
    )

    @root_validator(pre=True)
    def _not_both_engine_name_and_engine_url(cls, values):
        """Ensures that engine_name and engine_url are not both provided"""
        if (
            values.get("engine_name") is not None
            and values.get("engine_url") is not None
        ):
            raise ValueError(
                "You have provided a value for both engine_name and engine_url. "
                "Please provide either engine_name or engine_url, but not both."
            )
        return values

    def _get_connect_params(self) -> Dict[str, Any]:
        """Assemble the parameters to pass to the Firebolt connection"""
        return {
            "database": self.database,
            "engine_name": self.engine_name,
            "engine_url": self.engine_url,
            "additional_parameters": self.additional_parameters,
        }

    @sync_compatible
    async def get_connection(self) -> Connection:
        """
        Creates and returns an authenticated Firebolt connection for the
        configured database.
        """
        return await self.credentials.get_client(**self._get_connect_params())

Functions

get_connection async

Creates and returns an authenticated Firebolt connection for the configured database.

Source code in prefect_firebolt/database.py
89
90
91
92
93
94
95
@sync_compatible
async def get_connection(self) -> Connection:
    """
    Creates and returns an authenticated Firebolt connection for the
    configured database.
    """
    return await self.credentials.get_client(**self._get_connect_params())

Functions

query_firebolt async

Executes a query against a Firebolt database.

Parameters:

Name Type Description Default
database FireboltDatabase

Firebolt database configuration to use for query execution.

required
query str

SQL query to execute.

required
parameters Optional[Sequence[ParameterType]]

A sequence of substitution parameters. Used to replace ? placeholders inside a query with actual values.

None

Returns:

Type Description
List[List[ColType]]

All rows retrieved by the query.

Example

Execute a query against a Firebolt database:

from prefect import flow

from prefect_firebolt import FireboltCredentials, FireboltDatabase, query_firebolt


@flow
def run_firebolt_query():
    firebolt_database_block = FireboltDatabase(
        database="travel",
        credentials=FireboltCredentials(
            username="arthur.dent@hitchhikers.com", password="dont42panic"
        ),
    )

    results = query_firebolt(
        database=firebolt_database_block,
        query="SELECT * FROM ex_intergalactic_trips LIMIT 100",
    )

    return results


run_firebolt_query()

Source code in prefect_firebolt/database.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@task
async def query_firebolt(
    database: FireboltDatabase,
    query: str,
    parameters: "Optional[Sequence[ParameterType]]" = None,
) -> "List[List[ColType]]":
    """
    Executes a query against a Firebolt database.

    Args:
        database: Firebolt database configuration to use for query execution.
        query: SQL query to execute.
        parameters: A sequence of substitution parameters. Used to replace `?`
            placeholders inside a query with actual values.

    Returns:
        All rows retrieved by the query.

    Example:
        Execute a query against a Firebolt database:
        ```python
        from prefect import flow

        from prefect_firebolt import FireboltCredentials, FireboltDatabase, query_firebolt


        @flow
        def run_firebolt_query():
            firebolt_database_block = FireboltDatabase(
                database="travel",
                credentials=FireboltCredentials(
                    username="arthur.dent@hitchhikers.com", password="dont42panic"
                ),
            )

            results = query_firebolt(
                database=firebolt_database_block,
                query="SELECT * FROM ex_intergalactic_trips LIMIT 100",
            )

            return results


        run_firebolt_query()
        ```
    """  # noqa
    async with await database.get_connection() as connection:
        cursor = connection.cursor()
        with cursor:
            await cursor.execute(query=query, parameters=parameters)
            return await cursor.fetchall()