prefect-azure

Welcome!

prefect-azure is a collection of prebuilt Prefect tasks that can be used to quickly construct Prefect flows.

Getting Started

Python setup

Requires Python 3.8 or later.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Installation

Install prefect-azure with pip:

pip install prefect-azure

To use Blob Storage:

pip install "prefect-azure[blob_storage]"

To use Cosmos DB:

pip install "prefect-azure[cosmos_db]"

To use ML Datastore:

pip install "prefect-azure[ml_datastore]"
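
To check which extras ended up in the current environment, a small standard-library sketch like the one below can help. The mapping from each extra to a module name is an assumption about what the extras install, not something stated by prefect-azure, so adjust it if your environment differs.

```python
import importlib.util

# Assumed mapping: extra name -> a module that extra is expected to provide.
# These module names are illustrative guesses, not taken from prefect-azure.
EXTRA_MODULES = {
    "blob_storage": "azure.storage.blob",
    "cosmos_db": "azure.cosmos",
    "ml_datastore": "azureml.core",
}


def is_importable(module_name: str) -> bool:
    """Return True if module_name can be located without fully importing it."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # A missing parent package raises ModuleNotFoundError (an ImportError).
        return False


def installed_extras() -> dict:
    """Report, per extra, whether its assumed module is importable."""
    return {extra: is_importable(module) for extra, module in EXTRA_MODULES.items()}


if __name__ == "__main__":
    for extra, available in installed_extras().items():
        status = "available" if available else "missing"
        print(f"{extra}: {status}")
```

An extra reported as missing can then be installed with the matching pip command above.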

A list of available blocks in prefect-azure and their setup instructions can be found here.

Download a blob

from prefect import flow

from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download

@flow
def example_blob_storage_download_flow():
    connection_string = "connection_string"
    blob_storage_credentials = AzureBlobStorageCredentials(
        connection_string=connection_string,
    )
    data = blob_storage_download(
        blob="prefect.txt",
        container="prefect",
        azure_credentials=blob_storage_credentials,
    )
    return data

example_blob_storage_download_flow()
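
The example above hardcodes a placeholder connection string. In practice you would likely keep the real value out of source code. A minimal sketch, assuming the value lives in an environment variable: the helper name connection_string_from_env is hypothetical, and the default variable name AZURE_STORAGE_CONNECTION_STRING is a common convention rather than something prefect-azure requires.

```python
import os


def connection_string_from_env(
    var_name: str = "AZURE_STORAGE_CONNECTION_STRING",  # assumed variable name
) -> str:
    """Return the connection string stored in the environment variable var_name.

    Raises RuntimeError if the variable is unset or blank, so a flow fails
    early with a clear message instead of failing inside the Azure client.
    """
    value = os.environ.get(var_name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {var_name} is not set or is empty")
    return value
```

Inside the flow, connection_string = connection_string_from_env() would then replace the hardcoded placeholder.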

Use with_options to customize options on any existing task or flow:

custom_blob_storage_download_flow = example_blob_storage_download_flow.with_options(
    name="My custom flow name",
    retries=2,
    retry_delay_seconds=10,
)

Run a command on an Azure container instance

from prefect import flow
from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob


@flow
def container_instance_job_flow():
    aci_credentials = AzureContainerInstanceCredentials.load("MY_BLOCK_NAME")
    container_instance_job = AzureContainerInstanceJob(
        aci_credentials=aci_credentials,
        resource_group_name="azure_resource_group.example.name",
        subscription_id="<MY_AZURE_SUBSCRIPTION_ID>",
        command=["echo", "hello world"],
    )
    return container_instance_job.run()

Use Azure Container Instance as infrastructure

If we have a_flow_module.py:

from prefect import flow, get_run_logger

@flow
def log_hello_flow(name="Marvin"):
    logger = get_run_logger()
    logger.info(f"{name} said hello!")

if __name__ == "__main__":
    log_hello_flow()

To run that flow on an Azure Container Instance, first create and save the infrastructure block:

from prefect_azure import AzureContainerInstanceCredentials
from prefect_azure.container_instance import AzureContainerInstanceJob

container_instance_job = AzureContainerInstanceJob(
    aci_credentials=AzureContainerInstanceCredentials.load("MY_BLOCK_NAME"),
    resource_group_name="azure_resource_group.example.name",
    subscription_id="<MY_AZURE_SUBSCRIPTION_ID>",
)
container_instance_job.save("aci-dev")

Then, create the deployment either in the UI or through the CLI:

prefect deployment build a_flow_module.py:log_hello_flow --name aci-dev -ib container-instance-job/aci-dev
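
The value passed to -ib combines a block type slug with the name used when saving the block. If you script this step, a sketch like the following assembles the same command. The helper deployment_build_command is hypothetical; the flags and values are the ones from the command above.

```python
import shlex


def deployment_build_command(
    entrypoint: str,
    deployment_name: str,
    block_type_slug: str,
    block_name: str,
) -> list:
    """Build the argument list for `prefect deployment build`.

    The infrastructure block is referenced as "<block type slug>/<block name>".
    """
    infra_block = f"{block_type_slug}/{block_name}"
    return [
        "prefect", "deployment", "build", entrypoint,
        "--name", deployment_name,
        "-ib", infra_block,
    ]


command = deployment_build_command(
    entrypoint="a_flow_module.py:log_hello_flow",
    deployment_name="aci-dev",
    block_type_slug="container-instance-job",
    block_name="aci-dev",  # the name passed to container_instance_job.save(...)
)

# Prints the same command line shown above.
print(shlex.join(command))
```

Returning a list rather than a single string means the command can be handed to subprocess.run(command, check=True) without shell quoting concerns.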

Visit Prefect Deployments for more information about deployments.

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Azure Container Instance Worker

The Azure Container Instance worker executes flow runs from a work pool on Azure Container Instances.

To get started, create a work pool of type azure-container-instance:

prefect work-pool create -t azure-container-instance my-aci-work-pool

Then, run a worker that pulls jobs from the work pool:

prefect worker start -n my-aci-worker -p my-aci-work-pool

The worker should automatically read the work pool's type and start an Azure Container Instance worker.

Resources

If you encounter any bugs while using prefect-azure, feel free to open an issue in the prefect-azure repository.

If you have any questions or issues while using prefect-azure, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-azure for updates too!

Contributing

If you'd like to fix an issue or add a feature to prefect-azure, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
    pip install -e ".[dev]"
    
  4. Make desired changes
  5. Add tests
  6. Add an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
    pre-commit install
    
  8. git commit, git push, and create a pull request