Skip to content

Examples Catalog

Below is a list of examples for prefect-aws.

Batch Module

Submits a job to batch.

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.batch import batch_submit


@flow
def example_batch_submit_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="acccess_key_id",
        aws_secret_access_key="secret_access_key"
    )
    job_id = batch_submit(
        "job_name",
        "job_queue",
        "job_definition",
        aws_credentials
    )
    return job_id

example_batch_submit_flow()

Client Waiter Module

Run an ec2 waiter until instance_exists.

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.client_wait import client_waiter

@flow
def example_client_wait_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="acccess_key_id",
        aws_secret_access_key="secret_access_key"
    )

    waiter = client_waiter(
        "ec2",
        "instance_exists",
        aws_credentials
    )

    return waiter
example_client_wait_flow()

Credentials Module

Create an S3 client from an authorized boto3 session:

aws_credentials = AwsCredentials(
    aws_access_key_id = "access_key_id",
    aws_secret_access_key = "secret_access_key"
    )
s3_client = aws_credentials.get_boto3_session().client("s3")
Create an S3 client from an authorized boto3 session

minio_credentials = MinIOCredentials(
    minio_root_user = "minio_root_user",
    minio_root_password = "minio_root_password"
)
s3_client = minio_credentials.get_boto3_session().client(
    service="s3",
    endpoint_url="http://localhost:9000"
)

Glue Job Module

Start a job to AWS Glue Job.

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
    ).trigger()

    return glue_job_run.wait_for_completion()


example_run_glue_job()

S3 Module

Download my_folder to a local folder named my_folder.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.download_folder_to_path("my_folder", "my_folder")
Read and upload a file to an S3 bucket:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_upload


@flow
async def example_s3_upload_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="acccess_key_id",
        aws_secret_access_key="secret_access_key"
    )
    with open("data.csv", "rb") as file:
        key = await s3_upload(
            bucket="bucket",
            key="data.csv",
            data=file.read(),
            aws_credentials=aws_credentials,
        )

example_s3_upload_flow()
Upload contents from my_folder to new_folder.
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.upload_from_folder("my_folder", "new_folder")
Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt.

from prefect_aws.s3 import S3Bucket

your_s3_bucket = S3Bucket.load("your-bucket")
my_s3_bucket = S3Bucket.load("my-bucket")

my_s3_bucket.stream_from(
    your_s3_bucket,
    "notes.txt",
    to_path="landed/notes.txt"
)
Read "subfolder/file1" contents from an S3 bucket named "bucket":
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

aws_creds = AwsCredentials(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

s3_bucket_block = S3Bucket(
    bucket_name="bucket",
    credentials=aws_creds,
    bucket_folder="subfolder"
)

key_contents = s3_bucket_block.read_path(path="subfolder/file1")
Download a file from an S3 bucket:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_download


@flow
async def example_s3_download_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="acccess_key_id",
        aws_secret_access_key="secret_access_key"
    )
    data = await s3_download(
        bucket="bucket",
        key="key",
        aws_credentials=aws_credentials,
    )

example_s3_download_flow()
Upload BytesIO object to my_folder/notes.txt.
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with open("notes.txt", "rb") as f:
    s3_bucket.upload_from_file_object(f, "my_folder/notes.txt")

Upload BufferedReader object to my_folder/notes.txt.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with open("notes.txt", "rb") as f:
    s3_bucket.upload_from_file_object(
        f, "my_folder/notes.txt"
    )
Download my_folder/notes.txt object to notes.txt.
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
Download my_folder/notes.txt object to a BytesIO object.
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with BytesIO() as buf:
    s3_bucket.download_object_to_file_object("my_folder/notes.txt", buf)

Download my_folder/notes.txt object to a BufferedWriter.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with open("notes.txt", "wb") as f:
    s3_bucket.download_object_to_file_object("my_folder/notes.txt", f)
Upload notes.txt to my_folder/notes.txt.
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.upload_from_path("notes.txt", "my_folder/notes.txt")
List all objects in a bucket:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_list_objects


@flow
async def example_s3_list_objects_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="acccess_key_id",
        aws_secret_access_key="secret_access_key"
    )
    objects = await s3_list_objects(
        bucket="data_bucket",
        aws_credentials=aws_credentials
    )

example_s3_list_objects_flow()
List objects under the base_folder.
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.list_objects("base_folder")

Secrets Manager Module

Read a secret value:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import read_secret

@flow
def example_read_secret():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    secret_value = read_secret(
        secret_name="db_password",
        aws_credentials=aws_credentials
    )

example_read_secret()
Reads a secret.
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.read_secret()
Write some secret data.
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.write_secret(b"my_secret_data")
Update a secret value:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import update_secret

@flow
def example_update_secret():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    update_secret(
        secret_name="life_the_universe_and_everything",
        secret_value="42",
        aws_credentials=aws_credentials
    )

example_update_secret()
Delete a secret immediately:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import delete_secret

@flow
def example_delete_secret_immediately():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    delete_secret(
        secret_name="life_the_universe_and_everything",
        aws_credentials=aws_credentials,
        force_delete_without_recovery: True
    )

example_delete_secret_immediately()

Delete a secret with a 90 day recovery window:

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import delete_secret

@flow
def example_delete_secret_with_recovery_window():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    delete_secret(
        secret_name="life_the_universe_and_everything",
        aws_credentials=aws_credentials,
        recovery_window_in_days=90
    )

example_delete_secret_with_recovery_window()
Deletes the secret with a recovery window of 15 days.
secrets_manager = SecretsManager.load("MY_BLOCK")
secrets_manager.delete_secret(recovery_window_in_days=15)

[Utilities Module][prefect_aws.utilities]

from prefect_aws.utilities import hash_collection

hash_collection({"a": 1, "b": 2})