Skip to content

prefect_azure.blob_storage

Tasks for interacting with Azure Blob Storage

Classes

Functions

blob_storage_download(container, blob, blob_storage_credentials) async

Downloads a blob with a given key from a given Blob Storage container.

Parameters:

Name Type Description Default
blob str

Name of the blob within this container to retrieve.

required
container str

Name of the Blob Storage container to retrieve from.

required
blob_storage_credentials AzureBlobStorageCredentials

Credentials to use for authentication with Azure.

required

Returns:

Type Description
bytes

A bytes representation of the downloaded blob.

Example

Download a file from a Blob Storage container

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download

@flow
def example_blob_storage_download_flow():
    """Download one blob from a container and return its contents."""
    # Placeholder value; substitute a real Azure Storage connection string.
    connection_string = "connection_string"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )
    # Fetch blob "prefect.txt" from container "prefect"; the task returns bytes.
    data = blob_storage_download(
        container="prefect",
        blob="prefect.txt",
        blob_storage_credentials=blob_storage_credentials,
    )
    return data

example_blob_storage_download_flow()

Source code in prefect_azure/blob_storage.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@task
async def blob_storage_download(
    container: str,
    blob: str,
    blob_storage_credentials: "AzureBlobStorageCredentials",
) -> bytes:
    """
    Downloads a blob with a given key from a given Blob Storage container.
    Args:
        container: Name of the Blob Storage container to retrieve from.
        blob: Name of the blob within this container to retrieve.
        blob_storage_credentials: Credentials to use for authentication with Azure.
    Returns:
        A `bytes` representation of the downloaded blob.
    Example:
        Download a file from a Blob Storage container
        ```python
        from prefect import flow

        from prefect_azure import AzureBlobStorageCredentials
        from prefect_azure.blob_storage import blob_storage_download

        @flow
        def example_blob_storage_download_flow():
            connection_string = "connection_string"
            blob_storage_credentials = AzureBlobStorageCredentials(
                connection_string=connection_string,
            )
            data = blob_storage_download(
                container="prefect",
                blob="prefect.txt",
                blob_storage_credentials=blob_storage_credentials,
            )
            return data

        example_blob_storage_download_flow()
        ```
    """
    run_logger = get_run_logger()
    run_logger.info("Downloading blob from container %s with key %s", container, blob)

    client_context = blob_storage_credentials.get_blob_client(container, blob)
    async with client_context as client:
        # Awaiting download_blob yields an object whose full content is then
        # read into memory via content_as_bytes().
        downloader = await client.download_blob()
        payload = await downloader.content_as_bytes()

    return payload

blob_storage_list(container, blob_storage_credentials, name_starts_with=None, include=None, **kwargs) async

List objects from a given Blob Storage container.

Parameters:

Name Type Description Default
container str

Name of the Blob Storage container to retrieve from.

required
blob_storage_credentials AzureBlobStorageCredentials

Credentials to use for authentication with Azure.

required
name_starts_with str

Filters the results to return only blobs whose names begin with the specified prefix.

None
include Union[str, List[str]]

Specifies one or more additional datasets to include in the response. Options include: 'snapshots', 'metadata', 'uncommittedblobs', 'copy', 'deleted', 'deletedwithversions', 'tags', 'versions', 'immutabilitypolicy', 'legalhold'.

None
**kwargs

Additional kwargs passed to ContainerClient.list_blobs()

{}

Returns:

Type Description
List[BlobProperties]

A list of BlobProperties objects, one per blob, containing its metadata.

Example
from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_list

@flow
def example_blob_storage_list_flow():
    """List the blobs in a container and return their properties."""
    # Placeholder value; substitute a real Azure Storage connection string.
    connection_string = "connection_string"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )
    data = blob_storage_list(
        container="container",
        blob_storage_credentials=blob_storage_credentials,
    )
    return data

example_blob_storage_list_flow()
Source code in prefect_azure/blob_storage.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@task
async def blob_storage_list(
    container: str,
    blob_storage_credentials: "AzureBlobStorageCredentials",
    name_starts_with: Union[str, None] = None,
    include: Union[str, List[str], None] = None,
    **kwargs
) -> List["BlobProperties"]:
    """
    List objects from a given Blob Storage container.
    Args:
        container: Name of the Blob Storage container to retrieve from.
        blob_storage_credentials: Credentials to use for authentication with Azure.
        name_starts_with: Filters the results to return only blobs whose names
            begin with the specified prefix.
        include: Specifies one or more additional datasets to include in the response.
            Options include: 'snapshots', 'metadata', 'uncommittedblobs', 'copy',
            'deleted', 'deletedwithversions', 'tags', 'versions', 'immutabilitypolicy',
            'legalhold'.
        **kwargs: Additional kwargs passed to `ContainerClient.list_blobs()`
    Returns:
        A `list` of `BlobProperties` objects, one per blob, containing its metadata.
    Example:
        ```python
        from prefect import flow

        from prefect_azure import AzureBlobStorageCredentials
        from prefect_azure.blob_storage import blob_storage_list

        @flow
        def example_blob_storage_list_flow():
            connection_string = "connection_string"
            blob_storage_credentials = AzureBlobStorageCredentials(
                connection_string=connection_string,
            )
            data = blob_storage_list(
                container="container",
                blob_storage_credentials=blob_storage_credentials,
            )
            return data

        example_blob_storage_list_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Listing blobs from container %s", container)

    async with blob_storage_credentials.get_container_client(
        container
    ) as container_client:
        # list_blobs returns an async iterator; drain it while the client is
        # still open so the results survive leaving the context manager.
        blobs = [
            blob
            async for blob in container_client.list_blobs(
                name_starts_with=name_starts_with, include=include, **kwargs
            )
        ]

    return blobs

blob_storage_upload(data, container, blob_storage_credentials, blob=None, overwrite=False) async

Uploads data to a Blob Storage container.

Parameters:

Name Type Description Default
data bytes

Bytes representation of data to upload to Blob Storage.

required
container str

Name of the Blob Storage container to upload to.

required
blob_storage_credentials AzureBlobStorageCredentials

Credentials to use for authentication with Azure.

required
blob str

Name of the blob to create within this container. If not provided, a random UUID4 string is used as the blob name.

None
overwrite bool

If True, an existing blob with the same name will be overwritten. Defaults to False and an error will be thrown if the blob already exists.

False

Returns:

Type Description
str

The blob name of the uploaded object

Example

Read and upload a file to a Blob Storage container

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload

@flow
def example_blob_storage_upload_flow():
    """Read a local file and upload its bytes as a blob; return the blob name."""
    # Placeholder value; substitute a real Azure Storage connection string.
    connection_string = "connection_string"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )
    # Open in binary mode because the task's `data` parameter expects bytes.
    with open("data.csv", "rb") as f:
        blob = blob_storage_upload(
            data=f.read(),
            container="container",
            blob="data.csv",
            blob_storage_credentials=blob_storage_credentials,
            # With overwrite=False, uploading to an existing blob name errors.
            overwrite=False,
        )
    return blob

example_blob_storage_upload_flow()

Source code in prefect_azure/blob_storage.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@task
async def blob_storage_upload(
    data: bytes,
    container: str,
    blob_storage_credentials: "AzureBlobStorageCredentials",
    blob: Union[str, None] = None,
    overwrite: bool = False,
) -> str:
    """
    Uploads data to a Blob Storage container.
    Args:
        data: Bytes representation of data to upload to Blob Storage.
        container: Name of the Blob Storage container to upload to.
        blob_storage_credentials: Credentials to use for authentication with Azure.
        blob: Name of the blob to create within this container. If not provided,
            a random UUID4 string is used as the blob name.
        overwrite: If `True`, an existing blob with the same name will be overwritten.
            Defaults to `False` and an error will be thrown if the blob already exists.
    Returns:
        The blob name of the uploaded object (the generated UUID if `blob` was
        not provided).
    Example:
        Read and upload a file to a Blob Storage container
        ```python
        from prefect import flow

        from prefect_azure import AzureBlobStorageCredentials
        from prefect_azure.blob_storage import blob_storage_upload

        @flow
        def example_blob_storage_upload_flow():
            connection_string = "connection_string"
            blob_storage_credentials = AzureBlobStorageCredentials(
                connection_string=connection_string,
            )
            with open("data.csv", "rb") as f:
                blob = blob_storage_upload(
                    data=f.read(),
                    container="container",
                    blob="data.csv",
                    blob_storage_credentials=blob_storage_credentials,
                    overwrite=False,
                )
            return blob

        example_blob_storage_upload_flow()
        ```
    """
    logger = get_run_logger()

    # Create the key before logging so the log line reports the name that is
    # actually used, instead of "None" when the caller omitted `blob`.
    if blob is None:
        blob = str(uuid.uuid4())

    logger.info("Uploading blob to container %s with key %s", container, blob)

    async with blob_storage_credentials.get_blob_client(container, blob) as blob_client:
        await blob_client.upload_blob(data, overwrite=overwrite)

    return blob