Coordinate and incorporate AWS in your dataflow with prefect-aws


Welcome!

The prefect-aws collection makes it easy to leverage the capabilities of AWS in your flows, featuring support for ECSTask, S3, Secrets Manager, Batch Job, and Client Waiter.

Getting Started

Saving credentials to a block

You will need an AWS account and credentials in order to use prefect-aws.

  1. Refer to the AWS Configuration documentation for how to retrieve your access key ID and secret access key.
  2. Copy the access key ID and secret access key.
  3. Create a short script, replacing the placeholders with your credential information and desired block name:

from prefect_aws import AwsCredentials
AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")

Congrats! You can now load the saved block to use your credentials in your Python code:

from prefect_aws import AwsCredentials
AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
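
One way to confirm that the loaded block works is to make a small read-only call with it. The sketch below lists bucket names through the block's get_s3_client() method, the same method used later on this page. The helper names list_bucket_names and main are illustrative, and the sketch assumes the credentials are permitted to list S3 buckets.

```python
def list_bucket_names(aws_credentials):
    """Return the names of the S3 buckets visible to the given credentials.

    `aws_credentials` is expected to be a loaded AwsCredentials block; any
    object exposing a compatible get_s3_client() method works as well.
    """
    s3_client = aws_credentials.get_s3_client()
    response = s3_client.list_buckets()  # boto3 S3 client call
    return [bucket["Name"] for bucket in response.get("Buckets", [])]


def main():
    # Imported inside the function so the helper above stays usable
    # in environments where prefect-aws is not installed.
    from prefect_aws import AwsCredentials

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    print(list_bucket_names(aws_credentials))
```

Call main() after saving the block; an access error at this point usually indicates a problem with the saved keys or their permissions rather than with Prefect.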

Registering blocks

Register blocks in this module to view and edit them on Prefect Cloud:

prefect block register -m prefect_aws

Using Prefect with AWS ECS

prefect_aws allows you to use AWS ECS as infrastructure for your deployments. Using ECS for scheduled flow runs enables the dynamic provisioning of infrastructure for containers and unlocks greater scalability.

The snippets below show how you can use prefect_aws to run a task on ECS. The first example uses the ECSTask block as infrastructure and the second example shows using ECS within a flow.

As deployment Infrastructure

Set variables

To copy and paste the snippets below without updating placeholders manually, edit and run the following exports first.

export CREDENTIALS_BLOCK_NAME="aws-credentials"
export VPC_ID="vpc-id"
export ECS_TASK_BLOCK_NAME="ecs-task-example"
export S3_BUCKET_BLOCK_NAME="ecs-task-bucket-example"
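
The Python snippets that follow read these variables through os.environ and raise a KeyError if one is missing. As a minimal sketch, a check like the one below can report every unset variable at once. The names REQUIRED_VARIABLES and missing_variables are illustrative and not part of prefect-aws.

```python
import os

# The four variables exported above.
REQUIRED_VARIABLES = (
    "CREDENTIALS_BLOCK_NAME",
    "VPC_ID",
    "ECS_TASK_BLOCK_NAME",
    "S3_BUCKET_BLOCK_NAME",
)


def missing_variables(environ=None, required=REQUIRED_VARIABLES):
    """Return the required variable names that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


# Example usage before saving the blocks:
#     missing = missing_variables()
#     if missing:
#         raise SystemExit(f"Set these variables first: {', '.join(missing)}")
```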

Save an infrastructure and storage block

Save a custom infrastructure and storage block by executing the following snippet.

import os
from prefect_aws import AwsCredentials, ECSTask, S3Bucket

aws_credentials = AwsCredentials.load(os.environ["CREDENTIALS_BLOCK_NAME"])

ecs_task = ECSTask(
    image="prefecthq/prefect:2-python3.10",
    aws_credentials=aws_credentials,
    vpc_id=os.environ["VPC_ID"],
)
ecs_task.save(os.environ["ECS_TASK_BLOCK_NAME"], overwrite=True)

bucket_name = "ecs-task-bucket-example"
s3_client = aws_credentials.get_s3_client()
s3_client.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={"LocationConstraint": aws_credentials.region_name}
)
s3_bucket = S3Bucket(
    bucket_name=bucket_name,
    credentials=aws_credentials,
)
s3_bucket.save(os.environ["S3_BUCKET_BLOCK_NAME"], overwrite=True)
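
One caveat with the create_bucket call above: S3 treats us-east-1 as its default location and rejects it as an explicit LocationConstraint, so the snippet as written suits other regions such as the us-east-2 used on this page. The sketch below builds the keyword arguments conditionally. The helper name create_bucket_kwargs is illustrative, and when region_name is unset it simply omits the constraint, which assumes the client resolves to us-east-1.

```python
def create_bucket_kwargs(bucket_name, region_name=None):
    """Build keyword arguments for the boto3 S3 client's create_bucket call."""
    kwargs = {"Bucket": bucket_name}
    # Only regions other than us-east-1 take an explicit LocationConstraint.
    if region_name and region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs


# Example usage with the objects from the snippet above:
#     s3_client.create_bucket(
#         **create_bucket_kwargs(bucket_name, aws_credentials.region_name)
#     )
```

Bucket names are shared across all AWS accounts, so the example name may already be taken; choose a unique value for bucket_name if the call reports that the bucket exists.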

Write a flow

Next, choose a flow to deploy. Use an existing flow, or use the flow below if you don't have one handy.

from prefect import flow

@flow(log_prints=True)
def ecs_task_flow():
    print("Hello, Prefect!")

if __name__ == "__main__":
    ecs_task_flow()

Create a deployment

If you saved the flow in a script named "ecs_task_script.py", build a deployment manifest with the following command.

prefect deployment build ecs_task_script.py:ecs_task_flow \
    -n ecs-task-deployment \
    -ib ecs-task/${ECS_TASK_BLOCK_NAME} \
    -sb s3-bucket/${S3_BUCKET_BLOCK_NAME} \
    --override env.EXTRA_PIP_PACKAGES=prefect-aws

Now apply the deployment!

prefect deployment apply ecs_task_flow-deployment.yaml
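
If you script these two steps, the sketch below shows how the pieces of each command relate to the script name, flow function, and block names. It only assembles argument lists and uses the flags shown above. The function name deployment_commands is illustrative, and the manifest file name assumes the default pattern seen in the apply command (the flow function name followed by -deployment.yaml).

```python
import shlex


def deployment_commands(script, flow_function, deployment_name,
                        ecs_task_block, s3_bucket_block):
    """Return (build, apply) argument lists for the two CLI commands above."""
    build = [
        "prefect", "deployment", "build", f"{script}:{flow_function}",
        "-n", deployment_name,
        "-ib", f"ecs-task/{ecs_task_block}",     # infrastructure block slug
        "-sb", f"s3-bucket/{s3_bucket_block}",   # storage block slug
        "--override", "env.EXTRA_PIP_PACKAGES=prefect-aws",
    ]
    # Assumed default manifest name, matching the apply command above.
    apply = ["prefect", "deployment", "apply", f"{flow_function}-deployment.yaml"]
    return build, apply


def as_shell(arguments):
    """Render an argument list as a single shell-quoted command line."""
    return " ".join(shlex.quote(argument) for argument in arguments)
```

The returned lists can be printed with as_shell for review or passed to subprocess.run one at a time.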

Test the deployment

Start an agent in a separate terminal. The agent will poll the Prefect API's work pool for scheduled flow runs.

prefect agent start -q 'default'

Run the deployment once to test it:

prefect deployment run ecs-task-flow/ecs-task-deployment

Once the flow run has completed, you will see Hello, Prefect! logged in the CLI and the Prefect UI.

No class found for dispatch key

If you encounter an error message like KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'.", ensure prefect-aws is installed in the environment in which your agent is running!
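
A quick way to check this is to run a small script with the same Python interpreter that starts the agent. The sketch below uses only the standard library; the function names is_importable and describe_environment are illustrative.

```python
import importlib.util
import sys


def is_importable(module_name):
    """Return True if a top-level module can be found in this environment."""
    return importlib.util.find_spec(module_name) is not None


def describe_environment(module_name="prefect_aws"):
    """Summarize whether the module is visible to the current interpreter."""
    status = "available" if is_importable(module_name) else "MISSING"
    return f"{module_name} is {status} in {sys.executable}"


# Example usage, run from the agent's environment:
#     print(describe_environment())
```

If the module is reported as missing, install prefect-aws into that environment and restart the agent.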

Another tutorial on ECSTask can be found here.

Within Flow

You can also execute commands with an ECSTask block directly within a Prefect flow. Running containers via ECS in your flows is useful for executing non-Python code in a distributed manner while using Prefect.

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.ecs import ECSTask

@flow
def ecs_task_flow():
    ecs_task = ECSTask(
        image="prefecthq/prefect:2-python3.10",
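        # Same keyword as in the infrastructure block example; the region is
        # taken from the region_name saved on the credentials block.
        aws_credentials=AwsCredentials.load("BLOCK-NAME-PLACEHOLDER"),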
        command=["echo", "Hello, Prefect!"],
    )
    return ecs_task.run()

This setup gives you all of the observation and orchestration benefits of Prefect, while also providing you the scalability of ECS.

Using Prefect with AWS S3

prefect_aws allows you to read and write objects with AWS S3 within your Prefect flows.

The code snippet below shows how you can use prefect_aws to upload a file to an AWS S3 bucket and download the same file under a different file name.

Note that the following code assumes the bucket already exists.

from pathlib import Path
from prefect import flow
from prefect_aws import AwsCredentials, S3Bucket

@flow
def s3_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    s3_bucket = S3Bucket(
        bucket_name="BUCKET-NAME-PLACEHOLDER",
        credentials=aws_credentials
    )

    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    downloaded_file_path = s3_bucket.download_object_to_path(
        s3_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()

s3_flow()
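
To confirm that the round trip preserved the file, you can compare checksums of the original and downloaded copies. The sketch below uses only the standard library; the helper names sha256_of and files_match are illustrative.

```python
import hashlib
from pathlib import Path


def sha256_of(path, chunk_size=65536):
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def files_match(first, second):
    """Return True if both files have identical contents."""
    return sha256_of(first) == sha256_of(second)


# Example usage after running s3_flow():
#     files_match("test-example.txt", "downloaded-test-example.txt")
```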

Using Prefect with AWS Secrets Manager

prefect_aws allows you to read and write secrets with AWS Secrets Manager within your Prefect flows.

The code snippet below shows how you can use prefect_aws to write a secret to Secrets Manager, read the secret data, delete the secret, and finally return the secret data.

from prefect import flow
from prefect_aws import AwsCredentials, AwsSecret

@flow
def secrets_manager_flow():
    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    aws_secret = AwsSecret(secret_name="test-example", aws_credentials=aws_credentials)
    aws_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = aws_secret.read_secret()
    aws_secret.delete_secret()
    return secret_data

secrets_manager_flow()
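
The flow above passes the secret as bytes. If you want to store structured values, one option is to serialize them to JSON before writing and parse them after reading. The sketch below assumes that approach; encode_secret and decode_secret are illustrative helpers, not part of prefect-aws, and decode_secret accepts both bytes and text in case the secret was stored as a string.

```python
import json


def encode_secret(payload):
    """Serialize a JSON-compatible value to bytes for write_secret."""
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def decode_secret(secret_data):
    """Parse secret data (bytes or text) that was stored as JSON."""
    if isinstance(secret_data, bytes):
        secret_data = secret_data.decode("utf-8")
    return json.loads(secret_data)


# Example usage inside the flow above:
#     aws_secret.write_secret(secret_data=encode_secret({"user": "example"}))
#     settings = decode_secret(aws_secret.read_secret())
```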

Resources

Refer to the API documentation on the sidebar to explore all the capabilities of Prefect AWS!

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Recipes

For additional recipes and examples, check out prefect-recipes.

Installation

Install prefect-aws

pip install prefect-aws

A list of available blocks in prefect-aws and their setup instructions can be found here.

Requires an installation of Python 3.7+

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Feedback

If you encounter any bugs while using prefect-aws, feel free to open an issue in the prefect-aws repository.

If you have any questions or issues while using prefect-aws, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-aws for updates too!