Skip to content

prefect_duckdb.database

Module for querying against DuckDB databases.

Classes

DuckDBConnector

Bases: DatabaseBlock

A block for connecting to a DuckDB database.

Parameters:

Name Type Description Default
configuration

DuckDBConfig block to be used when creating connection.

required
database

The name of the default database to use.

required
read_only

Whether the connection should be read-only.

required

Examples:

Load a stored DuckDB connector:

from prefect_duckdb.database import DuckDBConnector

duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

Insert data into database and fetch results.

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    conn.execute_many(
        "INSERT INTO customers (name, address) VALUES ($name, $address);",
        parameters=[
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Space"},
            {"name": "Me", "address": "Myway 88"},
        ],
    )
    results = conn.fetch_all(
        "SELECT * FROM customers WHERE address = $address",
        parameters={"address": "Space"}
    )
    print(results)

Source code in prefect_duckdb/database.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
class DuckDBConnector(DatabaseBlock):
    """
    A block for connecting to a DuckDB database.

    Args:
        configuration: Configuration options (a dict) passed to `duckdb.connect`
            as `config` when creating the connection.
        database: The name of the default database to use.
        read_only: Whether the connection should be read-only.

    Examples:
        Load a stored DuckDB connector:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        duckdb_connector = DuckDBConnector.load("BLOCK_NAME")
        ```

        Insert data into database and fetch results.
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
            )
            conn.execute_many(
                "INSERT INTO customers (name, address) VALUES ($name, $address);",
                parameters=[
                    {"name": "Ford", "address": "Highway 42"},
                    {"name": "Unknown", "address": "Space"},
                    {"name": "Me", "address": "Myway 88"},
                ],
            )
            results = conn.fetch_all(
                "SELECT * FROM customers WHERE address = $address",
                parameters={"address": "Space"}
            )
            print(results)
        ```
    """

    _block_type_name = "DuckDB Connector"
    _logo_url = "https://duckdb.org/images/logo-dl/DuckDB_Logo.png"  # noqa
    _documentation_url = "https://placeholder.com"  # noqa
    _description = "Perform data operations against a DuckDb database."

    configuration: Optional[dict] = Field(
        default=None, description="Configuration to be used when creating connection."
    )
    database: str = Field(
        default=":memory:", description="The name of the default database to use."
    )
    read_only: bool = Field(
        default=False,
        description="Whether the connection should be read-only.",
    )
    _connection: Optional[DuckDBPyConnection] = None
    _debug: bool = False

    def get_connection(
        self, read_only: Optional[bool] = None, config: Optional[dict] = None
    ) -> DuckDBPyConnection:
        """
        Returns the shared DuckDB connection, creating it on first use.

        If a `motherduck_token` environment variable is set and the config does
        not already contain a `motherduck_token` entry, the token is added to the
        config passed to `duckdb.connect`.

        Once a connection exists it is returned as-is, and `read_only`/`config`
        are ignored.

        Args:
            read_only: Whether the connection should be read-only. Falls back to
                the block's `read_only` field when not given.
            config: Configuration to be used when creating connection. Falls back
                to the block's `configuration` field when not given.

        Returns:
            A `DuckDBPyConnection` object.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

            with duckdb_connector as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING);")
                ...
            ```
        """
        if self._connection is not None:
            return self._connection

        # Copy so that injecting the MotherDuck token never mutates the caller's
        # dict or the block's stored `configuration`.
        config = dict(config or self.configuration or {})
        # Check for None explicitly: an explicit `read_only=False` must win over
        # the block default (`False or True` would silently become True).
        if read_only is None:
            read_only = self.read_only

        token = os.environ.get("motherduck_token")
        if token and "motherduck_token" not in config:
            config["motherduck_token"] = token

        connection = duckdb.connect(
            database=self.database,
            read_only=read_only,
            config=config,
        )

        self._connection = connection
        self.logger.info(f"Started a new connection to {self.database}.")
        return connection

    @sync_compatible
    async def execute(
        self,
        operation: str,
        parameters: Optional[Iterable[Any]] = None,
        multiple_parameter_sets: bool = False,
        debug: Optional[bool] = False,
    ) -> DuckDBPyConnection:
        """
        Execute the given SQL query, optionally using prepared statements
        with parameters set.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation. `None` means
                no parameters.
            multiple_parameter_sets: Whether to execute the operation multiple
                times, once per parameter set in `parameters`.
            debug: Whether to run the operation in debug mode.
                   Sends the query plan to the logger. With multiple parameter
                   sets, the plan is built from the first set.

        Returns:
            The cursor the operation was executed on.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute(
                    "CREATE TABLE test_table (i INTEGER, j STRING)"
                )
            ```
        """
        parameters = [] if parameters is None else parameters
        cursor = self.get_connection().cursor()
        if self._debug or debug:
            plan_parameters = parameters
            if multiple_parameter_sets:
                # EXPLAIN accepts a single parameter set. Materialize first so
                # a one-shot iterator is not consumed before the real execute.
                parameters = list(parameters)
                plan_parameters = parameters[0] if parameters else []
            await self.create_query_plan_markdown(operation, cursor, plan_parameters)

        cursor = await run_sync_in_worker_thread(
            cursor.execute, operation, parameters, multiple_parameter_sets
        )
        self.logger.info(f"Executed the operation, {operation!r}.")
        return cursor

    @sync_compatible
    async def sql(
        self,
        operation: str,
        debug: Optional[bool] = False,
    ) -> DuckDBPyRelation:
        """
        Run the given SQL query through DuckDB's `sql` API and return the
        resulting relation. Parameters are not supported here; use `execute`
        for prepared statements.

        Args:
            operation: The SQL operation to execute.
            debug: Whether to run the operation in debug mode.
                   Sends the query plan to the logger.

        Returns:
            The `DuckDBPyRelation` produced by the query.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.sql(
                    "CREATE TABLE test_table (i INTEGER, j STRING)"
                )
            ```
        """
        cursor = self.get_connection().cursor()
        if self._debug or debug:
            await self.create_query_plan_markdown(operation=operation, cursor=cursor)
        relation = await run_sync_in_worker_thread(cursor.sql, operation)
        self.logger.info(f"Executed the operation, {operation!r}.")
        return relation

    @sync_compatible
    async def execute_many(
        self,
        operation: str,
        parameters: Optional[Iterable[Iterable[Any]]] = None,
        debug: Optional[bool] = False,
    ) -> DuckDBPyConnection:
        """
        Execute the given prepared statement multiple times, once for each
        parameter set in `parameters`.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameter sets to pass to the operation. Any
                iterable is accepted; it is materialized into a list once.
            debug: Whether to run the operation in debug mode.
                   Sends the query plan (built from the first parameter set)
                   to the logger.

        Returns:
            The cursor the operations were executed on.

        Examples:
            ```python
                from prefect_duckdb.database import DuckDBConnector

                with DuckDBConnector.load("BLOCK_NAME") as conn:
                    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                    conn.execute_many(
                        "INSERT INTO test_table VALUES (?, ?)",
                        parameters=[[1, "one"], [2, "two"], [3, "three"]]
                    )
            ```
        """
        # A list supports len() for the log line and can be read twice (plan + run).
        parameters = [] if parameters is None else list(parameters)
        cursor = self.get_connection().cursor()
        if self._debug or debug:
            # EXPLAIN accepts a single parameter set, not the whole batch.
            await self.create_query_plan_markdown(
                operation, cursor, parameters[0] if parameters else []
            )
        await run_sync_in_worker_thread(cursor.executemany, operation, parameters)
        self.logger.info(f"Executed {len(parameters)} operations off {operation!r}.")
        return cursor

    async def _execute_and_fetch(
        self,
        operation: str,
        parameters: Optional[Any],
        fetch: Callable,
        message: str,
    ) -> Any:
        """
        Run `operation` on a short-lived cursor and return `fetch(cursor)`.

        The cursor is closed on exit, so `fetch` must fully materialize the
        result before returning.
        """
        with self.get_connection().cursor() as cursor:
            await run_sync_in_worker_thread(
                cursor.execute, operation, [] if parameters is None else parameters
            )
            self.logger.debug(message)
            return await run_sync_in_worker_thread(fetch, cursor)

    @sync_compatible
    async def fetch_one(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Optional[Tuple[Any, ...]]:
        """
        Fetch a single result from the database.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation.

        Returns:
            A tuple representing the result, or `None` when there are no rows.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                conn.execute("INSERT INTO test_table VALUES (1, 'one')")
                result = conn.fetch_one("SELECT * FROM test_table")
                print(result)
            ```
        """
        return await self._execute_and_fetch(
            operation,
            parameters,
            lambda cursor: cursor.fetchone(),
            "Preparing to fetch a row.",
        )

    @sync_compatible
    async def fetch_many(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        size: Optional[int] = 1,
    ) -> List[Tuple[Any, ...]]:
        """
        Fetch multiple results from the database.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation.
            size: The number of rows to fetch.

        Returns:
            A list of tuples representing the results.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                conn.execute_many(
                    "INSERT INTO test_table VALUES (?, ?)",
                    parameters=[[1, "one"], [2, "two"], [3, "three"]]
                )
                result = conn.fetch_many("SELECT * FROM test_table", size=2)
                print(result)
            ```
        """
        return await self._execute_and_fetch(
            operation,
            parameters,
            lambda cursor: cursor.fetchmany(size=size),
            f"Preparing to fetch {size} rows.",
        )

    @sync_compatible
    async def fetch_all(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> List[Tuple[Any, ...]]:
        """
        Fetch all results from the database.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation.

        Returns:
            A list of tuples representing the results.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                conn.execute("INSERT INTO test_table VALUES (1, 'one')")
                result = conn.fetch_all("SELECT * FROM test_table")
                print(result)
            ```
        """
        return await self._execute_and_fetch(
            operation,
            parameters,
            lambda cursor: cursor.fetchall(),
            "Preparing to fetch all rows.",
        )

    @sync_compatible
    async def fetch_numpy(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> dict:
        """
        Fetch all results of the query from the database as numpy arrays.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation.

        Returns:
            The dict returned by DuckDB's `fetchnumpy` (one numpy array per
            result column).

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                conn.execute("INSERT INTO test_table VALUES (1, 'one')")
                result = conn.fetch_numpy("SELECT * FROM test_table")
                print(result)
            ```
        """
        return await self._execute_and_fetch(
            operation,
            parameters,
            lambda cursor: cursor.fetchnumpy(),
            "Preparing to fetch all rows.",
        )

    @sync_compatible
    async def fetch_df(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
        date_as_object: bool = False,
    ) -> pandas.DataFrame:
        """
        Fetch all results of the query from the database as a dataframe.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation.
            date_as_object: Forwarded to DuckDB's `df`; when True, DATE columns
                are returned as Python `datetime.date` objects.

        Returns:
            A pandas dataframe.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                conn.execute("INSERT INTO test_table VALUES (1, 'one')")
                result = conn.fetch_df("SELECT * FROM test_table")
                print(result)
            ```
        """
        # `date_as_object` belongs to the dataframe conversion; it used to be
        # passed positionally to `execute`, where it landed in the
        # `multiple_parameter_sets` slot.
        return await self._execute_and_fetch(
            operation,
            parameters,
            lambda cursor: cursor.df(date_as_object=date_as_object),
            "Preparing to fetch all rows.",
        )

    @sync_compatible
    async def fetch_arrow(
        self,
        operation: str,
        parameters: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """
        Fetch all results of the query from the database as an Arrow table.

        Args:
            operation: The SQL operation to execute.
            parameters: The parameters to pass to the operation.

        Returns:
            An Arrow table.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector

            with DuckDBConnector.load("BLOCK_NAME") as conn:
                conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
                conn.execute("INSERT INTO test_table VALUES (1, 'one')")
                result = conn.fetch_arrow("SELECT * FROM test_table")
                print(result)
            ```
        """
        return await self._execute_and_fetch(
            operation,
            parameters,
            lambda cursor: cursor.arrow(),
            "Preparing to fetch all rows.",
        )

    @sync_compatible
    async def create_function(
        self,
        name: str,
        func: Callable,
        parameters: Optional[Dict[str, Any]] = None,
        return_type: Optional[str] = None,
        side_effects: bool = False,
    ) -> None:
        """
        Create a function in the database.

        Args:
            name: string representing the unique name of the UDF within the catalog.
            func: The Python function you wish to register as a UDF.
            parameters: This parameter takes a list of column types used as input.
            return_type: Scalar functions return one element per row.
                         This parameter specifies the return type of the function.
            side_effects: Whether the function has side effects.
        """
        # Register on the shared connection rather than on a throwaway cursor.
        # DuckDB's `remove_function` only finds UDFs registered through the same
        # connection object, and `remove_function` below uses the shared one.
        await run_sync_in_worker_thread(
            self.get_connection().create_function,
            name,
            func,
            parameters,
            return_type,
            side_effects=side_effects,
        )
        self.logger.info(f"Created function {name!r}.")

    def create_secret(
        self,
        name: str,
        secret_type: Literal["S3", "AZURE"],
        key_id: Optional[str] = None,
        secret: Optional[str] = None,
        region: Optional[str] = None,
        scope: Optional[str] = None,
    ):
        """Create a secret in DuckDB.

        Runs `CREATE SECRET` on the shared connection, opening it first if
        needed. Only options with a truthy value are included. Option values are
        written as SQL string literals with embedded single quotes doubled, so a
        value containing `'` cannot break the statement.

        Args:
            name: The name of the secret. It is inserted unquoted as an
                identifier, so it must be trusted.
            secret_type: The type of secret.
            key_id: The key ID.
            secret: The secret. A pydantic `SecretStr` is unwrapped first.
            region: The region.
            scope: The scope.

        Examples:
            ```python
            from prefect_duckdb.database import DuckDBConnector
            from prefect_aws import AwsCredentials

            aws_credentials_block = AwsCredentials.load("BLOCK_NAME")
            connector = DuckDBConnector.load("BLOCK_NAME")
            connector.get_connection()
            connector.create_secret(
                name="test_secret",
                secret_type="S3",
                key_id=aws_credentials_block.access_key,
                secret=aws_credentials_block.secret_access_key,
                region=aws_credentials_block.region_name
            )
            connector.execute("SELECT count(*) FROM 's3://<bucket>/<file>';")
            ```
        """

        def quote(value) -> str:
            # SQL string literal: an embedded single quote is escaped by doubling.
            return "'" + str(value).replace("'", "''") + "'"

        if isinstance(secret, SecretStr):
            secret = secret.get_secret_value()

        # TYPE always comes first, so the option list is never empty and no
        # dangling comma is produced when no optional values are given.
        options = [f"TYPE {secret_type}"]
        for keyword, value in (
            ("KEY_ID", key_id),
            ("SECRET", secret),
            ("REGION", region),
            ("SCOPE", scope),
        ):
            if value:
                options.append(f"{keyword} {quote(value)}")

        self.get_connection().execute(
            f"CREATE SECRET {name} ({', '.join(options)});"
        )

    @sync_compatible
    async def from_csv_auto(
        self,
        file_name: str,
    ) -> DuckDBPyRelation:
        """
        Create a relation from a CSV file.

        Args:
            file_name: The name of the CSV file.

        Returns:
            The `DuckDBPyRelation` reading the file.
        """
        # The cursor is deliberately left open (as in `from_df`/`from_arrow`):
        # the returned relation is bound to it and cannot be used after it closes.
        cursor = self.get_connection().cursor()
        return await run_sync_in_worker_thread(cursor.from_csv_auto, file_name)

    @sync_compatible
    async def from_df(
        self,
        df: pandas.DataFrame,
        table_name: Optional[str] = None,
    ) -> DuckDBPyConnection:
        """
        Create a relation from a Pandas DataFrame.

        Args:
            df: The Pandas DataFrame.
            table_name: If given, the relation is registered under this name
                on the returned cursor.

        Returns:
            The cursor the relation was created (and optionally registered) on.
        """
        cursor = self.get_connection().cursor()
        table = await run_sync_in_worker_thread(cursor.from_df, df)
        if table_name:
            await run_sync_in_worker_thread(cursor.register, table_name, table)
        return cursor

    @sync_compatible
    async def from_arrow(
        self, arrow_object: pa.Table, table_name: Optional[str] = None
    ) -> DuckDBPyConnection:
        """
        Create a relation from an Arrow object.

        Args:
            arrow_object: The Arrow object.
            table_name: If given, the relation is registered under this name
                on the returned cursor.

        Returns:
            The cursor the relation was created (and optionally registered) on.
        """
        cursor = self.get_connection().cursor()
        table = await run_sync_in_worker_thread(cursor.from_arrow, arrow_object)
        if table_name:
            await run_sync_in_worker_thread(cursor.register, table_name, table)
        return cursor

    @sync_compatible
    async def from_parquet(
        self,
        file_name: str,
    ) -> DuckDBPyRelation:
        """
        Create a relation from a Parquet file.

        Args:
            file_name: The name of the Parquet file.

        Returns:
            The `DuckDBPyRelation` reading the file.
        """
        # Left open on purpose: the returned relation depends on this cursor.
        cursor = self.get_connection().cursor()
        return await run_sync_in_worker_thread(cursor.from_parquet, file_name)

    def remove_function(self, name: str) -> None:
        """
        Remove a function from the database.

        Args:
            name: string representing the unique name of the UDF within the catalog.
        """
        self.get_connection().remove_function(name)

    def set_debug(self, debug: bool) -> None:
        """
        Set the debug mode of the connector.

        Args:
            debug: Whether to enable debug mode.
        """
        self._debug = debug
        self.logger.info(f"Set debug mode to {debug}.")

    async def create_query_plan_markdown(
        self,
        operation: str,
        cursor: DuckDBPyConnection,
        parameters: Optional[list] = None,
    ):
        """
        Log the EXPLAIN plan of `operation` and publish it as a markdown artifact.

        Args:
            operation: The SQL operation to explain.
            cursor: The cursor used to run EXPLAIN.
            parameters: A single parameter set for the operation, if any.

        Returns:
            The result of `create_markdown_artifact`.
        """
        debug_operation = f"EXPLAIN {operation}"
        plan = cursor.execute(debug_operation, [] if parameters is None else parameters)
        plan = plan.df()
        plan = plan.rename(columns={"explain_value": "Physical_Plan"})[
            "Physical_Plan"
        ].to_markdown(index=False)

        markdown = f"""
```
{plan}
```
"""
        # Artifact keys are derived from the SQL text: strip punctuation,
        # lowercase, and turn spaces into dashes.
        artifact_key = re.sub("[^A-Za-z0-9 ]+", "", operation).lower().replace(" ", "-")

        self.logger.info(markdown)
        async with get_client():
            return await create_markdown_artifact(
                key=artifact_key,
                markdown=markdown,
                description="The query plan for the operation.",
            )

    def close(self):
        """
        Closes connection and its cursors.
        """
        if self._connection is None:
            self.logger.info("There was no connection open to be closed.")
            return
        self._connection.close()
        self._connection = None
        self.logger.info("Successfully closed the DuckDB connection.")

    def __enter__(self):
        """
        Return the connector for use in a `with` block.

        No connection is opened here; the query methods open it on demand via
        `get_connection`.
        """
        return self

    def __exit__(self, *args):
        """
        Closes connection and its cursors upon exit.
        """
        self.close()

    def __getstate__(self):
        """Allows block to be pickled and dumped; the live connection is dropped."""
        data = self.__dict__.copy()
        data.update({k: None for k in {"_connection"}})
        return data

    def __setstate__(self, data: dict):
        """Restore pickled state; the connection is not reopened until next use."""
        self.__dict__.update(data)

Functions

__enter__

Return the connector itself; entering the context does not open a connection.

Source code in prefect_duckdb/database.py
651
652
653
654
655
def __enter__(self):
    """
    Return the connector for use in a `with` block.

    No connection is opened here; it is created later, when a method calls
    `get_connection`.
    """
    return self
__exit__

Closes connection and its cursors upon exit.

Source code in prefect_duckdb/database.py
657
658
659
660
661
def __exit__(self, *args):
    """
    Close the connection (and its cursors) when leaving the `with` block.
    """
    # The exception details in `args` are not inspected. Returning None (a
    # falsy value) lets any exception raised inside the block propagate.
    self.close()
    return None
__getstate__

Allows block to be pickled and dumped.

Source code in prefect_duckdb/database.py
663
664
665
666
667
def __getstate__(self):
    """Return a shallow copy of the instance state for pickling, minus the connection."""
    state = dict(self.__dict__)
    # The live connection is replaced by None rather than serialized.
    state["_connection"] = None
    return state
__setstate__

Restore the pickled state. The connection is not reopened here; `__getstate__` stores it as None.

Source code in prefect_duckdb/database.py
669
670
671
def __setstate__(self, data: dict):
    """Restore instance attributes from a state dict produced by `__getstate__`.

    The connection is not reopened here; `__getstate__` stores it as None.
    """
    namespace = self.__dict__
    namespace.update(data)
close

Closes connection and its cursors.

Source code in prefect_duckdb/database.py
640
641
642
643
644
645
646
647
648
649
def close(self):
    """
    Close the connection and its cursors, if a connection is open.

    When no connection is open, only an informational message is logged.
    """
    connection = self._connection
    if connection is not None:
        connection.close()
        self._connection = None
        self.logger.info("Successfully closed the DuckDB connection.")
    else:
        self.logger.info("There was no connection open to be closed.")
create_function async

Create a function in the database.

Parameters:

Name Type Description Default
name str

string representing the unique name of the UDF within the catalog.

required
func Callable

The Python function you wish to register as a UDF.

required
parameters Optional[Dict[str, Any]]

This parameter takes a list of column types used as input.

None
return_type Optional[str]

Scalar functions return one element per row. This parameter specifies the return type of the function.

None
side_effects bool

Whether the function has side effects.

False
Source code in prefect_duckdb/database.py
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
@sync_compatible
async def create_function(
    self,
    name: str,
    func: Callable,
    parameters: Optional[Dict[str, Any]] = None,
    return_type: Optional[str] = None,
    side_effects: bool = False,
) -> None:
    """
    Create a function in the database.

    The function is registered on the connector's shared connection, which is
    opened first if needed.

    Args:
        name: string representing the unique name of the UDF within the catalog.
        func: The Python function you wish to register as a UDF.
        parameters: This parameter takes a list of column types used as input.
        return_type: Scalar functions return one element per row.
                     This parameter specifies the return type of the function.
        side_effects: Whether the function has side effects.
    """
    # Register on the shared connection rather than on a throwaway cursor.
    # DuckDB's `remove_function` only finds UDFs registered through the same
    # connection object, and the connector's `remove_function` uses the shared one.
    await run_sync_in_worker_thread(
        self.get_connection().create_function,
        name,
        func,
        parameters,
        return_type,
        side_effects=side_effects,
    )
    self.logger.info(f"Created function {name!r}.")
create_secret

Create a secret in DuckDB.

Parameters:

Name Type Description Default
name str

The name of the secret.

required
secret_type Literal['S3', 'AZURE']

The type of secret.

required
key_id Optional[str]

The key ID.

None
secret Optional[str]

The secret.

None
region Optional[str]

The region.

None
scope Optional[str]

The scope.

None

Examples:

from prefect_duckdb.database import DuckDBConnector
from prefect_aws import AwsCredentials

aws_credentials_block = AwsCredentials.load("BLOCK_NAME")
connector = DuckDBConnector.load("BLOCK_NAME")
connector.get_connection()
connector.create_secret(
    name="test_secret",
    secret_type="S3",
    key_id=aws_credentials_block.access_key,
    secret=aws_credentials_block.secret_access_key,
    region=aws_credentials_block.region_name
)
connector.execute("SELECT count(*) FROM 's3://<bucket>/<file>';")
Source code in prefect_duckdb/database.py
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
def create_secret(
    self,
    name: str,
    secret_type: Literal["S3", "AZURE"],
    key_id: Optional[str] = None,
    secret: Optional[str] = None,
    region: Optional[str] = None,
    scope: Optional[str] = None,
):
    """Create a secret in DuckDB.

    Runs `CREATE SECRET` on the connector's connection, opening it first if
    needed. Only options with a truthy value are included. Option values are
    written as SQL string literals with embedded single quotes doubled, so a
    value containing `'` cannot break the statement.

    Args:
        name: The name of the secret. It is inserted unquoted as an identifier,
            so it must be trusted.
        secret_type: The type of secret.
        key_id: The key ID.
        secret: The secret. A pydantic `SecretStr` is unwrapped first.
        region: The region.
        scope: The scope.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector
        from prefect_aws import AwsCredentials

        aws_credentials_block = AwsCredentials.load("BLOCK_NAME")
        connector = DuckDBConnector.load("BLOCK_NAME")
        connector.get_connection()
        connector.create_secret(
            name="test_secret",
            secret_type="S3",
            key_id=aws_credentials_block.access_key,
            secret=aws_credentials_block.secret_access_key,
            region=aws_credentials_block.region_name
        )
        connector.execute("SELECT count(*) FROM 's3://<bucket>/<file>';")
        ```
    """

    def quote(value) -> str:
        # SQL string literal: an embedded single quote is escaped by doubling.
        return "'" + str(value).replace("'", "''") + "'"

    if isinstance(secret, SecretStr):
        secret = secret.get_secret_value()

    # TYPE always comes first, so no dangling comma is produced when none of
    # the optional values are given.
    options = [f"TYPE {secret_type}"]
    for keyword, value in (
        ("KEY_ID", key_id),
        ("SECRET", secret),
        ("REGION", region),
        ("SCOPE", scope),
    ):
        if value:
            options.append(f"{keyword} {quote(value)}")

    self.get_connection().execute(f"CREATE SECRET {name} ({', '.join(options)});")
execute async

Execute the given SQL query, optionally using prepared statements with parameters set.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Optional[Iterable[Any]]

The parameters to pass to the operation.

[]
multiple_parameter_sets bool

Whether to execute the operation multiple times.

False
debug Optional[bool]

Whether to run the operation in debug mode. Sends the query plan to the logger.

False

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute(
        "CREATE TABLE test_table (i INTEGER, j STRING)"
    )
Source code in prefect_duckdb/database.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
@sync_compatible
async def execute(
    self,
    operation: str,
    parameters: Optional[Iterable[Any]] = None,
    multiple_parameter_sets: bool = False,
    debug: Optional[bool] = False,
) -> DuckDBPyConnection:
    """
    Execute the given SQL query, optionally using prepared statements
    with parameters set.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) is treated as an empty list, i.e. no parameters.
        multiple_parameter_sets: Whether to execute the operation multiple times.
        debug: Whether to run the operation in debug mode.
               Sends the query plan to the logger.

    Returns:
        The cursor returned by ``cursor.execute``; results of the operation
        can be fetched from it.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute(
                "CREATE TABLE test_table (i INTEGER, j STRING)"
            )
        ```
    """
    # Fresh list per call instead of a shared mutable default argument.
    if parameters is None:
        parameters = []

    # Opens the connection on first use; returns the existing one otherwise.
    connection = self.get_connection()
    cursor = connection.cursor()
    if self._debug or debug:
        await self.create_query_plan_markdown(operation, cursor, parameters)

    cursor = await run_sync_in_worker_thread(
        cursor.execute, operation, parameters, multiple_parameter_sets
    )
    self.logger.info(f"Executed the operation, {operation!r}.")
    return cursor
execute_many async

Execute the given prepared statement multiple times, once for each parameter set in `parameters`.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Iterable[Iterable[Any]]

The parameters to pass to the operation.

[]
debug Optional[bool]

Whether to run the operation in debug mode. Sends the query plan to the logger.

False

Examples:

    from prefect_duckdb.database import DuckDBConnector

    with DuckDBConnector.load("BLOCK_NAME") as conn:
        conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
        conn.execute_many(
            "INSERT INTO test_table VALUES (?, ?)",
            parameters=[[1, "one"], [2, "two"], [3, "three"]]
        )
Source code in prefect_duckdb/database.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@sync_compatible
async def execute_many(
    self,
    operation: str,
    parameters: Optional[Iterable[Iterable[Any]]] = None,
    debug: Optional[bool] = False,
) -> DuckDBPyConnection:
    """
    Execute the given prepared statement multiple times, once for each
    parameter set in ``parameters``.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameter sets to pass to the operation, one per
            execution. Iterables without a length (e.g. generators) are
            materialized into a list first. ``None`` (the default) is
            treated as an empty list.
        debug: Whether to run the operation in debug mode.
               Sends the query plan to the logger.

    Returns:
        The cursor the statement was executed on.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute_many(
                "INSERT INTO test_table VALUES (?, ?)",
                parameters=[[1, "one"], [2, "two"], [3, "three"]]
            )
        ```
    """
    from collections.abc import Sized

    if parameters is None:
        parameters = []
    elif not isinstance(parameters, Sized):
        # ``len()`` is needed for the log line below, and a one-shot iterator
        # could otherwise be exhausted before it reaches ``executemany``.
        parameters = list(parameters)

    # Open the connection on demand, as ``execute`` and ``sql`` do, instead
    # of failing with AttributeError when no connection exists yet.
    connection = self.get_connection()
    cursor = connection.cursor()
    if self._debug or debug:
        await self.create_query_plan_markdown(operation, cursor, parameters)
    await run_sync_in_worker_thread(cursor.executemany, operation, parameters)
    self.logger.info(f"Executed {len(parameters)} operations off {operation!r}.")
    return cursor
fetch_all async

Fetch all results from the database.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Optional[Dict[str, Any]]

The parameters to pass to the operation.

[]

Returns:

Type Description
List[Tuple[Any]]

A list of tuples representing the results.

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    conn.execute("INSERT INTO test_table VALUES (1, 'one')")
    result = conn.fetch_all("SELECT * FROM test_table")
    print(result)
Source code in prefect_duckdb/database.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
@sync_compatible
async def fetch_all(
    self,
    operation: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> List[Tuple[Any]]:
    """
    Fetch all results from the database.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) means no parameters.

    Returns:
        A list of tuples representing the results.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute("INSERT INTO test_table VALUES (1, 'one')")
            result = conn.fetch_all("SELECT * FROM test_table")
            print(result)
        ```
    """
    # Same empty-parameter value the old mutable default supplied.
    if parameters is None:
        parameters = []

    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    with connection.cursor() as cursor:
        await run_sync_in_worker_thread(cursor.execute, operation, parameters)
        self.logger.debug("Preparing to fetch all rows.")
        result = await run_sync_in_worker_thread(cursor.fetchall)
        return result
fetch_arrow async

Fetch all results of the query from the database as an Arrow table.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Optional[Dict[str, Any]]

The parameters to pass to the operation.

[]

Returns:

Type Description
Any

An Arrow table.

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    conn.execute("INSERT INTO test_table VALUES (1, 'one')")
    result = conn.fetch_arrow("SELECT * FROM test_table")
    print(result)
Source code in prefect_duckdb/database.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
@sync_compatible
async def fetch_arrow(
    self,
    operation: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> Any:
    """
    Fetch all results of the query from the database as an Arrow table.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) means no parameters.

    Returns:
        An Arrow table, as produced by the cursor's ``arrow()`` method.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute("INSERT INTO test_table VALUES (1, 'one')")
            result = conn.fetch_arrow("SELECT * FROM test_table")
            print(result)
        ```
    """
    # Same empty-parameter value the old mutable default supplied.
    if parameters is None:
        parameters = []

    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    with connection.cursor() as cursor:
        await run_sync_in_worker_thread(cursor.execute, operation, parameters)
        self.logger.debug("Preparing to fetch all rows.")
        result = await run_sync_in_worker_thread(cursor.arrow)
        return result
fetch_df async

Fetch all results of the query from the database as a dataframe.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Optional[Dict[str, Any]]

The parameters to pass to the operation.

[]

Returns:

Type Description
DataFrame

A pandas dataframe.

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    conn.execute("INSERT INTO test_table VALUES (1, 'one')")
    result = conn.fetch_df("SELECT * FROM test_table")
    print(result)
Source code in prefect_duckdb/database.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
@sync_compatible
async def fetch_df(
    self,
    operation: str,
    parameters: Optional[Dict[str, Any]] = None,
    date_as_object: bool = False,
) -> pandas.DataFrame:
    """
    Fetch all results of the query from the database as a dataframe.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) means no parameters.
        date_as_object: Forwarded to DuckDB's ``df()`` conversion; when True,
            DATE columns are returned as Python date objects instead of
            pandas timestamps.

    Returns:
        A pandas dataframe.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute("INSERT INTO test_table VALUES (1, 'one')")
            result = conn.fetch_df("SELECT * FROM test_table")
            print(result)
        ```
    """
    # Same empty-parameter value the old mutable default supplied.
    if parameters is None:
        parameters = []

    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    with connection.cursor() as cursor:
        # ``date_as_object`` is a dataframe-conversion option, so it goes to
        # ``df()`` below. It must not be passed to ``execute()``, whose third
        # positional argument is ``multiple_parameter_sets``.
        await run_sync_in_worker_thread(cursor.execute, operation, parameters)
        self.logger.debug("Preparing to fetch all rows.")
        result = await run_sync_in_worker_thread(
            cursor.df, date_as_object=date_as_object
        )
        return result
fetch_many async

Fetch multiple results from the database.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Optional[Dict[str, Any]]

The parameters to pass to the operation.

[]
size Optional[int]

The number of rows to fetch.

1

Returns:

Type Description
List[Tuple[Any]]

A list of tuples representing the results.

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    conn.execute_many(
        "INSERT INTO test_table VALUES (?, ?)",
        parameters=[[1, "one"], [2, "two"], [3, "three"]]
    )
    result = conn.fetch_many("SELECT * FROM test_table", size=2)
    print(result)
Source code in prefect_duckdb/database.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
@sync_compatible
async def fetch_many(
    self,
    operation: str,
    parameters: Optional[Dict[str, Any]] = None,
    size: Optional[int] = 1,
) -> List[Tuple[Any]]:
    """
    Fetch multiple results from the database.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) means no parameters.
        size: The number of rows to fetch. ``None`` lets DuckDB's
            ``fetchmany`` use its own default.

    Returns:
        A list of tuples representing the results.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute_many(
                "INSERT INTO test_table VALUES (?, ?)",
                parameters=[[1, "one"], [2, "two"], [3, "three"]]
            )
            result = conn.fetch_many("SELECT * FROM test_table", size=2)
            print(result)
        ```
    """
    # Same empty-parameter value the old mutable default supplied.
    if parameters is None:
        parameters = []

    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    with connection.cursor() as cursor:
        await run_sync_in_worker_thread(cursor.execute, operation, parameters)
        self.logger.debug(f"Preparing to fetch {size} rows.")
        if size is None:
            # Don't pass None through; let DuckDB apply its default size.
            result = await run_sync_in_worker_thread(cursor.fetchmany)
        else:
            result = await run_sync_in_worker_thread(cursor.fetchmany, size=size)
        return result
fetch_numpy async

Fetch all results of the query from the database as NumPy arrays. It takes `operation` (the SQL operation to execute) and `parameters` (the parameters to pass to the operation).

Returns:

Type Description
dict

A dictionary representing a numpy array.

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    conn.execute("INSERT INTO test_table VALUES (1, 'one')")
    result = conn.fetch_numpy("SELECT * FROM test_table")
    print(result)
Source code in prefect_duckdb/database.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
@sync_compatible
async def fetch_numpy(
    self,
    operation: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> dict:
    """
    Fetch all results of the query from the database as NumPy arrays.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) means no parameters.

    Returns:
        The dictionary of NumPy arrays produced by the cursor's
        ``fetchnumpy()`` method.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute("INSERT INTO test_table VALUES (1, 'one')")
            result = conn.fetch_numpy("SELECT * FROM test_table")
            print(result)
        ```
    """
    # Same empty-parameter value the old mutable default supplied.
    if parameters is None:
        parameters = []

    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    with connection.cursor() as cursor:
        await run_sync_in_worker_thread(cursor.execute, operation, parameters)
        self.logger.debug("Preparing to fetch all rows.")
        result = await run_sync_in_worker_thread(cursor.fetchnumpy)
        return result
fetch_one async

Fetch a single result from the database.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
parameters Optional[Dict[str, Any]]

The parameters to pass to the operation.

[]

Returns:

Type Description
Tuple[Any]

A tuple representing the result.

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
    conn.execute("INSERT INTO test_table VALUES (1, 'one')")
    result = conn.fetch_one("SELECT * FROM test_table")
    print(result)
Source code in prefect_duckdb/database.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@sync_compatible
async def fetch_one(
    self,
    operation: str,
    parameters: Optional[Dict[str, Any]] = None,
) -> Tuple[Any]:
    """
    Fetch a single result from the database.

    Args:
        operation: The SQL operation to execute.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) means no parameters.

    Returns:
        A tuple representing the result.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING)")
            conn.execute("INSERT INTO test_table VALUES (1, 'one')")
            result = conn.fetch_one("SELECT * FROM test_table")
            print(result)
        ```
    """
    # Same empty-parameter value the old mutable default supplied.
    if parameters is None:
        parameters = []

    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    with connection.cursor() as cursor:
        await run_sync_in_worker_thread(cursor.execute, operation, parameters)
        self.logger.debug("Preparing to fetch a row.")
        result = await run_sync_in_worker_thread(cursor.fetchone)
        return result
from_arrow async

Create a table from an Arrow object.

Parameters:

Name Type Description Default
arrow_object Table

The Arrow object.

required
Source code in prefect_duckdb/database.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
@sync_compatible
async def from_arrow(
    self, arrow_object: pa.Table, table_name: Optional[str] = None
) -> DuckDBPyConnection:
    """
    Create a relation from an Arrow object, optionally registering it
    under a name.

    Args:
        arrow_object: The Arrow object.
        table_name: If given, the relation is registered under this name on
            the returned cursor so it can be queried by name there.

    Returns:
        The cursor the relation was created on (not the relation itself).
        NOTE(review): when ``table_name`` is omitted nothing is registered,
        so the created relation is not reachable through the returned cursor.
    """
    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    # Left open on purpose: the cursor is handed back to the caller.
    cursor = connection.cursor()
    relation = await run_sync_in_worker_thread(cursor.from_arrow, arrow_object)
    if table_name:
        await run_sync_in_worker_thread(cursor.register, table_name, relation)
    return cursor
from_csv_auto async

Create a table from a CSV file.

Parameters:

Name Type Description Default
file_name str

The name of the CSV file.

required
Source code in prefect_duckdb/database.py
529
530
531
532
533
534
535
536
537
538
539
540
541
@sync_compatible
async def from_csv_auto(
    self,
    file_name: str,
) -> DuckDBPyRelation:
    """
    Create a relation from a CSV file, letting DuckDB auto-detect its format.

    Args:
        file_name: The name of the CSV file.

    Returns:
        A DuckDB relation over the CSV file.
    """
    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    # Do not close this cursor (e.g. via a ``with`` block). The returned
    # relation is bound to it and evaluated lazily, so closing the cursor
    # before returning would leave the caller with a relation on a closed
    # connection. ``from_df``/``from_arrow`` also leave their cursor open.
    cursor = connection.cursor()
    return await run_sync_in_worker_thread(cursor.from_csv_auto, file_name)
from_df async

Create a table from a Pandas DataFrame.

Parameters:

Name Type Description Default
df DataFrame

The Pandas DataFrame.

required
table_name Optional[str]

The name of the table.

None
Source code in prefect_duckdb/database.py
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
@sync_compatible
async def from_df(
    self,
    df: pandas.DataFrame,
    table_name: Optional[str] = None,
) -> DuckDBPyConnection:
    """
    Create a relation from a Pandas DataFrame, optionally registering it
    under a name.

    Args:
        df: The Pandas DataFrame.
        table_name: If given, the relation is registered under this name on
            the returned cursor so it can be queried by name there.

    Returns:
        The cursor the relation was created on (not the relation itself).
        NOTE(review): when ``table_name`` is omitted nothing is registered,
        so the created relation is not reachable through the returned cursor.
    """
    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    # Left open on purpose: the cursor is handed back to the caller.
    cursor = connection.cursor()
    relation = await run_sync_in_worker_thread(cursor.from_df, df)
    if table_name:
        await run_sync_in_worker_thread(cursor.register, table_name, relation)
    return cursor
from_parquet async

Create a table from a Parquet file.

Parameters:

Name Type Description Default
file_name str

The name of the Parquet file.

required
Source code in prefect_duckdb/database.py
578
579
580
581
582
583
584
585
586
587
588
589
590
@sync_compatible
async def from_parquet(
    self,
    file_name: str,
) -> DuckDBPyRelation:
    """
    Create a relation from a Parquet file.

    Args:
        file_name: The name of the Parquet file.

    Returns:
        A DuckDB relation over the Parquet file.
    """
    # Open the connection on demand instead of failing when none exists yet.
    connection = self.get_connection()
    # Do not close this cursor (e.g. via a ``with`` block). The returned
    # relation is bound to it and evaluated lazily, so closing the cursor
    # before returning would leave the caller with a relation on a closed
    # connection. ``from_df``/``from_arrow`` also leave their cursor open.
    cursor = connection.cursor()
    return await run_sync_in_worker_thread(cursor.from_parquet, file_name)
get_connection

Returns a DuckDB connection. If `motherduck_token` is found in the environment or config, it will be passed in the connection.

Parameters:

Name Type Description Default
read_only Optional[bool]

Whether the connection should be read-only.

None
config Optional[dict]

Configuration to be used when creating connection.

None

Returns:

Type Description
DuckDBPyConnection

A DuckDBPyConnection object.

Examples:

from prefect_duckdb.database import DuckDBConnector

duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

with duckdb_connector as conn:
    conn.execute("CREATE TABLE test_table (i INTEGER, j STRING);")
    ...
Source code in prefect_duckdb/database.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_connection(
    self, read_only: Optional[bool] = None, config: Optional[dict] = None
) -> DuckDBPyConnection:
    """
    Returns a DuckDB connection. If `motherduck_token` is set in the
    environment and not already present in the config, it is added to the
    config passed to the connection.

    If a connection is already open, it is returned as-is and `read_only`
    and `config` are ignored.

    Args:
        read_only: Whether the connection should be read-only. When `None`,
            the block's `read_only` setting is used; an explicit `False`
            overrides it.
        config: Configuration to be used when creating connection. Defaults
            to the block's `configuration`. It is copied, never modified in
            place.

    Returns:
        A `DuckDBPyConnection` object.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

        with duckdb_connector as conn:
            conn.execute("CREATE TABLE test_table (i INTEGER, j STRING);")
            ...
        ```
    """
    if self._connection is not None:
        return self._connection

    # Copy so the MotherDuck token added below never leaks into the
    # caller's dict or the block's stored configuration.
    config = dict(config or self.configuration or {})
    # Fall back to the block setting only when no explicit value was given;
    # ``read_only or self.read_only`` would turn an explicit False into True.
    if read_only is None:
        read_only = self.read_only

    token = os.environ.get("motherduck_token")
    if token and "motherduck_token" not in config:
        config["motherduck_token"] = token

    connection = duckdb.connect(
        database=self.database,
        read_only=read_only,
        config=config,
    )

    self._connection = connection
    self.logger.info(f"Started a new connection to {self.database}.")
    return connection
remove_function

Remove a function from the database.

Parameters:

Name Type Description Default
name str

string representing the unique name of the UDF within the catalog.

required
Source code in prefect_duckdb/database.py
592
593
594
595
596
597
598
599
def remove_function(self, name: str) -> None:
    """
    Unregister a user-defined function from the connector's current
    DuckDB connection.

    Args:
        name: Unique catalog name of the UDF to drop.
    """
    connection = self._connection
    connection.remove_function(name)
set_debug

Set the debug mode of the connector.

Parameters:

Name Type Description Default
debug bool

Whether to enable debug mode.

required
Source code in prefect_duckdb/database.py
601
602
603
604
605
606
607
608
609
def set_debug(self, debug: bool) -> None:
    """
    Turn the connector's debug mode on or off.

    While enabled, `execute`, `execute_many` and `sql` send the query plan
    to the logger before running an operation.

    Args:
        debug: True to enable debug mode, False to disable it.
    """
    message = f"Set debug mode to {debug}."
    self._debug = debug
    self.logger.info(message)
sql async

Execute the given SQL query through DuckDB's relational `sql` API. Unlike `execute`, this method does not accept query parameters.

Parameters:

Name Type Description Default
operation str

The SQL operation to execute.

required
debug Optional[bool]

Whether to run the operation in debug mode. Sends the query plan to the logger.

False

Examples:

from prefect_duckdb.database import DuckDBConnector

with DuckDBConnector.load("BLOCK_NAME") as conn:
    conn.sql(
        "CREATE TABLE test_table (i INTEGER, j STRING)"
    )
Source code in prefect_duckdb/database.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@sync_compatible
async def sql(
    self,
    operation: str,
    debug: Optional[bool] = False,
) -> DuckDBPyRelation:
    """
    Execute the given SQL query through DuckDB's relational ``sql`` API.

    Unlike ``execute``, this method does not accept query parameters.

    Args:
        operation: The SQL operation to execute.
        debug: Whether to run the operation in debug mode.
               Sends the query plan to the logger.

    Returns:
        Whatever ``cursor.sql`` returns for the operation. For queries that
        produce a result this is a DuckDB relation.

    Examples:
        ```python
        from prefect_duckdb.database import DuckDBConnector

        with DuckDBConnector.load("BLOCK_NAME") as conn:
            conn.sql(
                "CREATE TABLE test_table (i INTEGER, j STRING)"
            )
        ```
    """
    # Opens the connection on first use; returns the existing one otherwise.
    connection = self.get_connection()
    cursor = connection.cursor()
    if self._debug or debug:
        await self.create_query_plan_markdown(operation=operation, cursor=cursor)
    relation = await run_sync_in_worker_thread(cursor.sql, operation)
    self.logger.info(f"Executed the operation, {operation!r}.")
    return relation

Functions

duckdb_query

Execute a query against a DuckDB database.

Parameters:

Name Type Description Default
query str

The SQL query to execute.

required
duckdb_connector DuckDBConnector

The DuckDBConnector block to use.

required
parameters Optional[Iterable[Any]]

The parameters to pass to the operation.

[]

Returns:

Type Description
List[Tuple[Any]]

A list of tuples representing the results.

Examples:

from prefect import flow
from prefect_duckdb.database import DuckDBConnector, duckdb_query

@flow
def duckdb_query_flow():
    duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

    result = duckdb_query("SELECT * FROM test_table", duckdb_connector)
    print(result)

duckdb_query_flow()
Source code in prefect_duckdb/database.py
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
@task
def duckdb_query(
    query: str,
    duckdb_connector: DuckDBConnector,
    parameters: Optional[Iterable[Any]] = None,
    debug: Optional[bool] = False,
) -> DuckDBPyConnection:
    """
    Execute a query against a DuckDB database.

    Args:
        query: The SQL query to execute.
        duckdb_connector: The DuckDBConnector block to use.
        parameters: The parameters to pass to the operation. ``None`` (the
            default) is treated as an empty list, i.e. no parameters.
        debug: Whether to run the operation in debug mode, sending the
            query plan to the logger.

    Returns:
        The cursor returned by ``duckdb_connector.execute``. Call
        ``fetchall()`` on it to get the rows as a list of tuples.

    Examples:
        ```python
        from prefect import flow
        from prefect_duckdb.database import DuckDBConnector, duckdb_query

        @flow
        def duckdb_query_flow():
            duckdb_connector = DuckDBConnector.load("BLOCK_NAME")

            result = duckdb_query("SELECT * FROM test_table", duckdb_connector)
            print(result.fetchall())

        duckdb_query_flow()
        ```

    """
    # NOTE(review): the annotation previously promised List[Tuple[Any]], but
    # the task returns the cursor from ``execute``. The annotation now matches
    # that behavior; returning ``.fetchall()`` instead would break callers.
    if parameters is None:
        parameters = []
    result = duckdb_connector.execute(query, parameters, debug=debug)
    return result