prefect_aws.s3

Tasks for interacting with AWS S3

Classes

S3Bucket (WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock) pydantic-model

Block used to store data using AWS S3 or S3-compatible object storage like MinIO.

Attributes:

    bucket_name (str): Name of your bucket.
    credentials (Union[prefect_aws.credentials.AwsCredentials, prefect_aws.credentials.MinIOCredentials]): A block containing your credentials to AWS or MinIO.
    bucket_folder (str): A default path to a folder within the S3 bucket to use for reading and writing objects.
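
For example, a minimal usage sketch (the bucket name and folder are placeholders; a bare `AwsCredentials()` falls back to credentials resolved from the environment):

```python
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket(
    bucket_name="my-bucket",
    credentials=AwsCredentials(),
    bucket_folder="projects/demo",
)

# Keys are resolved relative to bucket_folder, so this writes
# s3://my-bucket/projects/demo/notes.txt and reads it back.
s3_bucket.write_path("notes.txt", content=b"hello")
assert s3_bucket.read_path("notes.txt") == b"hello"
```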

Source code in prefect_aws/s3.py
class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock):

    """
    Block used to store data using AWS S3 or S3-compatible object storage like MinIO.

    Attributes:
        bucket_name: Name of your bucket.
        credentials: A block containing your credentials to AWS or MinIO.
        bucket_folder: A default path to a folder within the S3 bucket to use
            for reading and writing objects.
    """

    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1jbV4lceHOjGgunX15lUwT/db88e184d727f721575aeb054a37e277/aws.png?h=250"  # noqa
    _block_type_name = "S3 Bucket"
    _documentation_url = (
        "https://prefecthq.github.io/prefect-aws/s3/#prefect_aws.s3.S3Bucket"  # noqa
    )

    bucket_name: str = Field(default=..., description="Name of your bucket.")

    credentials: Union[AwsCredentials, MinIOCredentials] = Field(
        default_factory=AwsCredentials,
        description="A block containing your credentials to AWS or MinIO.",
    )

    bucket_folder: str = Field(
        default="",
        description=(
            "A default path to a folder within the S3 bucket to use "
            "for reading and writing objects."
        ),
    )

    class Config:
        smart_union = True

    # Property to maintain compatibility with storage block based deployments
    @property
    def basepath(self) -> str:
        """
        The base path of the S3 bucket.

        Returns:
            str: The base path of the S3 bucket.
        """
        return self.bucket_folder

    @basepath.setter
    def basepath(self, value: str) -> None:
        self.bucket_folder = value

    def _resolve_path(self, path: str) -> str:
        """
        A helper function used in write_path to join `self.basepath` and `path`.

        Args:

            path: Name of the key, e.g. "file1". Each object in your
                bucket has a unique key (or key name).

        """
        # If bucket_folder provided, it means we won't write to the root dir of
        # the bucket. So we need to add it on the front of the path.
        #
        # AWS object key naming guidelines require '/' for bucket folders.
        # Get POSIX path to prevent `pathlib` from inferring '\' on Windows OS
        path = (
            (Path(self.bucket_folder) / path).as_posix() if self.bucket_folder else path
        )

        return path

    def _get_s3_client(self) -> boto3.client:
        """
        Authenticate MinIO credentials or AWS credentials and return an S3 client.
        This is a helper function called by read_path() or write_path().
        """
        return self.credentials.get_s3_client()

    def _get_bucket_resource(self) -> boto3.resource:
        """
        Retrieves boto3 resource object for the configured bucket
        """
        params_override = self.credentials.aws_client_parameters.get_params_override()
        bucket = (
            self.credentials.get_boto3_session()
            .resource("s3", **params_override)
            .Bucket(self.bucket_name)
        )
        return bucket

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a folder from the configured S3 bucket to a local directory.

        Defaults to copying the entire contents of the block's basepath to the current
        working directory.

        Args:
            from_path: Path in S3 bucket to download from. Defaults to the block's
                configured basepath.
            local_path: Local path to download S3 contents to. Defaults to the current
                working directory.
        """
        bucket_folder = self.bucket_folder
        if from_path is None:
            from_path = str(bucket_folder) if bucket_folder else ""

        if local_path is None:
            local_path = str(Path(".").absolute())
        else:
            local_path = str(Path(local_path).expanduser())

        bucket = self._get_bucket_resource()
        for obj in bucket.objects.filter(Prefix=from_path):
            if obj.key[-1] == "/":
                # object is a folder and will be created if it contains any objects
                continue
            target = os.path.join(
                local_path,
                os.path.relpath(obj.key, from_path),
            )
            os.makedirs(os.path.dirname(target), exist_ok=True)
            bucket.download_file(obj.key, target)

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to the configured S3 bucket in a
        given folder.

        Defaults to uploading the entire contents of the current working directory to the
        block's basepath.

        Args:
            local_path: Path to local directory to upload from.
            to_path: Path in S3 bucket to upload to. Defaults to block's configured
                basepath.
            ignore_file: Path to file containing gitignore style expressions for
                filepaths to ignore.

        """
        to_path = "" if to_path is None else to_path

        if local_path is None:
            local_path = "."

        included_files = None
        if ignore_file:
            with open(ignore_file, "r") as f:
                ignore_patterns = f.readlines()

            included_files = filter_files(local_path, ignore_patterns)

        uploaded_file_count = 0
        for local_file_path in Path(local_path).expanduser().rglob("*"):
            if (
                included_files is not None
                and str(local_file_path.relative_to(local_path)) not in included_files
            ):
                continue
            elif not local_file_path.is_dir():
                remote_file_path = Path(to_path) / local_file_path.relative_to(
                    local_path
                )
                with open(local_file_path, "rb") as local_file:
                    local_file_content = local_file.read()

                await self.write_path(
                    remote_file_path.as_posix(), content=local_file_content
                )
                uploaded_file_count += 1

        return uploaded_file_count

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """
        Read specified path from S3 and return contents. Provide the entire
        path to the key in S3.

        Args:
            path: Entire path to (and including) the key.

        Example:
            Read "subfolder/file1" contents from an S3 bucket named "bucket":
            ```python
            from prefect_aws import AwsCredentials
            from prefect_aws.s3 import S3Bucket

            aws_creds = AwsCredentials(
                aws_access_key_id=AWS_ACCESS_KEY_ID,
                aws_secret_access_key=AWS_SECRET_ACCESS_KEY
            )

            s3_bucket_block = S3Bucket(
                bucket_name="bucket",
                credentials=aws_creds,
            )

            key_contents = s3_bucket_block.read_path(path="subfolder/file1")
            ```
        """
        path = self._resolve_path(path)

        return await run_sync_in_worker_thread(self._read_sync, path)

    def _read_sync(self, key: str) -> bytes:
        """
        Called by read_path(). Creates an S3 client and retrieves the
        contents from a specified path.
        """

        s3_client = self._get_s3_client()

        with io.BytesIO() as stream:
            s3_client.download_fileobj(Bucket=self.bucket_name, Key=key, Fileobj=stream)
            stream.seek(0)
            output = stream.read()
            return output

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Writes to an S3 bucket.

        Args:

            path: The key name. Each object in your bucket has a unique
                key (or key name).
            content: What you are uploading to S3.

        Example:

            Write data to the path `dogs/small_dogs/havanese` in an S3 Bucket:
            ```python
            from prefect_aws import AwsClientParameters, MinIOCredentials
            from prefect_aws.s3 import S3Bucket

            minio_creds = MinIOCredentials(
                minio_root_user="minioadmin",
                minio_root_password="minioadmin",
                aws_client_parameters=AwsClientParameters(
                    endpoint_url="http://localhost:9000"
                ),
            )

            s3_bucket_block = S3Bucket(
                bucket_name="bucket",
                credentials=minio_creds,
                bucket_folder="dogs/small_dogs",
            )
            s3_havanese_path = s3_bucket_block.write_path(path="havanese", content=data)
            ```
        """

        path = self._resolve_path(path)

        await run_sync_in_worker_thread(self._write_sync, path, content)

        return path

    def _write_sync(self, key: str, data: bytes) -> None:
        """
        Called by write_path(). Creates an S3 client and uploads a file
        object.
        """

        s3_client = self._get_s3_client()

        with io.BytesIO(data) as stream:
            s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket_name, Key=key)

    # NEW BLOCK INTERFACE METHODS BELOW
    @staticmethod
    def _list_objects_sync(page_iterator: PageIterator) -> List[Dict[str, Any]]:
        """
        Synchronous method to collect S3 objects into a list

        Args:
            page_iterator: AWS Paginator for S3 objects

        Returns:
            List[Dict]: List of object information
        """
        return [
            content for page in page_iterator for content in page.get("Contents", [])
        ]

    def _join_bucket_folder(self, bucket_path: str = "") -> str:
        """
        Joins the base bucket folder to the bucket path.
        NOTE: If a method reuses another method in this class, be careful not to
        call this twice, because it will join the bucket folder twice.
        See https://github.com/PrefectHQ/prefect-aws/issues/141 for a past issue.
        """
        if not self.bucket_folder and not bucket_path:
            # there's a difference between "." and "", at least in the tests
            return ""

        bucket_path = str(bucket_path)
        if self.bucket_folder != "" and bucket_path.startswith(self.bucket_folder):
            self.logger.info(
                f"Bucket path {bucket_path!r} is already prefixed with "
                f"bucket folder {self.bucket_folder!r}; is this intentional?"
            )

        return (Path(self.bucket_folder) / bucket_path).as_posix() + (
            "" if not bucket_path.endswith("/") else "/"
        )

    @sync_compatible
    async def list_objects(
        self,
        folder: str = "",
        delimiter: str = "",
        page_size: Optional[int] = None,
        max_items: Optional[int] = None,
        jmespath_query: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        Args:
            folder: Folder to list objects from.
            delimiter: Character used to group keys of listed objects.
            page_size: Number of objects to return in each request to the AWS API.
            max_items: Maximum number of objects that to be returned by task.
            jmespath_query: Query used to filter objects based on object attributes refer to
                the [boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#filtering-results-with-jmespath)
                for more information on how to construct queries.

        Returns:
            List of objects and their metadata in the bucket.

        Examples:
            List objects under the `base_folder`.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            s3_bucket.list_objects("base_folder")
            ```
        """  # noqa: E501
        bucket_path = self._join_bucket_folder(folder)
        client = self.credentials.get_s3_client()
        paginator = client.get_paginator("list_objects_v2")
        page_iterator = paginator.paginate(
            Bucket=self.bucket_name,
            Prefix=bucket_path,
            Delimiter=delimiter,
            PaginationConfig={"PageSize": page_size, "MaxItems": max_items},
        )
        if jmespath_query:
            page_iterator = page_iterator.search(f"{jmespath_query} | {{Contents: @}}")

        self.logger.info(f"Listing objects in bucket {bucket_path}.")
        objects = await run_sync_in_worker_thread(
            self._list_objects_sync, page_iterator
        )
        return objects

    @sync_compatible
    async def download_object_to_path(
        self,
        from_path: str,
        to_path: Optional[Union[str, Path]] = None,
        **download_kwargs: Dict[str, Any],
    ) -> Path:
        """
        Downloads an object from the S3 bucket to a path.

        Args:
            from_path: The path to the object to download; this gets prefixed
                with the bucket_folder.
            to_path: The path to download the object to. If not provided, the
                object's name will be used.
            **download_kwargs: Additional keyword arguments to pass to
                `Client.download_file`.

        Returns:
            The absolute path that the object was downloaded to.

        Examples:
            Download my_folder/notes.txt object to notes.txt.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            s3_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")
            ```
        """
        if to_path is None:
            to_path = Path(from_path).name

        # making path absolute, but converting back to str here
        # since !r looks nicer that way and filename arg expects str
        to_path = str(Path(to_path).absolute())
        bucket_path = self._join_bucket_folder(from_path)
        client = self.credentials.get_s3_client()

        self.logger.debug(
            f"Preparing to download object from bucket {self.bucket_name!r} "
            f"path {bucket_path!r} to {to_path!r}."
        )
        await run_sync_in_worker_thread(
            client.download_file,
            Bucket=self.bucket_name,
            Key=bucket_path,
            Filename=to_path,
            **download_kwargs,
        )
        self.logger.info(
            f"Downloaded object from bucket {self.bucket_name!r} path {bucket_path!r} "
            f"to {to_path!r}."
        )
        return Path(to_path)

    @sync_compatible
    async def download_object_to_file_object(
        self,
        from_path: str,
        to_file_object: BinaryIO,
        **download_kwargs: Dict[str, Any],
    ) -> BinaryIO:
        """
        Downloads an object from the object storage service to a file-like object,
        which can be a BytesIO object or a BufferedWriter.

        Args:
            from_path: The path to the object to download from; this gets prefixed
                with the bucket_folder.
            to_file_object: The file-like object to download the object to.
            **download_kwargs: Additional keyword arguments to pass to
                `Client.download_fileobj`.

        Returns:
            The file-like object that the object was downloaded to.

        Examples:
            Download my_folder/notes.txt object to a BytesIO object.
            ```python
            from io import BytesIO

            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            with BytesIO() as buf:
                s3_bucket.download_object_to_file_object("my_folder/notes.txt", buf)
            ```

            Download my_folder/notes.txt object to a BufferedWriter.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            with open("notes.txt", "wb") as f:
                s3_bucket.download_object_to_file_object("my_folder/notes.txt", f)
            ```
        """
        client = self.credentials.get_s3_client()
        bucket_path = self._join_bucket_folder(from_path)

        self.logger.debug(
            f"Preparing to download object from bucket {self.bucket_name!r} "
            f"path {bucket_path!r} to file object."
        )
        await run_sync_in_worker_thread(
            client.download_fileobj,
            Bucket=self.bucket_name,
            Key=bucket_path,
            Fileobj=to_file_object,
            **download_kwargs,
        )
        self.logger.info(
            f"Downloaded object from bucket {self.bucket_name!r} path {bucket_path!r} "
            "to file object."
        )
        return to_file_object

    @sync_compatible
    async def download_folder_to_path(
        self,
        from_folder: str,
        to_folder: Optional[Union[str, Path]] = None,
        **download_kwargs: Dict[str, Any],
    ) -> Path:
        """
        Downloads objects *within* a folder (excluding the folder itself)
        from the S3 bucket to a folder.

        Args:
            from_folder: The path to the folder to download from.
            to_folder: The path to download the folder to.
            **download_kwargs: Additional keyword arguments to pass to
                `Client.download_file`.

        Returns:
            The absolute path that the folder was downloaded to.

        Examples:
            Download my_folder to a local folder named my_folder.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            s3_bucket.download_folder_to_path("my_folder", "my_folder")
            ```
        """
        if to_folder is None:
            to_folder = ""
        to_folder = Path(to_folder).absolute()

        client = self.credentials.get_s3_client()
        objects = await self.list_objects(folder=from_folder)

        # `list_objects` already joins the bucket folder to the prefix, so it
        # must not be joined again for the listing itself; it is joined here
        # only so `relative_to` can strip the full key prefix
        bucket_folder = self._join_bucket_folder(from_folder)

        async_coros = []
        for object in objects:
            bucket_path = Path(object["Key"]).relative_to(bucket_folder)
            # this skips the actual directory itself, e.g.
            # `my_folder/` will be skipped
            # `my_folder/notes.txt` will be downloaded
            if bucket_path.is_dir():
                continue
            to_path = to_folder / bucket_path
            to_path.parent.mkdir(parents=True, exist_ok=True)
            to_path = str(to_path)  # must be string
            self.logger.info(
                f"Downloading object from bucket {self.bucket_name!r} path "
                f"{bucket_path.as_posix()!r} to {to_path!r}."
            )
            async_coros.append(
                run_sync_in_worker_thread(
                    client.download_file,
                    Bucket=self.bucket_name,
                    Key=object["Key"],
                    Filename=to_path,
                    **download_kwargs,
                )
            )
        await asyncio.gather(*async_coros)

        return Path(to_folder)

    @sync_compatible
    async def stream_from(
        self,
        bucket: "S3Bucket",
        from_path: str,
        to_path: Optional[str] = None,
        **upload_kwargs: Dict[str, Any],
    ) -> str:
        """Streams an object from another bucket to this bucket.

        Args:
            bucket: The bucket to stream from.
            from_path: The path of the object to stream.
            to_path: The path to stream the object to. Defaults to the object's name.
            **upload_kwargs: Additional keyword arguments to pass to
                `Client.upload_fileobj`.

        Returns:
            The path that the object was uploaded to.

        Examples:
            Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt.

            ```python
            from prefect_aws.s3 import S3Bucket

            your_s3_bucket = S3Bucket.load("your-bucket")
            my_s3_bucket = S3Bucket.load("my-bucket")

            my_s3_bucket.stream_from(
                your_s3_bucket,
                "notes.txt",
                to_path="landed/notes.txt"
            )
            ```

        """
        if to_path is None:
            to_path = Path(from_path).name

        # Get the source object's StreamingBody
        from_path: str = bucket._join_bucket_folder(from_path)
        from_client = bucket.credentials.get_s3_client()
        obj = await run_sync_in_worker_thread(
            from_client.get_object, Bucket=bucket.bucket_name, Key=from_path
        )
        body: StreamingBody = obj["Body"]

        # Upload the StreamingBody to this bucket
        bucket_path = str(self._join_bucket_folder(to_path))
        to_client = self.credentials.get_s3_client()
        await run_sync_in_worker_thread(
            to_client.upload_fileobj,
            Fileobj=body,
            Bucket=self.bucket_name,
            Key=bucket_path,
            **upload_kwargs,
        )
        self.logger.info(
            f"Streamed s3://{bucket.bucket_name}/{from_path} to the bucket "
            f"{self.bucket_name!r} path {bucket_path!r}."
        )
        return bucket_path

    @sync_compatible
    async def upload_from_path(
        self,
        from_path: Union[str, Path],
        to_path: Optional[str] = None,
        **upload_kwargs: Dict[str, Any],
    ) -> str:
        """
        Uploads an object from a path to the S3 bucket.

        Args:
            from_path: The path to the file to upload from.
            to_path: The path to upload the file to.
            **upload_kwargs: Additional keyword arguments to pass to
                `Client.upload_file`.

        Returns:
            The path that the object was uploaded to.

        Examples:
            Upload notes.txt to my_folder/notes.txt.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            s3_bucket.upload_from_path("notes.txt", "my_folder/notes.txt")
            ```
        """
        from_path = str(Path(from_path).absolute())
        if to_path is None:
            to_path = Path(from_path).name

        bucket_path = str(self._join_bucket_folder(to_path))
        client = self.credentials.get_s3_client()

        await run_sync_in_worker_thread(
            client.upload_file,
            Filename=from_path,
            Bucket=self.bucket_name,
            Key=bucket_path,
            **upload_kwargs,
        )
        self.logger.info(
            f"Uploaded from {from_path!r} to the bucket "
            f"{self.bucket_name!r} path {bucket_path!r}."
        )
        return bucket_path

    @sync_compatible
    async def upload_from_file_object(
        self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any]
    ) -> str:
        """
        Uploads an object to the S3 bucket from a file-like object,
        which can be a BytesIO object or a BufferedReader.

        Args:
            from_file_object: The file-like object to upload from.
            to_path: The path to upload the object to.
            **upload_kwargs: Additional keyword arguments to pass to
                `Client.upload_fileobj`.

        Returns:
            The path that the object was uploaded to.

        Examples:
            Upload BytesIO object to my_folder/notes.txt.
            ```python
            from io import BytesIO

            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            with open("notes.txt", "rb") as f:
                s3_bucket.upload_from_file_object(f, "my_folder/notes.txt")
            ```

            Upload BufferedReader object to my_folder/notes.txt.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            with open("notes.txt", "rb") as f:
                s3_bucket.upload_from_file_object(
                    f, "my_folder/notes.txt"
                )
            ```
        """
        bucket_path = str(self._join_bucket_folder(to_path))
        client = self.credentials.get_s3_client()
        await run_sync_in_worker_thread(
            client.upload_fileobj,
            Fileobj=from_file_object,
            Bucket=self.bucket_name,
            Key=bucket_path,
            **upload_kwargs,
        )
        self.logger.info(
            "Uploaded from file object to the bucket "
            f"{self.bucket_name!r} path {bucket_path!r}."
        )
        return bucket_path

    @sync_compatible
    async def upload_from_folder(
        self,
        from_folder: Union[str, Path],
        to_folder: Optional[str] = None,
        **upload_kwargs: Dict[str, Any],
    ) -> str:
        """
        Uploads files *within* a folder (excluding the folder itself)
        to the object storage service folder.

        Args:
            from_folder: The path to the folder to upload from.
            to_folder: The path to upload the folder to.
            **upload_kwargs: Additional keyword arguments to pass to
                `Client.upload_file`.

        Returns:
            The path that the folder was uploaded to.

        Examples:
            Upload contents from my_folder to new_folder.
            ```python
            from prefect_aws.s3 import S3Bucket

            s3_bucket = S3Bucket.load("my-bucket")
            s3_bucket.upload_from_folder("my_folder", "new_folder")
            ```
        """
        from_folder = Path(from_folder)
        bucket_folder = self._join_bucket_folder(to_folder or "")

        num_uploaded = 0
        client = self.credentials.get_s3_client()

        async_coros = []
        for from_path in from_folder.rglob("**/*"):
            # this skips the actual directory itself, e.g.
            # `my_folder/` will be skipped
            # `my_folder/notes.txt` will be uploaded
            if from_path.is_dir():
                continue
            bucket_path = (
                Path(bucket_folder) / from_path.relative_to(from_folder)
            ).as_posix()
            self.logger.info(
                f"Uploading from {str(from_path)!r} to the bucket "
                f"{self.bucket_name!r} path {bucket_path!r}."
            )
            async_coros.append(
                run_sync_in_worker_thread(
                    client.upload_file,
                    Filename=str(from_path),
                    Bucket=self.bucket_name,
                    Key=bucket_path,
                    **upload_kwargs,
                )
            )
            num_uploaded += 1
        await asyncio.gather(*async_coros)

        if num_uploaded == 0:
            self.logger.warning(f"No files were uploaded from {str(from_folder)!r}.")
        else:
            self.logger.info(
                f"Uploaded {num_uploaded} files from {str(from_folder)!r} to "
                f"the bucket {self.bucket_name!r} path {bucket_path!r}"
            )

        return to_folder

Attributes

basepath: str property writable

The base path of the S3 bucket.

Returns:

    str: The base path of the S3 bucket.

bucket_folder: str pydantic-field

A default path to a folder within the S3 bucket to use for reading and writing objects.

bucket_name: str pydantic-field required

Name of your bucket.

credentials: Union[prefect_aws.credentials.AwsCredentials, prefect_aws.credentials.MinIOCredentials] pydantic-field

A block containing your credentials to AWS or MinIO.

Methods

download_folder_to_path async

Downloads objects within a folder (excluding the folder itself) from the S3 bucket to a folder.

Parameters:

    from_folder (str, required): The path to the folder to download from.
    to_folder (Union[str, pathlib.Path], default None): The path to download the folder to.
    **download_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.download_file.

Returns:

    Path: The absolute path that the folder was downloaded to.

Examples:

Download my_folder to a local folder named my_folder.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.download_folder_to_path("my_folder", "my_folder")

download_object_to_file_object async

Downloads an object from the object storage service to a file-like object, which can be a BytesIO object or a BufferedWriter.

Parameters:

    from_path (str, required): The path to the object to download from; this gets prefixed with the bucket_folder.
    to_file_object (BinaryIO, required): The file-like object to download the object to.
    **download_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.download_fileobj.

Returns:

    BinaryIO: The file-like object that the object was downloaded to.

Examples:

Download my_folder/notes.txt object to a BytesIO object.

from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with BytesIO() as buf:
    s3_bucket.download_object_to_file_object("my_folder/notes.txt", buf)

Download my_folder/notes.txt object to a BufferedWriter.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with open("notes.txt", "wb") as f:
    s3_bucket.download_object_to_file_object("my_folder/notes.txt", f)

download_object_to_path async

Downloads an object from the S3 bucket to a path.

Parameters:

    from_path (str, required): The path to the object to download; this gets prefixed with the bucket_folder.
    to_path (Union[str, pathlib.Path], default None): The path to download the object to. If not provided, the object's name will be used.
    **download_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.download_file.

Returns:

    Path: The absolute path that the object was downloaded to.

Examples:

Download my_folder/notes.txt object to notes.txt.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt")

get_directory async

Copies a folder from the configured S3 bucket to a local directory.

Defaults to copying the entire contents of the block's basepath to the current working directory.

Parameters:

    from_path (Optional[str], default None): Path in S3 bucket to download from. Defaults to the block's configured basepath.
    local_path (Optional[str], default None): Local path to download S3 contents to. Defaults to the current working directory.
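
Examples:

No example ships with this method, so here is a minimal sketch (the block name `my-bucket` and the target directory are placeholders):

```python
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
# Downloads everything under the block's basepath (bucket_folder)
# into ./local_copy, creating directories as needed.
s3_bucket.get_directory(local_path="local_copy")
```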
list_objects async

Lists objects in the S3 bucket, optionally filtered by a folder prefix, a delimiter, and a JMESPath query.

Parameters:

    folder (str, default ''): Folder to list objects from.
    delimiter (str, default ''): Character used to group keys of listed objects.
    page_size (Optional[int], default None): Number of objects to return in each request to the AWS API.
    max_items (Optional[int], default None): Maximum number of objects to be returned.
    jmespath_query (Optional[str], default None): Query used to filter objects based on object attributes; refer to the boto3 docs (https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#filtering-results-with-jmespath) for more information on how to construct queries.

Returns:

    List[Dict[str, Any]]: List of objects and their metadata in the bucket.

Examples:

List objects under the base_folder.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.list_objects("base_folder")

put_directory async

Uploads a directory from a given local path to the configured S3 bucket in a given folder.

Defaults to uploading the entire contents of the current working directory to the block's basepath.

Parameters:

    local_path (Optional[str], default None): Path to local directory to upload from.
    to_path (Optional[str], default None): Path in S3 bucket to upload to. Defaults to block's configured basepath.
    ignore_file (Optional[str], default None): Path to file containing gitignore-style expressions for filepaths to ignore.

Returns:

    int: The number of files uploaded.
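
Examples:

A minimal sketch (the local directory, target folder, and ignore file are placeholders), uploading `./project` into the bucket folder `deployments`:

```python
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
# Skips any path matched by the gitignore-style patterns in
# .prefectignore and returns the number of files uploaded.
count = s3_bucket.put_directory(
    local_path="project",
    to_path="deployments",
    ignore_file=".prefectignore",
)
```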
read_path async

Read specified path from S3 and return contents. Provide the entire path to the key in S3.

Parameters:

    path (str, required): Entire path to (and including) the key.

Returns:

    bytes: The contents of the object at the given path.

Examples:

Read "subfolder/file1" contents from an S3 bucket named "bucket":

from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

aws_creds = AwsCredentials(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

s3_bucket_block = S3Bucket(
    bucket_name="bucket",
    credentials=aws_creds,
)

key_contents = s3_bucket_block.read_path(path="subfolder/file1")

stream_from async

Streams an object from another bucket to this bucket.

Parameters:

    bucket (S3Bucket, required): The bucket to stream from.
    from_path (str, required): The path of the object to stream.
    to_path (Optional[str], default None): The path to stream the object to. Defaults to the object's name.
    **upload_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.upload_fileobj.

Returns:

    str: The path that the object was uploaded to.

Examples:

Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt.

from prefect_aws.s3 import S3Bucket

your_s3_bucket = S3Bucket.load("your-bucket")
my_s3_bucket = S3Bucket.load("my-bucket")

my_s3_bucket.stream_from(
    your_s3_bucket,
    "notes.txt",
    to_path="landed/notes.txt"
)
upload_from_file_object async

Uploads an object to the S3 bucket from a file-like object, which can be a BytesIO object or a BufferedReader.

Parameters:

    from_file_object (BinaryIO, required): The file-like object to upload from.
    to_path (str, required): The path to upload the object to.
    **upload_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.upload_fileobj.

Returns:

    str: The path that the object was uploaded to.

Examples:

Upload a BytesIO object to my_folder/notes.txt.

from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with BytesIO(b"contents") as buf:
    s3_bucket.upload_from_file_object(buf, "my_folder/notes.txt")

Upload a BufferedReader object to my_folder/notes.txt.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
with open("notes.txt", "rb") as f:
    s3_bucket.upload_from_file_object(f, "my_folder/notes.txt")

upload_from_folder async

Uploads files within a folder (excluding the folder itself) to the object storage service folder.

Parameters:

    from_folder (Union[str, pathlib.Path], required): The path to the folder to upload from.
    to_folder (Optional[str], default None): The path to upload the folder to.
    **upload_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.upload_file.

Returns:

    str: The path that the folder was uploaded to.

Examples:

Upload contents from my_folder to new_folder.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.upload_from_folder("my_folder", "new_folder")

upload_from_path async

Uploads an object from a path to the S3 bucket.

Parameters:

    from_path (Union[str, pathlib.Path], required): The path to the file to upload from.
    to_path (Optional[str], default None): The path to upload the file to. Defaults to the file's name.
    **upload_kwargs (Dict[str, Any], default {}): Additional keyword arguments to pass to Client.upload_file.

Returns:

    str: The path that the object was uploaded to.

Examples:

Upload notes.txt to my_folder/notes.txt.

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")
s3_bucket.upload_from_path("notes.txt", "my_folder/notes.txt")

write_path async

Writes to an S3 bucket.

Parameters:

    path (str, required): The key name. Each object in your bucket has a unique key (or key name).
    content (bytes, required): What you are uploading to S3.

Returns:

    str: The resolved path that the content was written to.

Examples:

Write data to the path dogs/small_dogs/havanese in an S3 Bucket:

from prefect_aws import AwsClientParameters, MinIOCredentials
from prefect_aws.s3 import S3Bucket

minio_creds = MinIOCredentials(
    minio_root_user="minioadmin",
    minio_root_password="minioadmin",
    aws_client_parameters=AwsClientParameters(
        endpoint_url="http://localhost:9000"
    ),
)

s3_bucket_block = S3Bucket(
    bucket_name="bucket",
    credentials=minio_creds,
    bucket_folder="dogs/small_dogs",
)
s3_havanese_path = s3_bucket_block.write_path(path="havanese", content=data)

Source code in prefect_aws/s3.py
@sync_compatible
async def write_path(self, path: str, content: bytes) -> str:
    """
    Writes to an S3 bucket.

    Args:

        path: The key name. Each object in your bucket has a unique
            key (or key name).
        content: What you are uploading to S3.

    Example:

        Write data to the path `dogs/small_dogs/havanese` in an S3 Bucket:
        ```python
        from prefect_aws import MinIOCredentials
        from prefect_aws.client_parameters import AwsClientParameters
        from prefect_aws.s3 import S3Bucket

        minio_creds = MinIOCredentials(
            minio_root_user="minioadmin",
            minio_root_password="minioadmin",
            aws_client_parameters=AwsClientParameters(
                endpoint_url="http://localhost:9000"
            ),
        )

        s3_bucket_block = S3Bucket(
            bucket_name="bucket",
            credentials=minio_creds,
            bucket_folder="dogs/small_dogs",
        )

        data = b"woof"
        s3_havanese_path = s3_bucket_block.write_path(path="havanese", content=data)
        ```
    """

    path = self._resolve_path(path)

    await run_sync_in_worker_thread(self._write_sync, path, content)

    return path
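
Because `write_path` runs `path` through `_resolve_path`, the same relative key can be handed to `read_path` to fetch the bytes back. A minimal round-trip sketch, assuming a configured `my-bucket` block:

```python
from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")

# The returned key is the resolved path, including any bucket_folder prefix.
key = s3_bucket.write_path(path="havanese", content=b"hello")

# read_path resolves the same relative path, so the round trip matches.
assert s3_bucket.read_path("havanese") == b"hello"
```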

Functions

s3_download async

Downloads an object with a given key from a given S3 bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bucket` | `str` | Name of bucket to download object from. Required if a default value was not supplied when creating the task. | required |
| `key` | `str` | Key of object to download. Required if a default value was not supplied when creating the task. | required |
| `aws_credentials` | `AwsCredentials` | Credentials to use for authentication with AWS. | required |
| `aws_client_parameters` | `AwsClientParameters` | Custom parameters for the boto3 client initialization. | `AwsClientParameters(api_version=None, use_ssl=True, verify=True, verify_cert_path=None, endpoint_url=None, config=None)` |

Returns:

| Type | Description |
| --- | --- |
| `bytes` | A bytes representation of the downloaded object. |

Examples:

Download a file from an S3 bucket:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_download


@flow
async def example_s3_download_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    data = await s3_download(
        bucket="bucket",
        key="key",
        aws_credentials=aws_credentials,
    )

example_s3_download_flow()
```
Source code in prefect_aws/s3.py
@task
async def s3_download(
    bucket: str,
    key: str,
    aws_credentials: AwsCredentials,
    aws_client_parameters: AwsClientParameters = AwsClientParameters(),
) -> bytes:
    """
    Downloads an object with a given key from a given S3 bucket.

    Args:
        bucket: Name of bucket to download object from. Required if a default value was
            not supplied when creating the task.
        key: Key of object to download. Required if a default value was not supplied
            when creating the task.
        aws_credentials: Credentials to use for authentication with AWS.
        aws_client_parameters: Custom parameters for the boto3 client initialization.


    Returns:
        A `bytes` representation of the downloaded object.

    Example:
        Download a file from an S3 bucket:

        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.s3 import s3_download


        @flow
        async def example_s3_download_flow():
            aws_credentials = AwsCredentials(
                aws_access_key_id="acccess_key_id",
                aws_secret_access_key="secret_access_key"
            )
            data = await s3_download(
                bucket="bucket",
                key="key",
                aws_credentials=aws_credentials,
            )

        example_s3_download_flow()
        ```
    """
    logger = get_run_logger()
    logger.info("Downloading object from bucket %s with key %s", bucket, key)

    s3_client = aws_credentials.get_boto3_session().client(
        "s3", **aws_client_parameters.get_params_override()
    )
    stream = io.BytesIO()
    await run_sync_in_worker_thread(
        s3_client.download_fileobj, Bucket=bucket, Key=key, Fileobj=stream
    )
    stream.seek(0)
    output = stream.read()

    return output
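
The `aws_client_parameters` argument maps onto the boto3 client constructor, which is how the task can be pointed at an S3-compatible endpoint such as MinIO. A minimal sketch; the localhost URL and credentials are placeholders:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.client_parameters import AwsClientParameters
from prefect_aws.s3 import s3_download


@flow
async def example_custom_endpoint_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    # endpoint_url redirects the boto3 client to a non-AWS S3 endpoint.
    data = await s3_download(
        bucket="bucket",
        key="key",
        aws_credentials=aws_credentials,
        aws_client_parameters=AwsClientParameters(
            endpoint_url="http://localhost:9000"
        ),
    )

example_custom_endpoint_flow()
```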

s3_list_objects async

Lists details of objects in a given S3 bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bucket` | `str` | Name of bucket to list items from. Required if a default value was not supplied when creating the task. | required |
| `aws_credentials` | `AwsCredentials` | Credentials to use for authentication with AWS. | required |
| `aws_client_parameters` | `AwsClientParameters` | Custom parameters for the boto3 client initialization. | `AwsClientParameters(api_version=None, use_ssl=True, verify=True, verify_cert_path=None, endpoint_url=None, config=None)` |
| `prefix` | `str` | Used to filter objects with keys starting with the specified prefix. | `''` |
| `delimiter` | `str` | Character used to group keys of listed objects. | `''` |
| `page_size` | `Optional[int]` | Number of objects to return in each request to the AWS API. | `None` |
| `max_items` | `Optional[int]` | Maximum number of objects to be returned by the task. | `None` |
| `jmespath_query` | `Optional[str]` | Query used to filter objects based on object attributes; refer to the boto3 docs for more information on how to construct queries. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[Dict[str, Any]]` | A list of dictionaries containing information about the objects retrieved. Refer to the boto3 docs for an example response. |

Examples:

List all objects in a bucket:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_list_objects


@flow
async def example_s3_list_objects_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    objects = await s3_list_objects(
        bucket="data_bucket",
        aws_credentials=aws_credentials
    )

example_s3_list_objects_flow()
```
Source code in prefect_aws/s3.py
@task
async def s3_list_objects(
    bucket: str,
    aws_credentials: AwsCredentials,
    aws_client_parameters: AwsClientParameters = AwsClientParameters(),
    prefix: str = "",
    delimiter: str = "",
    page_size: Optional[int] = None,
    max_items: Optional[int] = None,
    jmespath_query: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Lists details of objects in a given S3 bucket.

    Args:
        bucket: Name of bucket to list items from. Required if a default value was not
            supplied when creating the task.
        aws_credentials: Credentials to use for authentication with AWS.
        aws_client_parameters: Custom parameters for the boto3 client initialization.
        prefix: Used to filter objects with keys starting with the specified prefix.
        delimiter: Character used to group keys of listed objects.
        page_size: Number of objects to return in each request to the AWS API.
        max_items: Maximum number of objects to be returned by the task.
        jmespath_query: Query used to filter objects based on object attributes; refer to
            the [boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#filtering-results-with-jmespath)
            for more information on how to construct queries.

    Returns:
        A list of dictionaries containing information about the objects retrieved. Refer
            to the boto3 docs for an example response.

    Example:
        List all objects in a bucket:

        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.s3 import s3_list_objects


        @flow
        async def example_s3_list_objects_flow():
            aws_credentials = AwsCredentials(
                aws_access_key_id="acccess_key_id",
                aws_secret_access_key="secret_access_key"
            )
            objects = await s3_list_objects(
                bucket="data_bucket",
                aws_credentials=aws_credentials
            )

        example_s3_list_objects_flow()
        ```
    """  # noqa E501
    logger = get_run_logger()
    logger.info("Listing objects in bucket %s with prefix %s", bucket, prefix)

    s3_client = aws_credentials.get_boto3_session().client(
        "s3", **aws_client_parameters.get_params_override()
    )
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(
        Bucket=bucket,
        Prefix=prefix,
        Delimiter=delimiter,
        PaginationConfig={"PageSize": page_size, "MaxItems": max_items},
    )
    if jmespath_query:
        page_iterator = page_iterator.search(f"{jmespath_query} | {{Contents: @}}")

    return await run_sync_in_worker_thread(_list_objects_sync, page_iterator)
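
Because `jmespath_query` is applied through the paginator's `search` (and the task re-wraps results as `{Contents: @}`), queries typically filter on attributes of the `Contents` list. A minimal sketch; the 1024-byte threshold is an arbitrary illustration:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_list_objects


@flow
async def example_filtered_listing_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    # Keep only objects larger than 1 KiB via a JMESPath filter
    # over each page's Contents list.
    objects = await s3_list_objects(
        bucket="data_bucket",
        aws_credentials=aws_credentials,
        jmespath_query="Contents[?Size > `1024`]",
    )

example_filtered_listing_flow()
```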

s3_upload async

Uploads data to an S3 bucket.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `bytes` | Bytes representation of data to upload to S3. | required |
| `bucket` | `str` | Name of bucket to upload data to. Required if a default value was not supplied when creating the task. | required |
| `aws_credentials` | `AwsCredentials` | Credentials to use for authentication with AWS. | required |
| `aws_client_parameters` | `AwsClientParameters` | Custom parameters for the boto3 client initialization. | `AwsClientParameters(api_version=None, use_ssl=True, verify=True, verify_cert_path=None, endpoint_url=None, config=None)` |
| `key` | `Optional[str]` | Key of object to upload. Defaults to a UUID string. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The key of the uploaded object. |

Examples:

Read and upload a file to an S3 bucket:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_upload


@flow
async def example_s3_upload_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    with open("data.csv", "rb") as file:
        key = await s3_upload(
            bucket="bucket",
            key="data.csv",
            data=file.read(),
            aws_credentials=aws_credentials,
        )

example_s3_upload_flow()
```
Source code in prefect_aws/s3.py
@task
async def s3_upload(
    data: bytes,
    bucket: str,
    aws_credentials: AwsCredentials,
    aws_client_parameters: AwsClientParameters = AwsClientParameters(),
    key: Optional[str] = None,
) -> str:
    """
    Uploads data to an S3 bucket.

    Args:
        data: Bytes representation of data to upload to S3.
        bucket: Name of bucket to upload data to. Required if a default value was not
            supplied when creating the task.
        aws_credentials: Credentials to use for authentication with AWS.
        aws_client_parameters: Custom parameters for the boto3 client initialization.
        key: Key of object to upload. Defaults to a UUID string.

    Returns:
        The key of the uploaded object.

    Example:
        Read and upload a file to an S3 bucket:

        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.s3 import s3_upload


        @flow
        async def example_s3_upload_flow():
            aws_credentials = AwsCredentials(
                aws_access_key_id="acccess_key_id",
                aws_secret_access_key="secret_access_key"
            )
            with open("data.csv", "rb") as file:
                key = await s3_upload(
                    bucket="bucket",
                    key="data.csv",
                    data=file.read(),
                    aws_credentials=aws_credentials,
                )

        example_s3_upload_flow()
        ```
    """
    logger = get_run_logger()

    key = key or str(uuid.uuid4())

    logger.info("Uploading object to bucket %s with key %s", bucket, key)

    s3_client = aws_credentials.get_boto3_session().client(
        "s3", **aws_client_parameters.get_params_override()
    )
    stream = io.BytesIO(data)
    await run_sync_in_worker_thread(
        s3_client.upload_fileobj, stream, Bucket=bucket, Key=key
    )

    return key
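
When `key` is omitted, the task falls back to `uuid.uuid4()`, so the returned key is the only handle on the object and should be captured. A minimal sketch of the round trip; the bucket name and credentials are placeholders:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import s3_download, s3_upload


@flow
async def example_uuid_key_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    # Omitting `key` makes the task generate a UUID key and return it.
    key = await s3_upload(
        data=b"hello",
        bucket="bucket",
        aws_credentials=aws_credentials,
    )
    # The returned key is the only way to address the object later.
    data = await s3_download(
        bucket="bucket",
        key=key,
        aws_credentials=aws_credentials,
    )

example_uuid_key_flow()
```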