Coordinate and incorporate AWS in your dataflow with prefect-aws
Welcome!
The prefect-aws collection makes it easy to leverage the capabilities of AWS in your flows, featuring support for ECSTask, S3, Secrets Manager, Batch Job, and Client Waiter.
Getting Started
Saving credentials to a block
You will need an AWS account and credentials in order to use prefect-aws.
- Refer to the AWS Configuration documentation on how to retrieve your access key ID and secret access key
- Copy the access key ID and secret access key
- Create a short script and replace the placeholders with your credential information and desired block name:
from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")
Congrats! You can now load the saved block to use your credentials in your Python code:
from prefect_aws import AwsCredentials
AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
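Hardcoding keys in a script makes them easy to leak. A minimal sketch of one alternative, assuming the keys are already exported under the standard AWS environment variable names (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`, `AWS_DEFAULT_REGION`). The helper names `credentials_kwargs_from_env` and `save_credentials_block` are hypothetical and not part of prefect-aws:

```python
import os
from typing import Mapping, Optional


def credentials_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build AwsCredentials keyword arguments from environment variables.

    Hypothetical helper: raises KeyError if a required variable is missing.
    """
    env = os.environ if env is None else env
    return {
        "aws_access_key_id": env["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
        # The session token is only needed for temporary credentials.
        "aws_session_token": env.get("AWS_SESSION_TOKEN"),
        # Assumption: fall back to the region used elsewhere in this guide.
        "region_name": env.get("AWS_DEFAULT_REGION", "us-east-2"),
    }


def save_credentials_block(block_name: str) -> None:
    """Save an AwsCredentials block without writing keys into the script."""
    # Imported lazily so the helper above can be used without prefect-aws.
    from prefect_aws import AwsCredentials

    AwsCredentials(**credentials_kwargs_from_env()).save(block_name, overwrite=True)
```

Calling `save_credentials_block("BLOCK-NAME-PLACEHOLDER")` would then save the block under the same name used in the snippets above.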
Registering blocks
Register blocks in this module to view and edit them on Prefect Cloud:
prefect block register -m prefect_aws
Using Prefect with AWS ECS
prefect_aws allows you to use AWS ECS as infrastructure for your deployments. Using ECS for scheduled flow runs enables the dynamic provisioning of infrastructure for containers and unlocks greater scalability.
The snippets below show how you can use prefect_aws to run a task on ECS. The first example uses the ECSTask block as infrastructure and the second example shows using ECS within a flow.
As deployment Infrastructure
Set variables
To make the snippets below copy-and-paste ready without updating placeholders by hand, update and run the following exports.
export CREDENTIALS_BLOCK_NAME="aws-credentials"
export VPC_ID="vpc-id"
export ECS_TASK_BLOCK_NAME="ecs-task-example"
export S3_BUCKET_BLOCK_NAME="ecs-task-bucket-example"
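The Python snippets that follow read these variables with `os.environ[...]`, which raises a bare `KeyError` on the first one that is missing. A small sketch of a pre-flight check that reports every missing variable at once; `missing_variables` is a hypothetical helper, not part of Prefect:

```python
import os
from typing import Iterable, List, Mapping, Optional

# The variables exported above.
REQUIRED_VARIABLES = (
    "CREDENTIALS_BLOCK_NAME",
    "VPC_ID",
    "ECS_TASK_BLOCK_NAME",
    "S3_BUCKET_BLOCK_NAME",
)


def missing_variables(
    names: Iterable[str] = REQUIRED_VARIABLES,
    env: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are unset or empty, in the order given."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]
```

An empty list means all four variables are set in the current shell.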
Save an infrastructure and storage block
Save a custom infrastructure and storage block by executing the following snippet.
import os
from prefect_aws import AwsCredentials, ECSTask, S3Bucket

# Load the credentials block saved earlier.
aws_credentials = AwsCredentials.load(os.environ["CREDENTIALS_BLOCK_NAME"])

# Infrastructure block: run flow runs as ECS tasks in the given VPC.
ecs_task = ECSTask(
    image="prefecthq/prefect:2-python3.10",
    aws_credentials=aws_credentials,
    vpc_id=os.environ["VPC_ID"],
)
ecs_task.save(os.environ["ECS_TASK_BLOCK_NAME"], overwrite=True)

# Storage block: create the bucket, then save a block that points at it.
bucket_name = "ecs-task-bucket-example"
s3_client = aws_credentials.get_s3_client()
s3_client.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={"LocationConstraint": aws_credentials.region_name}
)
s3_bucket = S3Bucket(
    bucket_name=bucket_name,
    credentials=aws_credentials,
)
s3_bucket.save(os.environ["S3_BUCKET_BLOCK_NAME"], overwrite=True)
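One caveat with the `create_bucket` call above: S3 treats `us-east-1` as the default region and rejects it as a `LocationConstraint`, so the `CreateBucketConfiguration` argument has to be left out there. A minimal sketch of a helper that handles both cases; `create_bucket_kwargs` is a hypothetical name, not part of prefect-aws or boto3:

```python
from typing import Optional


def create_bucket_kwargs(bucket_name: str, region_name: Optional[str]) -> dict:
    """Build keyword arguments for an S3 client's create_bucket call."""
    kwargs = {"Bucket": bucket_name}
    # No constraint for us-east-1, or when the block has no region set.
    if region_name and region_name != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region_name}
    return kwargs
```

With this helper the call would become `s3_client.create_bucket(**create_bucket_kwargs(bucket_name, aws_credentials.region_name))`.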
Write a flow
Then, pick an existing flow to deploy, or use the flow below if you don't have one handy.
from prefect import flow

@flow(log_prints=True)
def ecs_task_flow():
    print("Hello, Prefect!")

if __name__ == "__main__":
    ecs_task_flow()
Create a deployment
If the script is named "ecs_task_script.py", build a deployment manifest with the following command.
prefect deployment build ecs_task_script.py:ecs_task_flow \
-n ecs-task-deployment \
-ib ecs-task/${ECS_TASK_BLOCK_NAME} \
-sb s3-bucket/${S3_BUCKET_BLOCK_NAME} \
--override env.EXTRA_PIP_PACKAGES=prefect-aws
Now apply the deployment!
prefect deployment apply ecs_task_flow-deployment.yaml
Test the deployment
Start an agent in a separate terminal. The agent will poll the Prefect API's work pool for scheduled flow runs.
prefect agent start -q 'default'
Run the deployment once to test it:
prefect deployment run ecs-task-flow/ecs-task-deployment
Once the flow run has completed, you will see Hello, Prefect! logged in the CLI and the Prefect UI.
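The commands above spell the flow name two ways: the manifest file keeps the function name (`ecs_task_flow`), while the deployment is addressed by the flow name with hyphens (`ecs-task-flow`). A small sketch of that naming pattern as it appears in this walkthrough; it assumes the flow has no explicit `name=` argument, and both helper names are hypothetical:

```python
def manifest_filename(flow_function_name: str) -> str:
    """File name written by the build step in this walkthrough."""
    return f"{flow_function_name}-deployment.yaml"


def deployment_identifier(flow_function_name: str, deployment_name: str) -> str:
    """The FLOW/DEPLOYMENT string passed to the run command above."""
    # Underscores in the function name appear as hyphens in the flow name.
    flow_name = flow_function_name.replace("_", "-")
    return f"{flow_name}/{deployment_name}"
```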
No class found for dispatch key
If you encounter an error message like KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'.", ensure prefect-aws is installed in the environment in which your agent is running!
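A quick way to check this is to ask Python whether the package can be found, using the same interpreter that runs the agent. A minimal sketch using only the standard library; `is_installed` is a hypothetical helper name:

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(module_name) is not None
```

If `is_installed("prefect_aws")` returns `False` in the agent's environment, install the package there with `pip install prefect-aws` and restart the agent.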
Another tutorial on ECSTask can be found here.
Within Flow
You can also execute commands with an ECSTask block directly within a Prefect flow. Running containers via ECS in your flows is useful for executing non-Python code in a distributed manner while using Prefect.
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.ecs import ECSTask

@flow
def ecs_task_flow():
    ecs_task = ECSTask(
        image="prefecthq/prefect:2-python3.10",
        # ECSTask takes the credentials block as aws_credentials, as in the
        # deployment example; the region comes from the block's region_name.
        aws_credentials=AwsCredentials.load("BLOCK-NAME-PLACEHOLDER"),
        command=["echo", "Hello, Prefect!"],
    )
    return ecs_task.run()
This setup gives you all of the observation and orchestration benefits of Prefect, while also providing you the scalability of ECS.
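The `command` argument in the flow above is a list of arguments rather than a single shell string. When the non-Python command starts out as one string, the standard library's `shlex.split` can turn it into that list. A minimal sketch; `to_command` is a hypothetical helper name:

```python
import shlex
from typing import List


def to_command(command_line: str) -> List[str]:
    """Split a shell-style command line into an argument list."""
    # shlex.split honors quoting, so quoted arguments stay together.
    return shlex.split(command_line)
```

For example, `to_command('echo "Hello, Prefect!"')` yields the same list that is passed as `command` above.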
Using Prefect with AWS S3
prefect_aws allows you to read and write objects with AWS S3 within your Prefect flows.
The provided code snippet shows how you can use prefect_aws to upload a file to an AWS S3 bucket and download the same file under a different file name.
Note that the following code assumes the bucket already exists.
from pathlib import Path
from prefect import flow
from prefect_aws import AwsCredentials, S3Bucket

@flow
def s3_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    # same keyword as the S3Bucket block saved in the ECS example
    s3_bucket = S3Bucket(
        bucket_name="BUCKET-NAME-PLACEHOLDER",
        credentials=aws_credentials
    )

    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    downloaded_file_path = s3_bucket.download_object_to_path(
        s3_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()

s3_flow()
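To confirm that the round trip preserved the file, the original and the downloaded copy can be compared by checksum. A minimal sketch using only the standard library; `sha256_of` and `files_match` are hypothetical helpers, not part of prefect-aws:

```python
import hashlib
from pathlib import Path
from typing import Union


def sha256_of(path: Union[str, Path]) -> str:
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def files_match(original: Union[str, Path], downloaded: Union[str, Path]) -> bool:
    """True when both files have identical contents."""
    return sha256_of(original) == sha256_of(downloaded)
```

Inside the flow above, `files_match(file_path, downloaded_file_path)` would be expected to return `True` after the download completes.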
Using Prefect with AWS Secrets Manager
prefect_aws allows you to read and write secrets with AWS Secrets Manager within your Prefect flows.
The provided code snippet shows how you can use prefect_aws to write a secret to Secrets Manager, read the secret data, delete the secret, and finally return the secret data.
from prefect import flow
from prefect_aws import AwsCredentials, AwsSecret

@flow
def secrets_manager_flow():
    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    aws_secret = AwsSecret(secret_name="test-example", aws_credentials=aws_credentials)
    aws_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = aws_secret.read_secret()
    aws_secret.delete_secret()
    return secret_data

secrets_manager_flow()
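The example stores a single byte string. When a secret has several fields (say, a username and a password), one common approach is to serialize them as JSON before writing and parse them after reading. A minimal sketch, assuming the payload is JSON-serializable and that the value read back may arrive as either bytes or text; `encode_secret` and `decode_secret` are hypothetical helpers, not part of prefect-aws:

```python
import json
from typing import Union


def encode_secret(payload: dict) -> bytes:
    """Serialize a dictionary to UTF-8 JSON bytes for write_secret."""
    return json.dumps(payload).encode("utf-8")


def decode_secret(secret_data: Union[bytes, str]) -> dict:
    """Parse secret data back into a dictionary."""
    # Assumption: accept both bytes and str, decoding bytes as UTF-8.
    if isinstance(secret_data, bytes):
        secret_data = secret_data.decode("utf-8")
    return json.loads(secret_data)
```

In the flow above this would look like `aws_secret.write_secret(secret_data=encode_secret({"user": "example"}))` followed by `decode_secret(aws_secret.read_secret())`.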
Resources
Refer to the API documentation on the sidebar to explore all the capabilities of Prefect AWS!
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
Recipes
For additional recipes and examples, check out prefect-recipes.
Installation
Install prefect-aws with pip:
pip install prefect-aws
A list of available blocks in prefect-aws and their setup instructions can be found here.
Requires an installation of Python 3.7+
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
Feedback
If you encounter any bugs while using prefect-aws, feel free to open an issue in the prefect-aws repository.
If you have any questions or issues while using prefect-aws, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to star or watch prefect-aws for updates too!