Skip to content

Examples Catalog

Below is a list of examples for prefect-gcp.

Bigquery Module

Execute operation with parameters:

from prefect_gcp.bigquery import BigQueryWarehouse

# Load the saved BigQueryWarehouse block named "BLOCK_NAME" and use it as a
# context manager for the duration of the example.
with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    # SQL text for the statement. The %(limit)s placeholder is named to match
    # the "limit" key of the parameters mapping passed to execute() below.
    operation = '''
        CREATE TABLE mydataset.trips AS (
        SELECT
            bikeid,
            start_time,
            duration_minutes
        FROM
            bigquery-public-data.austin_bikeshare.bikeshare_trips
        LIMIT %(limit)s
        );
    '''
    # Run the statement, supplying 5 for the "limit" parameter.
    warehouse.execute(operation, parameters={"limit": 5})
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_insert_stream
from google.cloud.bigquery import SchemaField

@flow
def example_bigquery_insert_stream_flow():
    # Streams two sample rows into integrations.test_table and hands back
    # whatever bigquery_insert_stream returns.
    rows = [
        {"number": 1, "text": "abc", "bool": True},
        {"number": 2, "text": "def", "bool": False},
    ]
    credentials = GcpCredentials(project="project")
    return bigquery_insert_stream(
        dataset="integrations",
        table="test_table",
        records=rows,
        gcp_credentials=credentials,
    )


example_bigquery_insert_stream_flow()
Queries the public Shakespeare sample dataset for words in a given corpus that meet a minimum word count.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_query

@flow
def example_bigquery_query_flow():
    # Runs a parameterised query against the public Shakespeare sample table
    # and returns the result of bigquery_query.
    sql = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = @corpus
        AND word_count >= @min_word_count
        ORDER BY word_count DESC;
    '''
    # (name, type, value) triples matching the @corpus / @min_word_count
    # markers in the SQL text above.
    params = [
        ("corpus", "STRING", "romeoandjuliet"),
        ("min_word_count", "INT64", 250),
    ]
    credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
        project="project",
    )
    return bigquery_query(sql, credentials, query_params=params)


example_bigquery_query_flow()
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_cloud_storage

@flow
def example_bigquery_load_cloud_storage_flow():
    # Calls bigquery_load_cloud_storage for dataset.test_table with the
    # placeholder URI "uri" and returns its result.
    credentials = GcpCredentials(project="project")
    return bigquery_load_cloud_storage(
        dataset="dataset",
        table="test_table",
        uri="uri",
        gcp_credentials=credentials,
    )


example_bigquery_load_cloud_storage_flow()
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_load_file
from google.cloud.bigquery import SchemaField

@flow
def example_bigquery_load_file_flow():
    # Calls bigquery_load_file for dataset.test_table with the placeholder
    # path "path" and returns its result.
    credentials = GcpCredentials(project="project")
    return bigquery_load_file(
        dataset="dataset",
        table="test_table",
        path="path",
        gcp_credentials=credentials,
    )


example_bigquery_load_file_flow()
Create mytable in mydataset and insert two rows into it:
from prefect_gcp.bigquery import BigQueryWarehouse

# Load the saved BigQueryWarehouse block named "bigquery" and use it as a
# context manager for the duration of the example.
with BigQueryWarehouse.load("bigquery") as warehouse:
    # DDL for a three-column table; IF NOT EXISTS is part of the SQL text.
    create_operation = '''
    CREATE TABLE IF NOT EXISTS mydataset.mytable (
        col1 STRING,
        col2 INTEGER,
        col3 BOOLEAN
    )
    '''
    warehouse.execute(create_operation)
    # Insert statement with three positional %s placeholders.
    insert_operation = '''
    INSERT INTO mydataset.mytable (col1, col2, col3) VALUES (%s, %s, %s)
    '''
    # One tuple per row, in (col1, col2, col3) order.
    seq_of_parameters = [
        ("a", 1, True),
        ("b", 2, False),
    ]
    # Hand the statement and the whole sequence of parameter tuples to
    # execute_many in a single call.
    warehouse.execute_many(
        insert_operation,
        seq_of_parameters=seq_of_parameters
    )
Execute operation with parameters, fetching one new row at a time:
from prefect_gcp.bigquery import BigQueryWarehouse

# Load the saved BigQueryWarehouse block named "BLOCK_NAME" and use it as a
# context manager for the duration of the example.
with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    # Query text; %(corpus)s and %(min_word_count)s are named to match the
    # keys of the parameters dict below. The SQL itself caps output at 3 rows.
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        AND word_count >= %(min_word_count)s
        ORDER BY word_count DESC
        LIMIT 3;
    '''
    parameters = {
        "corpus": "romeoandjuliet",
        "min_word_count": 250,
    }
    # Call fetch_one three times with identical arguments and print each
    # result. NOTE(review): the example relies on each call yielding the next
    # row rather than repeating the first - confirm against BigQueryWarehouse.
    for _ in range(0, 3):
        result = warehouse.fetch_one(operation, parameters=parameters)
        print(result)
Execute operation with parameters, fetching all rows:
from prefect_gcp.bigquery import BigQueryWarehouse

# Load the saved BigQueryWarehouse block named "BLOCK_NAME" and use it as a
# context manager for the duration of the example.
with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    # Query text; %(corpus)s and %(min_word_count)s are named to match the
    # keys of the parameters dict below. The SQL itself caps output at 3 rows.
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        AND word_count >= %(min_word_count)s
        ORDER BY word_count DESC
        LIMIT 3;
    '''
    parameters = {
        "corpus": "romeoandjuliet",
        "min_word_count": 250,
    }
    # Single fetch_all call; the result is stored but not used further here.
    result = warehouse.fetch_all(operation, parameters=parameters)
Execute operation with parameters, fetching two new rows at a time:
from prefect_gcp.bigquery import BigQueryWarehouse

# Load the saved BigQueryWarehouse block named "BLOCK_NAME" and use it as a
# context manager for the duration of the example.
with BigQueryWarehouse.load("BLOCK_NAME") as warehouse:
    # Query text; %(corpus)s and %(min_word_count)s are named to match the
    # keys of the parameters dict below. The SQL itself caps output at 6 rows.
    operation = '''
        SELECT word, word_count
        FROM `bigquery-public-data.samples.shakespeare`
        WHERE corpus = %(corpus)s
        AND word_count >= %(min_word_count)s
        ORDER BY word_count DESC
        LIMIT 6;
    '''
    parameters = {
        "corpus": "romeoandjuliet",
        "min_word_count": 250,
    }
    # Three fetch_many calls with size=2, matching the LIMIT 6 in the SQL.
    # NOTE(review): relies on each call continuing from where the previous
    # one stopped - confirm against BigQueryWarehouse.
    for _ in range(0, 3):
        result = warehouse.fetch_many(
            operation,
            parameters=parameters,
            size=2
        )
        print(result)

Cloud Storage Module

Creates a bucket named "prefect".

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_create_bucket

@flow()
def example_cloud_storage_create_bucket_flow():
    # Creates a bucket named "prefect" using credentials read from a service
    # account key file.
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    bucket = cloud_storage_create_bucket("prefect", gcp_credentials)
    # Return the task result instead of discarding it, in line with the other
    # Cloud Storage flow examples, which all return what their task produced.
    return bucket

example_cloud_storage_create_bucket_flow()
Upload local folder my_folder to the bucket's folder my_folder.
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# Upload the local folder "my_folder"; no destination argument is given.
gcs_bucket.upload_from_folder("my_folder")
Downloads a blob from a bucket as bytes.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

@flow()
def example_cloud_storage_download_blob_flow():
    # Fetches the blob "blob" from the bucket "bucket" via
    # cloud_storage_download_blob_as_bytes and returns the result.
    credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
    )
    return cloud_storage_download_blob_as_bytes("bucket", "blob", credentials)


example_cloud_storage_download_blob_flow()
Get all folders from a bucket named "my-bucket".
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# List folders with no argument; the return value is not captured here.
gcs_bucket.list_folders()

Get all folders from a folder called "years".

from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# List folders, passing "years" as the folder to look in; the return value is
# not captured here.
gcs_bucket.list_folders("years")
Download my_folder to a local folder named my_folder.
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# First argument is the bucket folder, second the local folder; both are
# "my_folder" in this example.
gcs_bucket.download_folder_to_path("my_folder", "my_folder")
Uploads a blob to a bucket from a local file.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

@flow()
def example_cloud_storage_upload_blob_from_file_flow():
    # Passes the file at "/path/somewhere" to
    # cloud_storage_upload_blob_from_file for bucket "bucket" / blob "blob"
    # and returns the result.
    credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
    )
    return cloud_storage_upload_blob_from_file(
        "/path/somewhere",
        "bucket",
        "blob",
        credentials,
    )


example_cloud_storage_upload_blob_from_file_flow()
Uploads a blob to a bucket from a string.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_string

@flow()
def example_cloud_storage_upload_blob_from_string_flow():
    # Passes the string "data" to cloud_storage_upload_blob_from_string for
    # bucket "bucket" / blob "blob" and returns the result.
    credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
    )
    return cloud_storage_upload_blob_from_string(
        "data",
        "bucket",
        "blob",
        credentials,
    )


example_cloud_storage_upload_blob_from_string_flow()
Create a bucket.
from prefect_gcp.cloud_storage import GcsBucket

# Construct a GcsBucket directly (not loaded from a saved block) for the
# bucket name "my-bucket".
gcs_bucket = GcsBucket(bucket="my-bucket")
# Create the bucket; the return value is not captured here.
gcs_bucket.create_bucket()
Download my_folder/notes.txt object to notes.txt.
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# First argument is the object in the bucket, second the local destination.
gcs_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
Get the bucket object.
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# Fetch the bucket object; the return value is not captured here.
gcs_bucket.get_bucket()
Get all blobs from a folder named "prefect".
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# List blobs, passing "prefect" as the folder; the return value is not
# captured here.
gcs_bucket.list_blobs("prefect")
Upload the notes.txt file object to my_folder/notes.txt.
from io import BytesIO
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# Open the local notes.txt in binary mode and pass the open file object,
# with "my_folder/notes.txt" as the second argument.
with open("notes.txt", "rb") as f:
    gcs_bucket.upload_from_file_object(f, "my_folder/notes.txt")

Upload BufferedReader object to my_folder/notes.txt.

from io import BufferedReader
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# Open the local notes.txt in binary mode, wrap the handle in a
# BufferedReader, and pass that wrapper as the file object to upload.
with open("notes.txt", "rb") as f:
    gcs_bucket.upload_from_file_object(
        BufferedReader(f), "my_folder/notes.txt"
    )
Download my_folder/notes.txt object to a BytesIO object.
from io import BytesIO
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# Use an in-memory BytesIO buffer as the download target; the buffer is
# closed when the with-block exits, so its contents must be read inside it.
with BytesIO() as buf:
    gcs_bucket.download_object_to_file_object("my_folder/notes.txt", buf)

Download my_folder/notes.txt object to a BufferedWriter.

    from prefect_gcp.cloud_storage import GcsBucket

    # Load the saved GcsBucket block named "my-bucket".
    gcs_bucket = GcsBucket.load("my-bucket")
    # Open the local notes.txt for binary writing and pass the open handle
    # as the download target.
    with open("notes.txt", "wb") as f:
        gcs_bucket.download_object_to_file_object("my_folder/notes.txt", f)
Copies blob from one bucket to another.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_copy_blob

@flow()
def example_cloud_storage_copy_blob_flow():
    # Calls cloud_storage_copy_blob with the placeholder names
    # "source_bucket", "dest_bucket" and "source_blob", and returns the result.
    credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
    )
    return cloud_storage_copy_blob(
        "source_bucket", "dest_bucket", "source_blob", credentials
    )


example_cloud_storage_copy_blob_flow()
Downloads a blob from a bucket to a local file.
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_to_file

@flow()
def example_cloud_storage_download_blob_flow():
    # Calls cloud_storage_download_blob_to_file for bucket "bucket" / blob
    # "blob" with the target "file_path", and returns the result.
    credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json",
    )
    return cloud_storage_download_blob_to_file(
        "bucket",
        "blob",
        "file_path",
        credentials,
    )


example_cloud_storage_download_blob_flow()
Upload notes.txt to my_folder/notes.txt.
from prefect_gcp.cloud_storage import GcsBucket

# Load the saved GcsBucket block named "my-bucket".
gcs_bucket = GcsBucket.load("my-bucket")
# First argument is the local file, second the destination in the bucket.
gcs_bucket.upload_from_path("notes.txt", "my_folder/notes.txt")

Credentials Module

Gets a GCP Cloud Storage client from a path.

from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from a service account key file path and asks
    # them for a Cloud Storage client. The client is only bound locally.
    key_path = "~/.secrets/prefect-service-account.json"
    credentials = GcpCredentials(service_account_file=key_path)
    client = credentials.get_cloud_storage_client()


example_get_client_flow()

Gets a GCP Cloud Storage client from a dictionary.

from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from an in-memory service account dictionary
    # (placeholder values) and asks them for a Cloud Storage client. The
    # client is only bound locally.
    key_info = {
        "type": "service_account",
        "project_id": "project_id",
        "private_key_id": "private_key_id",
        "private_key": "private_key",
        "client_email": "client_email",
        "client_id": "client_id",
        "auth_uri": "auth_uri",
        "token_uri": "token_uri",
        "auth_provider_x509_cert_url": "auth_provider_x509_cert_url",
        "client_x509_cert_url": "client_x509_cert_url",
    }
    credentials = GcpCredentials(service_account_info=key_info)
    client = credentials.get_cloud_storage_client()


example_get_client_flow()
Gets a GCP Job Service client from a path.
from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from a service account key file path and asks
    # them for a Job Service client. The client is only bound locally.
    key_path = "~/.secrets/prefect-service-account.json"
    credentials = GcpCredentials(service_account_file=key_path)
    client = credentials.get_job_service_client()


example_get_client_flow()

Gets a GCP Job Service client from a dictionary.

from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from an in-memory service account dictionary
    # (placeholder values) and asks them for a Job Service client. The
    # client is only bound locally.
    key_info = {
        "type": "service_account",
        "project_id": "project_id",
        "private_key_id": "private_key_id",
        "private_key": "private_key",
        "client_email": "client_email",
        "client_id": "client_id",
        "auth_uri": "auth_uri",
        "token_uri": "token_uri",
        "auth_provider_x509_cert_url": "auth_provider_x509_cert_url",
        "client_x509_cert_url": "client_x509_cert_url",
    }
    credentials = GcpCredentials(service_account_info=key_info)
    client = credentials.get_job_service_client()


example_get_client_flow()
Gets a GCP Secret Manager client from a path.
from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from a service account key file path and asks
    # them for a Secret Manager client. The client is only bound locally.
    key_path = "~/.secrets/prefect-service-account.json"
    credentials = GcpCredentials(service_account_file=key_path)
    client = credentials.get_secret_manager_client()


example_get_client_flow()

Gets a GCP Secret Manager client from a dictionary.

from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from an in-memory service account dictionary
    # (placeholder values) and asks them for a Secret Manager client. The
    # client is only bound locally.
    key_info = {
        "type": "service_account",
        "project_id": "project_id",
        "private_key_id": "private_key_id",
        "private_key": "private_key",
        "client_email": "client_email",
        "client_id": "client_id",
        "auth_uri": "auth_uri",
        "token_uri": "token_uri",
        "auth_provider_x509_cert_url": "auth_provider_x509_cert_url",
        "client_x509_cert_url": "client_x509_cert_url",
    }
    credentials = GcpCredentials(service_account_info=key_info)
    client = credentials.get_secret_manager_client()


example_get_client_flow()
Gets a GCP BigQuery client from a path.
from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from a service account key file path and asks
    # them for a BigQuery client. The client is only bound locally.
    key_path = "~/.secrets/prefect-service-account.json"
    credentials = GcpCredentials(service_account_file=key_path)
    client = credentials.get_bigquery_client()


example_get_client_flow()

Gets a GCP BigQuery client from a dictionary.

from prefect import flow
from prefect_gcp.credentials import GcpCredentials

@flow()
def example_get_client_flow():
    # Builds GcpCredentials from an in-memory service account dictionary
    # (placeholder values) and asks them for a BigQuery client. The client
    # is only bound locally.
    key_info = {
        "type": "service_account",
        "project_id": "project_id",
        "private_key_id": "private_key_id",
        "private_key": "private_key",
        "client_email": "client_email",
        "client_id": "client_id",
        "auth_uri": "auth_uri",
        "token_uri": "token_uri",
        "auth_provider_x509_cert_url": "auth_provider_x509_cert_url",
        "client_x509_cert_url": "client_x509_cert_url",
    }
    credentials = GcpCredentials(service_account_info=key_info)
    client = credentials.get_bigquery_client()


example_get_client_flow()