
prefect_gcp.cloud_storage

Tasks for interacting with GCP Cloud Storage.

GcsBucket pydantic-model

Block used to store data using GCP Cloud Storage Buckets.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `bucket` | `str` | Name of the bucket. |
| `gcp_credentials` | `GcpCredentials` | The credentials to authenticate with GCP. |
| `bucket_folder` | `str` | A default path to a folder within the GCS bucket to use for reading and writing objects. |

Examples:

Load stored GCP Cloud Storage Bucket:

```python
from prefect_gcp import GcsBucket
gcp_cloud_storage_bucket_block = GcsBucket.load("BLOCK_NAME")
```
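
If the block has not been saved yet, it can be configured and registered in code first. A minimal sketch, assuming an existing bucket, a local service account keyfile, and the standard `Block.save` API; the block name, bucket name, folder, and paths are placeholders:

```python
from prefect_gcp import GcpCredentials, GcsBucket

# Credentials block pointing at a service account keyfile (placeholder path).
gcp_credentials = GcpCredentials(
    service_account_file="/path/to/service/account/keyfile.json"
)

# Configure the bucket block; bucket_folder scopes reads and writes to a prefix.
gcs_bucket = GcsBucket(
    bucket="my-bucket",
    gcp_credentials=gcp_credentials,
    bucket_folder="my/folder",
)

# Persist the block so it can later be loaded with GcsBucket.load("BLOCK_NAME").
gcs_bucket.save("BLOCK_NAME", overwrite=True)
```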

Source code in prefect_gcp/cloud_storage.py
class GcsBucket(WritableDeploymentStorage, WritableFileSystem):
    """
    Block used to store data using GCP Cloud Storage Buckets.

    Attributes:
        bucket: Name of the bucket.
        gcp_credentials: The credentials to authenticate with GCP.
        bucket_folder: A default path to a folder within the GCS bucket to use
            for reading and writing objects.

    Example:
        Load stored GCP Cloud Storage Bucket:
        ```python
        from prefect_gcp import GcsBucket
        gcp_cloud_storage_bucket_block = GcsBucket.load("BLOCK_NAME")
        ```
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/4CD4wwbiIKPkZDt4U3TEuW/c112fe85653da054b6d5334ef662bec4/gcp.png?h=250"  # noqa
    _block_type_name = "GCS Bucket"

    bucket: str = Field(..., description="Name of the bucket.")
    gcp_credentials: GcpCredentials = Field(
        default_factory=GcpCredentials,
        description="The credentials to authenticate with GCP.",
    )
    bucket_folder: str = Field(
        default="",
        description=(
            "A default path to a folder within the GCS bucket to use "
            "for reading and writing objects."
        ),
    )

    @validator("bucket_folder", pre=True, always=True)
    def _bucket_folder_suffix(cls, value):
        """
        Ensures that the bucket folder is suffixed with a forward slash.
        """
        if value != "" and not value.endswith("/"):
            value = f"{value}/"
        return value

    def _resolve_path(self, path: str) -> str:
        """
        A helper function used in write_path to join `self.bucket_folder` and `path`.

        Args:
            path: Name of the key, e.g. "file1". Each object in your
                bucket has a unique key (or key name).

        Returns:
            The joined path.
        """
        path = path or str(uuid4())

        # If bucket_folder provided, it means we won't write to the root dir of
        # the bucket. So we need to add it on the front of the path.
        path = os.path.join(self.bucket_folder, path) if self.bucket_folder else path
        return path

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a folder from the configured GCS bucket to a local directory.
        Defaults to copying the entire contents of the block's bucket_folder
        to the current working directory.

        Args:
            from_path: Path in GCS bucket to download from. Defaults to the block's
                configured bucket_folder.
            local_path: Local path to download GCS bucket contents to.
                Defaults to the current working directory.
        """
        from_path = (
            self.bucket_folder if from_path is None else self._resolve_path(from_path)
        )

        if local_path is None:
            local_path = os.path.abspath(".")
        else:
            local_path = os.path.expanduser(local_path)

        project = self.gcp_credentials.project
        client = self.gcp_credentials.get_cloud_storage_client(project=project)

        blobs = await run_sync_in_worker_thread(
            client.list_blobs, self.bucket, prefix=from_path
        )
        for blob in blobs:
            blob_path = blob.name
            if blob_path[-1] == "/":
                # object is a folder and will be created if it contains any objects
                continue
            local_file_path = os.path.join(local_path, blob_path)
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            with disable_run_logger():
                await cloud_storage_download_blob_to_file.fn(
                    bucket=self.bucket,
                    blob=blob_path,
                    path=local_file_path,
                    gcp_credentials=self.gcp_credentials,
                )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to the configured GCS bucket in a
        given folder.

        Defaults to uploading the entire contents the current working directory to the
        block's bucket_folder.

        Args:
            local_path: Path to local directory to upload from.
            to_path: Path in GCS bucket to upload to. Defaults to block's configured
                bucket_folder.
            ignore_file: Path to file containing gitignore style expressions for
                filepaths to ignore.

        Returns:
            The number of files uploaded.
        """
        if local_path is None:
            local_path = os.path.abspath(".")
        else:
            local_path = os.path.expanduser(local_path)

        to_path = self.bucket_folder if to_path is None else self._resolve_path(to_path)

        included_files = None
        if ignore_file:
            with open(ignore_file, "r") as f:
                ignore_patterns = f.readlines()
            included_files = filter_files(local_path, ignore_patterns)

        uploaded_file_count = 0
        for local_file_path in Path(local_path).rglob("*"):
            if (
                included_files is not None
                and local_file_path.name not in included_files
            ):
                continue
            elif not local_file_path.is_dir():
                remote_file_path = os.path.join(
                    to_path, local_file_path.relative_to(local_path)
                )
                local_file_content = local_file_path.read_bytes()
                await self.write_path(remote_file_path, content=local_file_content)
                uploaded_file_count += 1

        return uploaded_file_count

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """
        Read specified path from GCS and return contents. Provide the entire
        path to the key in GCS.

        Args:
            path: Entire path to (and including) the key.

        Returns:
            A bytes or string representation of the blob object.
        """
        path = self._resolve_path(path)
        with disable_run_logger():
            contents = await cloud_storage_download_blob_as_bytes.fn(
                bucket=self.bucket, blob=path, gcp_credentials=self.gcp_credentials
            )
        return contents

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Writes to an GCS bucket.

        Args:
            path: The key name. Each object in your bucket has a unique
                key (or key name).
            content: What you are uploading to GCS Bucket.

        Returns:
            The path that the contents were written to.
        """
        path = self._resolve_path(path)
        with disable_run_logger():
            await cloud_storage_upload_blob_from_string.fn(
                data=content,
                bucket=self.bucket,
                blob=path,
                gcp_credentials=self.gcp_credentials,
            )
        return path

bucket: str pydantic-field required

Name of the bucket.

bucket_folder: str pydantic-field

A default path to a folder within the GCS bucket to use for reading and writing objects.

gcp_credentials: GcpCredentials pydantic-field

The credentials to authenticate with GCP.

get_directory async

Copies a folder from the configured GCS bucket to a local directory. Defaults to copying the entire contents of the block's bucket_folder to the current working directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `from_path` | `Optional[str]` | Path in GCS bucket to download from. Defaults to the block's configured bucket_folder. | `None` |
| `local_path` | `Optional[str]` | Local path to download GCS bucket contents to. Defaults to the current working directory. | `None` |
Source code in prefect_gcp/cloud_storage.py
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Copies a folder from the configured GCS bucket to a local directory.
    Defaults to copying the entire contents of the block's bucket_folder
    to the current working directory.

    Args:
        from_path: Path in GCS bucket to download from. Defaults to the block's
            configured bucket_folder.
        local_path: Local path to download GCS bucket contents to.
            Defaults to the current working directory.
    """
    from_path = (
        self.bucket_folder if from_path is None else self._resolve_path(from_path)
    )

    if local_path is None:
        local_path = os.path.abspath(".")
    else:
        local_path = os.path.expanduser(local_path)

    project = self.gcp_credentials.project
    client = self.gcp_credentials.get_cloud_storage_client(project=project)

    blobs = await run_sync_in_worker_thread(
        client.list_blobs, self.bucket, prefix=from_path
    )
    for blob in blobs:
        blob_path = blob.name
        if blob_path[-1] == "/":
            # object is a folder and will be created if it contains any objects
            continue
        local_file_path = os.path.join(local_path, blob_path)
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

        with disable_run_logger():
            await cloud_storage_download_blob_to_file.fn(
                bucket=self.bucket,
                blob=blob_path,
                path=local_file_path,
                gcp_credentials=self.gcp_credentials,
            )
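
A minimal usage sketch for `get_directory`, assuming a previously saved block; the block name and paths are placeholders:

```python
from prefect_gcp import GcsBucket

gcs_bucket = GcsBucket.load("BLOCK_NAME")  # placeholder block name

# Download everything under the block's bucket_folder to the current directory.
gcs_bucket.get_directory()

# Or download a specific folder (resolved relative to bucket_folder)
# to a chosen local directory.
gcs_bucket.get_directory(from_path="data/raw", local_path="./downloads")
```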

put_directory async

Uploads a directory from a given local path to the configured GCS bucket in a given folder.

Defaults to uploading the entire contents of the current working directory to the block's bucket_folder.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `local_path` | `Optional[str]` | Path to local directory to upload from. | `None` |
| `to_path` | `Optional[str]` | Path in GCS bucket to upload to. Defaults to block's configured bucket_folder. | `None` |
| `ignore_file` | `Optional[str]` | Path to file containing gitignore style expressions for filepaths to ignore. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `int` | The number of files uploaded. |

Source code in prefect_gcp/cloud_storage.py
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Uploads a directory from a given local path to the configured GCS bucket in a
    given folder.

    Defaults to uploading the entire contents the current working directory to the
    block's bucket_folder.

    Args:
        local_path: Path to local directory to upload from.
        to_path: Path in GCS bucket to upload to. Defaults to block's configured
            bucket_folder.
        ignore_file: Path to file containing gitignore style expressions for
            filepaths to ignore.

    Returns:
        The number of files uploaded.
    """
    if local_path is None:
        local_path = os.path.abspath(".")
    else:
        local_path = os.path.expanduser(local_path)

    to_path = self.bucket_folder if to_path is None else self._resolve_path(to_path)

    included_files = None
    if ignore_file:
        with open(ignore_file, "r") as f:
            ignore_patterns = f.readlines()
        included_files = filter_files(local_path, ignore_patterns)

    uploaded_file_count = 0
    for local_file_path in Path(local_path).rglob("*"):
        if (
            included_files is not None
            and local_file_path.name not in included_files
        ):
            continue
        elif not local_file_path.is_dir():
            remote_file_path = os.path.join(
                to_path, local_file_path.relative_to(local_path)
            )
            local_file_content = local_file_path.read_bytes()
            await self.write_path(remote_file_path, content=local_file_content)
            uploaded_file_count += 1

    return uploaded_file_count
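
A minimal usage sketch for `put_directory`, assuming a previously saved block; the paths and ignore file are placeholders:

```python
from prefect_gcp import GcsBucket

gcs_bucket = GcsBucket.load("BLOCK_NAME")  # placeholder block name

# Upload the current working directory into the block's bucket_folder,
# skipping files matched by gitignore-style patterns in .prefectignore.
uploaded = gcs_bucket.put_directory(ignore_file=".prefectignore")
print(f"Uploaded {uploaded} files")

# Upload a specific local directory into a sub-folder of bucket_folder.
gcs_bucket.put_directory(local_path="./dist", to_path="releases/v1")
```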

read_path async

Read specified path from GCS and return contents. Provide the entire path to the key in GCS.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Entire path to (and including) the key. | required |

Returns:

| Type | Description |
| --- | --- |
| `bytes` | A bytes or string representation of the blob object. |

Source code in prefect_gcp/cloud_storage.py
@sync_compatible
async def read_path(self, path: str) -> bytes:
    """
    Read specified path from GCS and return contents. Provide the entire
    path to the key in GCS.

    Args:
        path: Entire path to (and including) the key.

    Returns:
        A bytes or string representation of the blob object.
    """
    path = self._resolve_path(path)
    with disable_run_logger():
        contents = await cloud_storage_download_blob_as_bytes.fn(
            bucket=self.bucket, blob=path, gcp_credentials=self.gcp_credentials
        )
    return contents
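
A minimal usage sketch for `read_path`, assuming a previously saved block; the object key is a placeholder and is resolved relative to the block's bucket_folder:

```python
from prefect_gcp import GcsBucket

gcs_bucket = GcsBucket.load("BLOCK_NAME")  # placeholder block name

# Reads <bucket_folder>/folder/data.csv from the bucket and returns bytes.
contents = gcs_bucket.read_path("folder/data.csv")
print(contents.decode("utf-8"))
```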

write_path async

Writes to a GCS bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | The key name. Each object in your bucket has a unique key (or key name). | required |
| `content` | `bytes` | What you are uploading to the GCS bucket. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | The path that the contents were written to. |

Source code in prefect_gcp/cloud_storage.py
@sync_compatible
async def write_path(self, path: str, content: bytes) -> str:
    """
    Writes to an GCS bucket.

    Args:
        path: The key name. Each object in your bucket has a unique
            key (or key name).
        content: What you are uploading to GCS Bucket.

    Returns:
        The path that the contents were written to.
    """
    path = self._resolve_path(path)
    with disable_run_logger():
        await cloud_storage_upload_blob_from_string.fn(
            data=content,
            bucket=self.bucket,
            blob=path,
            gcp_credentials=self.gcp_credentials,
        )
    return path
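
A minimal usage sketch for `write_path`, assuming a previously saved block; the key and contents are placeholders. The returned value is the resolved key, including any configured bucket_folder prefix:

```python
from prefect_gcp import GcsBucket

gcs_bucket = GcsBucket.load("BLOCK_NAME")  # placeholder block name

# Writes the bytes to <bucket_folder>/folder/data.csv and returns that key.
key = gcs_bucket.write_path("folder/data.csv", content=b"col_a,col_b\n1,2\n")
print(f"Wrote {key}")
```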

cloud_storage_copy_blob async

Copies data from one Google Cloud Storage bucket to another, without downloading it locally.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_bucket` | `str` | Source bucket name. | required |
| `dest_bucket` | `str` | Destination bucket name. | required |
| `source_blob` | `str` | Source blob name. | required |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | required |
| `dest_blob` | `Optional[str]` | Destination blob name; if not provided, defaults to source_blob. | `None` |
| `timeout` | `Union[float, Tuple[float, float]]` | The number of seconds the transport should wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). | `60` |
| `project` | `Optional[str]` | Name of the project to use; overrides the gcp_credentials project if provided. | `None` |
| `**copy_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `Bucket.copy_blob`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | Destination blob name. |

Examples:

Copies blob from one bucket to another.

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_copy_blob

@flow()
def example_cloud_storage_copy_blob_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    blob = cloud_storage_copy_blob(
        "source_bucket",
        "dest_bucket",
        "source_blob",
        gcp_credentials
    )
    return blob

example_cloud_storage_copy_blob_flow()
```

Source code in prefect_gcp/cloud_storage.py
@task
async def cloud_storage_copy_blob(
    source_bucket: str,
    dest_bucket: str,
    source_blob: str,
    gcp_credentials: GcpCredentials,
    dest_blob: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **copy_kwargs: Dict[str, Any],
) -> str:
    """
    Copies data from one Google Cloud Storage bucket to another,
    without downloading it locally.

    Args:
        source_bucket: Source bucket name.
        dest_bucket: Destination bucket name.
        source_blob: Source blob name.
        gcp_credentials: Credentials to use for authentication with GCP.
        dest_blob: Destination blob name; if not provided, defaults to source_blob.
        timeout: The number of seconds the transport should wait
            for the server response. Can also be passed as a tuple
            (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **copy_kwargs: Additional keyword arguments to pass to
            `Bucket.copy_blob`.

    Returns:
        Destination blob name.

    Example:
        Copies blob from one bucket to another.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_copy_blob

        @flow()
        def example_cloud_storage_copy_blob_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            blob = cloud_storage_copy_blob(
                "source_bucket",
                "dest_bucket",
                "source_blob",
                gcp_credentials
            )
            return blob

        example_cloud_storage_copy_blob_flow()
        ```
    """
    logger = get_run_logger()
    logger.info(
        "Copying blob named %s from the %s bucket to the %s bucket",
        source_blob,
        source_bucket,
        dest_bucket,
    )

    source_bucket_obj = await _get_bucket(
        source_bucket, gcp_credentials, project=project
    )

    dest_bucket_obj = await _get_bucket(dest_bucket, gcp_credentials, project=project)
    if dest_blob is None:
        dest_blob = source_blob

    source_blob_obj = source_bucket_obj.blob(source_blob)
    await run_sync_in_worker_thread(
        source_bucket_obj.copy_blob,
        blob=source_blob_obj,
        destination_bucket=dest_bucket_obj,
        new_name=dest_blob,
        timeout=timeout,
        **copy_kwargs,
    )

    return dest_blob
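
To rename the object while copying, or to split the timeout into connect and read components, the optional arguments can be passed explicitly. A sketch with placeholder bucket, blob, and keyfile names:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_copy_blob

@flow()
def example_cloud_storage_copy_blob_rename_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    # Copy source_blob into dest_bucket under a new name, using a
    # (connect_timeout, read_timeout) tuple instead of a single timeout.
    blob = cloud_storage_copy_blob(
        "source_bucket",
        "dest_bucket",
        "source_blob",
        gcp_credentials,
        dest_blob="renamed_blob",
        timeout=(5, 120),
    )
    return blob

example_cloud_storage_copy_blob_rename_flow()
```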

cloud_storage_create_bucket async

Creates a bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bucket` | `str` | Name of the bucket. | required |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | required |
| `project` | `Optional[str]` | Name of the project to use; overrides the gcp_credentials project if provided. | `None` |
| `location` | `Optional[str]` | Location of the bucket. | `None` |
| `**create_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `client.create_bucket`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The bucket name. |

Examples:

Creates a bucket named "prefect".

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_create_bucket

@flow()
def example_cloud_storage_create_bucket_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    bucket = cloud_storage_create_bucket("prefect", gcp_credentials)

example_cloud_storage_create_bucket_flow()
```

Source code in prefect_gcp/cloud_storage.py
@task
async def cloud_storage_create_bucket(
    bucket: str,
    gcp_credentials: GcpCredentials,
    project: Optional[str] = None,
    location: Optional[str] = None,
    **create_kwargs: Dict[str, Any],
) -> str:
    """
    Creates a bucket.

    Args:
        bucket: Name of the bucket.
        gcp_credentials: Credentials to use for authentication with GCP.
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        location: Location of the bucket.
        **create_kwargs: Additional keyword arguments to pass to `client.create_bucket`.

    Returns:
        The bucket name.

    Example:
        Creates a bucket named "prefect".
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_create_bucket

        @flow()
        def example_cloud_storage_create_bucket_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            bucket = cloud_storage_create_bucket("prefect", gcp_credentials)

        example_cloud_storage_create_bucket_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Creating %s bucket", bucket)

    client = gcp_credentials.get_cloud_storage_client(project=project)
    await run_sync_in_worker_thread(
        client.create_bucket, bucket, location=location, **create_kwargs
    )
    return bucket
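
The bucket's location and other creation options can be forwarded to the client. A sketch assuming the `US-EAST1` location and a placeholder keyfile path:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_create_bucket

@flow()
def example_cloud_storage_create_bucket_location_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    # Create the bucket in an explicit location instead of the default.
    bucket = cloud_storage_create_bucket(
        "prefect", gcp_credentials, location="US-EAST1"
    )
    return bucket

example_cloud_storage_create_bucket_location_flow()
```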

cloud_storage_download_blob_as_bytes async

Downloads a blob as bytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bucket` | `str` | Name of the bucket. | required |
| `blob` | `str` | Name of the Cloud Storage blob. | required |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | required |
| `chunk_size` | `int` | The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. | `None` |
| `encryption_key` | `Optional[str]` | An encryption key. | `None` |
| `timeout` | `Union[float, Tuple[float, float]]` | The number of seconds the transport should wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). | `60` |
| `project` | `Optional[str]` | Name of the project to use; overrides the gcp_credentials project if provided. | `None` |
| `**download_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `Blob.download_as_bytes`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `bytes` | A bytes or string representation of the blob object. |

Examples:

Downloads blob from bucket.

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

@flow()
def example_cloud_storage_download_blob_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    contents = cloud_storage_download_blob_as_bytes(
        "bucket", "blob", gcp_credentials)
    return contents

example_cloud_storage_download_blob_flow()
```

Source code in prefect_gcp/cloud_storage.py
@task
async def cloud_storage_download_blob_as_bytes(
    bucket: str,
    blob: str,
    gcp_credentials: GcpCredentials,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **download_kwargs: Dict[str, Any],
) -> bytes:
    """
    Downloads a blob as bytes.

    Args:
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        gcp_credentials: Credentials to use for authentication with GCP.
        chunk_size (int, optional): The size of a chunk of data whenever
            iterating (in bytes). This must be a multiple of 256 KB
            per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response. Can also be passed as a tuple
            (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **download_kwargs: Additional keyword arguments to pass to
            `Blob.download_as_bytes`.

    Returns:
        A bytes or string representation of the blob object.

    Example:
        Downloads blob from bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

        @flow()
        def example_cloud_storage_download_blob_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            contents = cloud_storage_download_blob_as_bytes(
                "bucket", "blob", gcp_credentials)
            return contents

        example_cloud_storage_download_blob_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Downloading blob named %s from the %s bucket", blob, bucket)

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    contents = await run_sync_in_worker_thread(
        blob_obj.download_as_bytes, timeout=timeout, **download_kwargs
    )
    return contents
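
If a chunk_size is supplied, the value is given in bytes and must be a multiple of 256 KB (262144 bytes). A sketch with placeholder bucket and blob names:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_as_bytes

@flow()
def example_cloud_storage_download_blob_chunked_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    # 1 MiB chunks: 4 * 256 KB, satisfying the multiple-of-256-KB requirement.
    contents = cloud_storage_download_blob_as_bytes(
        "bucket", "blob", gcp_credentials, chunk_size=4 * 256 * 1024
    )
    return contents

example_cloud_storage_download_blob_chunked_flow()
```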

cloud_storage_download_blob_to_file async

Downloads a blob to a file path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bucket` | `str` | Name of the bucket. | required |
| `blob` | `str` | Name of the Cloud Storage blob. | required |
| `path` | `Union[str, pathlib.Path]` | Downloads the contents to the provided file path; if the path is a directory, automatically joins the blob name. | required |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | required |
| `chunk_size` | `int` | The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. | `None` |
| `encryption_key` | `Optional[str]` | An encryption key. | `None` |
| `timeout` | `Union[float, Tuple[float, float]]` | The number of seconds the transport should wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). | `60` |
| `project` | `Optional[str]` | Name of the project to use; overrides the gcp_credentials project if provided. | `None` |
| `**download_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `Blob.download_to_filename`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Union[str, pathlib.Path]` | The path to the blob object. |

Examples:

Downloads blob from bucket.

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_to_file

@flow()
def example_cloud_storage_download_blob_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    path = cloud_storage_download_blob_to_file(
        "bucket", "blob", "file_path", gcp_credentials)
    return path

example_cloud_storage_download_blob_flow()
```

Source code in prefect_gcp/cloud_storage.py
@task
async def cloud_storage_download_blob_to_file(
    bucket: str,
    blob: str,
    path: Union[str, Path],
    gcp_credentials: GcpCredentials,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **download_kwargs: Dict[str, Any],
) -> Union[str, Path]:
    """
    Downloads a blob to a file path.

    Args:
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        path: Downloads the contents to the provided file path;
            if the path is a directory, automatically joins the blob name.
        gcp_credentials: Credentials to use for authentication with GCP.
        chunk_size (int, optional): The size of a chunk of data whenever
            iterating (in bytes). This must be a multiple of 256 KB
            per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response. Can also be passed as a tuple
            (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **download_kwargs: Additional keyword arguments to pass to
            `Blob.download_to_filename`.

    Returns:
        The path to the blob object.

    Example:
        Downloads blob from bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_download_blob_to_file

        @flow()
        def example_cloud_storage_download_blob_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            path = cloud_storage_download_blob_to_file(
                "bucket", "blob", "file_path", gcp_credentials)
            return path

        example_cloud_storage_download_blob_flow()
        ```
    """
    logger = get_run_logger()
    logger.info(
        "Downloading blob named %s from the %s bucket to %s", blob, bucket, path
    )

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    if os.path.isdir(path):
        if isinstance(path, Path):
            path = path.joinpath(blob)  # keep as Path if Path is passed
        else:
            path = os.path.join(path, blob)  # keep as str if a str is passed

    await run_sync_in_worker_thread(
        blob_obj.download_to_filename, path, timeout=timeout, **download_kwargs
    )
    return path
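
Because a directory path is automatically joined with the blob name, passing an existing directory downloads the blob under its own name inside that directory. A sketch with placeholder names:

```python
from pathlib import Path

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_download_blob_to_file

@flow()
def example_cloud_storage_download_blob_to_directory_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    # "downloads" is an existing directory, so the file lands at downloads/blob.
    path = cloud_storage_download_blob_to_file(
        "bucket", "blob", Path("downloads"), gcp_credentials
    )
    return path

example_cloud_storage_download_blob_to_directory_flow()
```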

cloud_storage_upload_blob_from_file async

Uploads a blob from a file path or a file-like object. Passing a file-like object is useful when the data is already in memory (for example, downloaded from the web), since it can be uploaded to Cloud Storage directly without first being written to disk.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file` | `Union[str, pathlib.Path, _io.BytesIO]` | Path to data or file-like object to upload. | required |
| `bucket` | `str` | Name of the bucket. | required |
| `blob` | `str` | Name of the Cloud Storage blob. | required |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | required |
| `content_type` | `Optional[str]` | Type of content being uploaded. | `None` |
| `chunk_size` | `Optional[int]` | The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. | `None` |
| `encryption_key` | `Optional[str]` | An encryption key. | `None` |
| `timeout` | `Union[float, Tuple[float, float]]` | The number of seconds the transport should wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). | `60` |
| `project` | `Optional[str]` | Name of the project to use; overrides the gcp_credentials project if provided. | `None` |
| `**upload_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `Blob.upload_from_file` or `Blob.upload_from_filename`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The blob name. |

Examples:

Uploads blob to bucket.

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

@flow()
def example_cloud_storage_upload_blob_from_file_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    blob = cloud_storage_upload_blob_from_file(
        "/path/somewhere", "bucket", "blob", gcp_credentials)
    return blob

example_cloud_storage_upload_blob_from_file_flow()
```

Source code in prefect_gcp/cloud_storage.py
@task
async def cloud_storage_upload_blob_from_file(
    file: Union[str, Path, BytesIO],
    bucket: str,
    blob: str,
    gcp_credentials: GcpCredentials,
    content_type: Optional[str] = None,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **upload_kwargs: Dict[str, Any],
) -> str:
    """
    Uploads a blob from file path or file-like object. Usage for passing in
    file-like object is if the data was downloaded from the web;
    can bypass writing to disk and directly upload to Cloud Storage.

    Args:
        file: Path to data or file like object to upload.
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        gcp_credentials: Credentials to use for authentication with GCP.
        content_type: Type of content being uploaded.
        chunk_size: The size of a chunk of data whenever
            iterating (in bytes). This must be a multiple of 256 KB
            per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response. Can also be passed as a tuple
            (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **upload_kwargs: Additional keyword arguments to pass to
            `Blob.upload_from_file` or `Blob.upload_from_filename`.

    Returns:
        The blob name.

    Example:
        Uploads blob to bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

        @flow()
        def example_cloud_storage_upload_blob_from_file_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            blob = cloud_storage_upload_blob_from_file(
                "/path/somewhere", "bucket", "blob", gcp_credentials)
            return blob

        example_cloud_storage_upload_blob_from_file_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Uploading blob named %s to the %s bucket", blob, bucket)

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    if isinstance(file, BytesIO):
        await run_sync_in_worker_thread(
            blob_obj.upload_from_file,
            file,
            content_type=content_type,
            timeout=timeout,
            **upload_kwargs,
        )
    else:
        await run_sync_in_worker_thread(
            blob_obj.upload_from_filename,
            file,
            content_type=content_type,
            timeout=timeout,
            **upload_kwargs,
        )
    return blob
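
To upload in-memory data without first writing it to disk, a file-like object such as `BytesIO` can be passed instead of a path. A sketch with placeholder bucket and blob names:

```python
from io import BytesIO

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file

@flow()
def example_cloud_storage_upload_bytesio_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    # Wrap in-memory bytes (e.g. the body of an HTTP response) in BytesIO
    # and upload them directly, bypassing the local filesystem.
    buffer = BytesIO(b"some data fetched from the web")
    blob = cloud_storage_upload_blob_from_file(
        buffer, "bucket", "blob", gcp_credentials, content_type="text/plain"
    )
    return blob

example_cloud_storage_upload_bytesio_flow()
```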

cloud_storage_upload_blob_from_string async

Uploads a blob from a string or bytes representation of data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Union[str, bytes]` | String or bytes representation of data to upload. | required |
| `bucket` | `str` | Name of the bucket. | required |
| `blob` | `str` | Name of the Cloud Storage blob. | required |
| `gcp_credentials` | `GcpCredentials` | Credentials to use for authentication with GCP. | required |
| `content_type` | `Optional[str]` | Type of content being uploaded. | `None` |
| `chunk_size` | `Optional[int]` | The size of a chunk of data whenever iterating (in bytes). This must be a multiple of 256 KB per the API specification. | `None` |
| `encryption_key` | `Optional[str]` | An encryption key. | `None` |
| `timeout` | `Union[float, Tuple[float, float]]` | The number of seconds the transport should wait for the server response. Can also be passed as a tuple (connect_timeout, read_timeout). | `60` |
| `project` | `Optional[str]` | Name of the project to use; overrides the gcp_credentials project if provided. | `None` |
| `**upload_kwargs` | `Dict[str, Any]` | Additional keyword arguments to pass to `Blob.upload_from_string`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The blob name. |

Examples:

Uploads blob to bucket.

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_string

@flow()
def example_cloud_storage_upload_blob_from_string_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    blob = cloud_storage_upload_blob_from_string(
        "data", "bucket", "blob", gcp_credentials)
    return blob

example_cloud_storage_upload_blob_from_string_flow()
```

Source code in prefect_gcp/cloud_storage.py
@task
async def cloud_storage_upload_blob_from_string(
    data: Union[str, bytes],
    bucket: str,
    blob: str,
    gcp_credentials: GcpCredentials,
    content_type: Optional[str] = None,
    chunk_size: Optional[int] = None,
    encryption_key: Optional[str] = None,
    timeout: Union[float, Tuple[float, float]] = 60,
    project: Optional[str] = None,
    **upload_kwargs: Dict[str, Any],
) -> str:
    """
    Uploads a blob from a string or bytes representation of data.

    Args:
        data: String or bytes representation of data to upload.
        bucket: Name of the bucket.
        blob: Name of the Cloud Storage blob.
        gcp_credentials: Credentials to use for authentication with GCP.
        content_type: Type of content being uploaded.
        chunk_size: The size of a chunk of data whenever
            iterating (in bytes). This must be a multiple of 256 KB
            per the API specification.
        encryption_key: An encryption key.
        timeout: The number of seconds the transport should wait
            for the server response. Can also be passed as a tuple
            (connect_timeout, read_timeout).
        project: Name of the project to use; overrides the
            gcp_credentials project if provided.
        **upload_kwargs: Additional keyword arguments to pass to
            `Blob.upload_from_string`.

    Returns:
        The blob name.

    Example:
        Uploads blob to bucket.
        ```python
        from prefect import flow
        from prefect_gcp import GcpCredentials
        from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_string

        @flow()
        def example_cloud_storage_upload_blob_from_string_flow():
            gcp_credentials = GcpCredentials(
                service_account_file="/path/to/service/account/keyfile.json")
            blob = cloud_storage_upload_blob_from_string(
                "data", "bucket", "blob", gcp_credentials)
            return blob

        example_cloud_storage_upload_blob_from_string_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Uploading blob named %s to the %s bucket", blob, bucket)

    bucket_obj = await _get_bucket(bucket, gcp_credentials, project=project)
    blob_obj = bucket_obj.blob(
        blob, chunk_size=chunk_size, encryption_key=encryption_key
    )

    await run_sync_in_worker_thread(
        blob_obj.upload_from_string,
        data,
        content_type=content_type,
        timeout=timeout,
        **upload_kwargs,
    )
    return blob
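
The data argument accepts either str or bytes, and a content type can be recorded on the resulting object. A sketch with placeholder bucket and blob names:

```python
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_string

@flow()
def example_cloud_storage_upload_json_flow():
    gcp_credentials = GcpCredentials(
        service_account_file="/path/to/service/account/keyfile.json")
    # Upload a JSON payload and set its content type on the blob.
    blob = cloud_storage_upload_blob_from_string(
        '{"status": "ok"}',
        "bucket",
        "records/status.json",
        gcp_credentials,
        content_type="application/json",
    )
    return blob

example_cloud_storage_upload_json_flow()
```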