Skip to content

prefect_aws.workers.ecs_worker

Prefect worker for executing flow runs as ECS tasks.

Get started by creating a work pool:

$ prefect work-pool create --type ecs my-ecs-pool

Then, you can start a worker for the pool:

$ prefect worker start --pool my-ecs-pool

It's common to deploy the worker as an ECS task as well. However, you can run the worker locally to get started.

The worker may work without any additional configuration, but it is dependent on your specific AWS setup and we'd recommend opening the work pool editor in the UI to see the available options.

By default, the worker will register a task definition for each flow run and run a task in your default ECS cluster using AWS Fargate. Fargate requires tasks to configure subnets, which we will infer from your default VPC. If you do not have a default VPC, you must provide a VPC ID or manually setup the network configuration for your tasks.

Note, the worker caches task definitions for each deployment to avoid excessive registration. The worker will check that the cached task definition is compatible with your configuration before using it.

The launch type option can be used to run your tasks in different modes. For example, FARGATE_SPOT can be used to use spot instances for your Fargate tasks or EC2 can be used to run your tasks on a cluster backed by EC2 instances.

Generally, it is very useful to enable CloudWatch logging for your ECS tasks; this can help you debug task failures. To enable CloudWatch logging, you must provide an execution role ARN with permissions to create and write to log streams. See the configure_cloudwatch_logs field documentation for details.

The worker can be configured to use an existing task definition by setting the task definition arn variable or by providing a "taskDefinition" in the task run request. When a task definition is provided, the worker will never create a new task definition which may result in variables that are templated into the task definition payload being ignored.

Classes

ECSIdentifier (tuple)

The identifier for a running ECS task.

Source code in prefect_aws/workers/ecs_worker.py
class ECSIdentifier(NamedTuple):
    """
    The identifier for a running ECS task.
    """

    cluster: str
    task_arn: str

Methods

__getnewargs__ special

Return self as a plain tuple. Used by copy and pickle.

Source code in prefect_aws/workers/ecs_worker.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__ special staticmethod

Create new instance of ECSIdentifier(cluster, task_arn)

__repr__ special

Return a nicely formatted representation string

Source code in prefect_aws/workers/ecs_worker.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self

ECSJobConfiguration (BaseJobConfiguration) pydantic-model

Job configuration for an ECS worker.

Source code in prefect_aws/workers/ecs_worker.py
class ECSJobConfiguration(BaseJobConfiguration):
    """
    Job configuration for an ECS worker.
    """

    aws_credentials: Optional[AwsCredentials] = Field(default_factory=AwsCredentials)
    task_definition: Optional[Dict[str, Any]] = Field(
        template=_default_task_definition_template()
    )
    task_run_request: Dict[str, Any] = Field(
        template=_default_task_run_request_template()
    )
    configure_cloudwatch_logs: Optional[bool] = Field(default=None)
    cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict)
    network_configuration: Dict[str, Any] = Field(default_factory=dict)
    stream_output: Optional[bool] = Field(default=None)
    task_start_timeout_seconds: int = Field(default=300)
    task_watch_poll_interval: float = Field(default=5.0)
    auto_deregister_task_definition: bool = Field(default=False)
    vpc_id: Optional[str] = Field(default=None)
    container_name: Optional[str] = Field(default=None)
    cluster: Optional[str] = Field(default=None)
    match_latest_revision_in_family: bool = Field(default=False)

    @root_validator
    def task_run_request_requires_arn_if_no_task_definition_given(cls, values) -> dict:
        """
        If no task definition is provided, a task definition ARN must be present on the
        task run request.
        """
        if not values.get("task_run_request", {}).get(
            "taskDefinition"
        ) and not values.get("task_definition"):
            raise ValueError(
                "A task definition must be provided if a task definition ARN is not "
                "present on the task run request."
            )
        return values

    @root_validator
    def container_name_default_from_task_definition(cls, values) -> dict:
        """
        Infers the container name from the task definition if not provided.
        """
        if values.get("container_name") is None:
            values["container_name"] = _container_name_from_task_definition(
                values.get("task_definition")
            )

            # We may not have a name here still; for example if someone is using a task
            # definition arn. In that case, we'll perform similar logic later to find
            # the name to treat as the "orchestration" container.

        return values

    @root_validator(pre=True)
    def set_default_configure_cloudwatch_logs(cls, values: dict) -> dict:
        """
        Streaming output generally requires CloudWatch logs to be configured.

        To avoid entangled arguments in the simple case, `configure_cloudwatch_logs`
        defaults to matching the value of `stream_output`.
        """
        configure_cloudwatch_logs = values.get("configure_cloudwatch_logs")
        if configure_cloudwatch_logs is None:
            values["configure_cloudwatch_logs"] = values.get("stream_output")
        return values

    @root_validator
    def configure_cloudwatch_logs_requires_execution_role_arn(
        cls, values: dict
    ) -> dict:
        """
        Enforces that an execution role arn is provided (or could be provided by a
        runtime task definition) when configuring logging.
        """
        if (
            values.get("configure_cloudwatch_logs")
            and not values.get("execution_role_arn")
            # TODO: Does not match
            # Do not raise if they've linked to another task definition or provided
            # it without using our shortcuts
            and not values.get("task_run_request", {}).get("taskDefinition")
            and not (values.get("task_definition") or {}).get("executionRoleArn")
        ):
            raise ValueError(
                "An `execution_role_arn` must be provided to use "
                "`configure_cloudwatch_logs` or `stream_logs`."
            )
        return values

    @root_validator
    def cloudwatch_logs_options_requires_configure_cloudwatch_logs(
        cls, values: dict
    ) -> dict:
        """
        Enforces that an execution role arn is provided (or could be provided by a
        runtime task definition) when configuring logging.
        """
        if values.get("cloudwatch_logs_options") and not values.get(
            "configure_cloudwatch_logs"
        ):
            raise ValueError(
                "`configure_cloudwatch_log` must be enabled to use "
                "`cloudwatch_logs_options`."
            )
        return values

    @root_validator
    def network_configuration_requires_vpc_id(cls, values: dict) -> dict:
        """
        Enforces a `vpc_id` is provided when custom network configuration mode is
        enabled for network settings.
        """
        if values.get("network_configuration") and not values.get("vpc_id"):
            raise ValueError(
                "You must provide a `vpc_id` to enable custom `network_configuration`."
            )
        return values

Methods

cloudwatch_logs_options_requires_configure_cloudwatch_logs classmethod

Enforces that an execution role arn is provided (or could be provided by a runtime task definition) when configuring logging.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def cloudwatch_logs_options_requires_configure_cloudwatch_logs(
    cls, values: dict
) -> dict:
    """
    Enforces that an execution role arn is provided (or could be provided by a
    runtime task definition) when configuring logging.
    """
    if values.get("cloudwatch_logs_options") and not values.get(
        "configure_cloudwatch_logs"
    ):
        raise ValueError(
            "`configure_cloudwatch_log` must be enabled to use "
            "`cloudwatch_logs_options`."
        )
    return values
configure_cloudwatch_logs_requires_execution_role_arn classmethod

Enforces that an execution role arn is provided (or could be provided by a runtime task definition) when configuring logging.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def configure_cloudwatch_logs_requires_execution_role_arn(
    cls, values: dict
) -> dict:
    """
    Enforces that an execution role arn is provided (or could be provided by a
    runtime task definition) when configuring logging.
    """
    if (
        values.get("configure_cloudwatch_logs")
        and not values.get("execution_role_arn")
        # TODO: Does not match
        # Do not raise if they've linked to another task definition or provided
        # it without using our shortcuts
        and not values.get("task_run_request", {}).get("taskDefinition")
        and not (values.get("task_definition") or {}).get("executionRoleArn")
    ):
        raise ValueError(
            "An `execution_role_arn` must be provided to use "
            "`configure_cloudwatch_logs` or `stream_logs`."
        )
    return values
container_name_default_from_task_definition classmethod

Infers the container name from the task definition if not provided.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def container_name_default_from_task_definition(cls, values) -> dict:
    """
    Infers the container name from the task definition if not provided.
    """
    if values.get("container_name") is None:
        values["container_name"] = _container_name_from_task_definition(
            values.get("task_definition")
        )

        # We may not have a name here still; for example if someone is using a task
        # definition arn. In that case, we'll perform similar logic later to find
        # the name to treat as the "orchestration" container.

    return values
network_configuration_requires_vpc_id classmethod

Enforces a vpc_id is provided when custom network configuration mode is enabled for network settings.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def network_configuration_requires_vpc_id(cls, values: dict) -> dict:
    """
    Enforces a `vpc_id` is provided when custom network configuration mode is
    enabled for network settings.
    """
    if values.get("network_configuration") and not values.get("vpc_id"):
        raise ValueError(
            "You must provide a `vpc_id` to enable custom `network_configuration`."
        )
    return values
set_default_configure_cloudwatch_logs classmethod

Streaming output generally requires CloudWatch logs to be configured.

To avoid entangled arguments in the simple case, configure_cloudwatch_logs defaults to matching the value of stream_output.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator(pre=True)
def set_default_configure_cloudwatch_logs(cls, values: dict) -> dict:
    """
    Streaming output generally requires CloudWatch logs to be configured.

    To avoid entangled arguments in the simple case, `configure_cloudwatch_logs`
    defaults to matching the value of `stream_output`.
    """
    configure_cloudwatch_logs = values.get("configure_cloudwatch_logs")
    if configure_cloudwatch_logs is None:
        values["configure_cloudwatch_logs"] = values.get("stream_output")
    return values
task_run_request_requires_arn_if_no_task_definition_given classmethod

If no task definition is provided, a task definition ARN must be present on the task run request.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def task_run_request_requires_arn_if_no_task_definition_given(cls, values) -> dict:
    """
    If no task definition is provided, a task definition ARN must be present on the
    task run request.
    """
    if not values.get("task_run_request", {}).get(
        "taskDefinition"
    ) and not values.get("task_definition"):
        raise ValueError(
            "A task definition must be provided if a task definition ARN is not "
            "present on the task run request."
        )
    return values

ECSVariables (BaseVariables) pydantic-model

Variables for templating an ECS job.

Source code in prefect_aws/workers/ecs_worker.py
class ECSVariables(BaseVariables):
    """
    Variables for templating an ECS job.
    """

    task_definition_arn: Optional[str] = Field(
        default=None,
        description=(
            "An identifier for an existing task definition to use. If set, options that"
            " require changes to the task definition will be ignored. All contents of "
            "the task definition in the job configuration will be ignored."
        ),
    )
    env: Dict[str, Optional[str]] = Field(
        title="Environment Variables",
        default_factory=dict,
        description=(
            "Environment variables to provide to the task run. These variables are set "
            "on the Prefect container at task runtime. These will not be set on the "
            "task definition."
        ),
    )
    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description=(
            "The AWS credentials to use to connect to ECS. If not provided, credentials"
            " will be inferred from the local environment following AWS's boto client's"
            " rules."
        ),
    )
    cluster: Optional[str] = Field(
        default=None,
        description=(
            "The ECS cluster to run the task in. An ARN or name may be provided. If "
            "not provided, the default cluster will be used."
        ),
    )
    family: Optional[str] = Field(
        default=None,
        description=(
            "A family for the task definition. If not provided, it will be inferred "
            "from the task definition. If the task definition does not have a family, "
            "the name will be generated. When flow and deployment metadata is "
            "available, the generated name will include their names. Values for this "
            "field will be slugified to match AWS character requirements."
        ),
    )
    launch_type: Optional[Literal["FARGATE", "EC2", "EXTERNAL", "FARGATE_SPOT"]] = (
        Field(
            default=ECS_DEFAULT_LAUNCH_TYPE,
            description=(
                "The type of ECS task run infrastructure that should be used. Note that"
                " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure"
                " the proper capacity provider strategy if set here."
            ),
        )
    )
    image: Optional[str] = Field(
        default=None,
        description=(
            "The image to use for the Prefect container in the task. If this value is "
            "not null, it will override the value in the task definition. This value "
            "defaults to a Prefect base image matching your local versions."
        ),
    )
    cpu: int = Field(
        title="CPU",
        default=None,
        description=(
            "The amount of CPU to provide to the ECS task. Valid amounts are "
            "specified in the AWS documentation. If not provided, a default value of "
            f"{ECS_DEFAULT_CPU} will be used unless present on the task definition."
        ),
    )
    memory: int = Field(
        default=None,
        description=(
            "The amount of memory to provide to the ECS task. Valid amounts are "
            "specified in the AWS documentation. If not provided, a default value of "
            f"{ECS_DEFAULT_MEMORY} will be used unless present on the task definition."
        ),
    )
    container_name: str = Field(
        default=None,
        description=(
            "The name of the container flow run orchestration will occur in. If not "
            f"specified, a default value of {ECS_DEFAULT_CONTAINER_NAME} will be used "
            "and if that is not found in the task definition the first container will "
            "be used."
        ),
    )
    task_role_arn: str = Field(
        title="Task Role ARN",
        default=None,
        description=(
            "A role to attach to the task run. This controls the permissions of the "
            "task while it is running."
        ),
    )
    execution_role_arn: str = Field(
        title="Execution Role ARN",
        default=None,
        description=(
            "An execution role to use for the task. This controls the permissions of "
            "the task when it is launching. If this value is not null, it will "
            "override the value in the task definition. An execution role must be "
            "provided to capture logs from the container."
        ),
    )
    vpc_id: Optional[str] = Field(
        title="VPC ID",
        default=None,
        description=(
            "The AWS VPC to link the task run to. This is only applicable when using "
            "the 'awsvpc' network mode for your task. FARGATE tasks require this "
            "network  mode, but for EC2 tasks the default network mode is 'bridge'. "
            "If using the 'awsvpc' network mode and this field is null, your default "
            "VPC will be used. If no default VPC can be found, the task run will fail."
        ),
    )
    configure_cloudwatch_logs: bool = Field(
        default=None,
        description=(
            "If enabled, the Prefect container will be configured to send its output "
            "to the AWS CloudWatch logs service. This functionality requires an "
            "execution role with logs:CreateLogStream, logs:CreateLogGroup, and "
            "logs:PutLogEvents permissions. The default for this field is `False` "
            "unless `stream_output` is set."
        ),
    )
    cloudwatch_logs_options: Dict[str, str] = Field(
        default_factory=dict,
        description=(
            "When `configure_cloudwatch_logs` is enabled, this setting may be used to"
            " pass additional options to the CloudWatch logs configuration or override"
            " the default options. See the [AWS"
            " documentation](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html#create_awslogs_logdriver_options)"  # noqa
            " for available options. "
        ),
    )

    network_configuration: Dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "When `network_configuration` is supplied it will override ECS Worker's"
            "awsvpcConfiguration that defined in the ECS task executing your workload. "
            "See the [AWS documentation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ecs-service-awsvpcconfiguration.html)"  # noqa
            " for available options."
        ),
    )

    stream_output: bool = Field(
        default=None,
        description=(
            "If enabled, logs will be streamed from the Prefect container to the local "
            "console. Unless you have configured AWS CloudWatch logs manually on your "
            "task definition, this requires the same prerequisites outlined in "
            "`configure_cloudwatch_logs`."
        ),
    )
    task_start_timeout_seconds: int = Field(
        default=300,
        description=(
            "The amount of time to watch for the start of the ECS task "
            "before marking it as failed. The task must enter a RUNNING state to be "
            "considered started."
        ),
    )
    task_watch_poll_interval: float = Field(
        default=5.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an ECS task."
        ),
    )
    auto_deregister_task_definition: bool = Field(
        default=False,
        description=(
            "If enabled, any task definitions that are created by this block will be "
            "deregistered. Existing task definitions linked by ARN will never be "
            "deregistered. Deregistering a task definition does not remove it from "
            "your AWS account, instead it will be marked as INACTIVE."
        ),
    )
    match_latest_revision_in_family: bool = Field(
        default=False,
        description=(
            "If enabled, the most recent active revision in the task definition "
            "family will be compared against the desired ECS task configuration. "
            "If they are equal, the existing task definition will be used instead "
            "of registering a new one. If no family is specified the default family "
            f'"{ECS_DEFAULT_FAMILY}" will be used.'
        ),
    )

Attributes

auto_deregister_task_definition: bool pydantic-field

If enabled, any task definitions that are created by this block will be deregistered. Existing task definitions linked by ARN will never be deregistered. Deregistering a task definition does not remove it from your AWS account, instead it will be marked as INACTIVE.

aws_credentials: AwsCredentials pydantic-field

The AWS credentials to use to connect to ECS. If not provided, credentials will be inferred from the local environment following AWS's boto client's rules.

cloudwatch_logs_options: Dict[str, str] pydantic-field

When configure_cloudwatch_logs is enabled, this setting may be used to pass additional options to the CloudWatch logs configuration or override the default options. See the AWS documentation for available options.

cluster: str pydantic-field

The ECS cluster to run the task in. An ARN or name may be provided. If not provided, the default cluster will be used.

configure_cloudwatch_logs: bool pydantic-field

If enabled, the Prefect container will be configured to send its output to the AWS CloudWatch logs service. This functionality requires an execution role with logs:CreateLogStream, logs:CreateLogGroup, and logs:PutLogEvents permissions. The default for this field is False unless stream_output is set.

container_name: str pydantic-field

The name of the container flow run orchestration will occur in. If not specified, a default value of prefect will be used and if that is not found in the task definition the first container will be used.

cpu: int pydantic-field

The amount of CPU to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of 1024 will be used unless present on the task definition.

execution_role_arn: str pydantic-field

An execution role to use for the task. This controls the permissions of the task when it is launching. If this value is not null, it will override the value in the task definition. An execution role must be provided to capture logs from the container.

family: str pydantic-field

A family for the task definition. If not provided, it will be inferred from the task definition. If the task definition does not have a family, the name will be generated. When flow and deployment metadata is available, the generated name will include their names. Values for this field will be slugified to match AWS character requirements.

image: str pydantic-field

The image to use for the Prefect container in the task. If this value is not null, it will override the value in the task definition. This value defaults to a Prefect base image matching your local versions.

launch_type: Literal['FARGATE', 'EC2', 'EXTERNAL', 'FARGATE_SPOT'] pydantic-field

The type of ECS task run infrastructure that should be used. Note that 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure the proper capacity provider strategy if set here.

match_latest_revision_in_family: bool pydantic-field

If enabled, the most recent active revision in the task definition family will be compared against the desired ECS task configuration. If they are equal, the existing task definition will be used instead of registering a new one. If no family is specified the default family "prefect" will be used.

memory: int pydantic-field

The amount of memory to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of 2048 will be used unless present on the task definition.

network_configuration: Dict[str, Any] pydantic-field

When network_configuration is supplied it will override ECS Worker'sawsvpcConfiguration that defined in the ECS task executing your workload. See the AWS documentation for available options.

stream_output: bool pydantic-field

If enabled, logs will be streamed from the Prefect container to the local console. Unless you have configured AWS CloudWatch logs manually on your task definition, this requires the same prerequisites outlined in configure_cloudwatch_logs.

task_definition_arn: str pydantic-field

An identifier for an existing task definition to use. If set, options that require changes to the task definition will be ignored. All contents of the task definition in the job configuration will be ignored.

task_role_arn: str pydantic-field

A role to attach to the task run. This controls the permissions of the task while it is running.

task_start_timeout_seconds: int pydantic-field

The amount of time to watch for the start of the ECS task before marking it as failed. The task must enter a RUNNING state to be considered started.

task_watch_poll_interval: float pydantic-field

The amount of time to wait between AWS API calls while monitoring the state of an ECS task.

vpc_id: str pydantic-field

The AWS VPC to link the task run to. This is only applicable when using the 'awsvpc' network mode for your task. FARGATE tasks require this network mode, but for EC2 tasks the default network mode is 'bridge'. If using the 'awsvpc' network mode and this field is null, your default VPC will be used. If no default VPC can be found, the task run will fail.

ECSWorker (BaseWorker)

A Prefect worker to run flow runs as ECS tasks.

Source code in prefect_aws/workers/ecs_worker.py
class ECSWorker(BaseWorker):
    """
    A Prefect worker to run flow runs as ECS tasks.
    """

    type = "ecs"
    job_configuration = ECSJobConfiguration
    job_configuration_variables = ECSVariables
    _description = (
        "Execute flow runs within containers on AWS ECS. Works with EC2 "
        "and Fargate clusters. Requires an AWS account."
    )
    _display_name = "AWS Elastic Container Service"
    _documentation_url = "https://prefecthq.github.io/prefect-aws/ecs_worker/"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png"  # noqa

    async def run(
        self,
        flow_run: "FlowRun",
        configuration: ECSJobConfiguration,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> BaseWorkerResult:
        """
        Runs a given flow run on the current worker.
        """
        ecs_client = await run_sync_in_worker_thread(
            self._get_client, configuration, "ecs"
        )

        logger = self.get_flow_run_logger(flow_run)

        (
            task_arn,
            cluster_arn,
            task_definition,
            is_new_task_definition,
        ) = await run_sync_in_worker_thread(
            self._create_task_and_wait_for_start,
            logger,
            ecs_client,
            configuration,
            flow_run,
        )

        # The task identifier is "{cluster}::{task}" where we use the configured cluster
        # if set to preserve matching by name rather than arn
        # Note "::" is used despite the Prefect standard being ":" because ARNs contain
        # single colons.
        identifier = (
            (configuration.cluster if configuration.cluster else cluster_arn)
            + "::"
            + task_arn
        )

        if task_status:
            task_status.started(identifier)

        status_code = await run_sync_in_worker_thread(
            self._watch_task_and_get_exit_code,
            logger,
            configuration,
            task_arn,
            cluster_arn,
            task_definition,
            is_new_task_definition and configuration.auto_deregister_task_definition,
            ecs_client,
        )

        return ECSWorkerResult(
            identifier=identifier,
            # If the container does not start the exit code can be null but we must
            # still report a status code. We use a -1 to indicate a special code.
            status_code=status_code if status_code is not None else -1,
        )

    def _get_client(
        self, configuration: ECSJobConfiguration, client_type: Union[str, ClientType]
    ) -> _ECSClient:
        """
        Get a boto3 client of client_type. Will use a cached client if one exists.
        """
        return configuration.aws_credentials.get_client(client_type)

    def _create_task_and_wait_for_start(
        self,
        logger: logging.Logger,
        ecs_client: _ECSClient,
        configuration: ECSJobConfiguration,
        flow_run: FlowRun,
    ) -> Tuple[str, str, dict, bool]:
        """
        Register the task definition, create the task run, and wait for it to start.

        Returns a tuple of
        - The task ARN
        - The task's cluster ARN
        - The task definition
        - A bool indicating if the task definition is newly registered
        """
        task_definition_arn = configuration.task_run_request.get("taskDefinition")
        new_task_definition_registered = False

        if not task_definition_arn:
            task_definition = self._prepare_task_definition(
                configuration, region=ecs_client.meta.region_name
            )
            (
                task_definition_arn,
                new_task_definition_registered,
            ) = self._get_or_register_task_definition(
                logger, ecs_client, configuration, flow_run, task_definition
            )
        else:
            task_definition = self._retrieve_task_definition(
                logger, ecs_client, task_definition_arn
            )
            if configuration.task_definition:
                logger.warning(
                    "Ignoring task definition in configuration since task definition"
                    " ARN is provided on the task run request."
                )

        self._validate_task_definition(task_definition, configuration)

        _TASK_DEFINITION_CACHE[flow_run.deployment_id] = task_definition_arn

        logger.info(f"Using ECS task definition {task_definition_arn!r}...")
        logger.debug(
            f"Task definition {json.dumps(task_definition, indent=2, default=str)}"
        )

        task_run_request = self._prepare_task_run_request(
            configuration,
            task_definition,
            task_definition_arn,
        )

        logger.info("Creating ECS task run...")
        logger.debug(
            "Task run request"
            f"{json.dumps(mask_api_key(task_run_request), indent=2, default=str)}"
        )

        try:
            task = self._create_task_run(ecs_client, task_run_request)
            task_arn = task["taskArn"]
            cluster_arn = task["clusterArn"]
        except Exception as exc:
            self._report_task_run_creation_failure(configuration, task_run_request, exc)
            raise

        logger.info("Waiting for ECS task run to start...")
        self._wait_for_task_start(
            logger,
            configuration,
            task_arn,
            cluster_arn,
            ecs_client,
            timeout=configuration.task_start_timeout_seconds,
        )

        return task_arn, cluster_arn, task_definition, new_task_definition_registered

    def _get_or_register_task_definition(
        self,
        logger: logging.Logger,
        ecs_client: _ECSClient,
        configuration: ECSJobConfiguration,
        flow_run: FlowRun,
        task_definition: dict,
    ) -> Tuple[str, bool]:
        """Get or register a task definition for the given flow run.

        Returns a tuple of the task definition ARN and a bool indicating if the task
        definition is newly registered.
        """

        cached_task_definition_arn = _TASK_DEFINITION_CACHE.get(flow_run.deployment_id)
        new_task_definition_registered = False

        if cached_task_definition_arn:
            try:
                cached_task_definition = self._retrieve_task_definition(
                    logger, ecs_client, cached_task_definition_arn
                )
                if not cached_task_definition[
                    "status"
                ] == "ACTIVE" or not self._task_definitions_equal(
                    task_definition, cached_task_definition
                ):
                    cached_task_definition_arn = None
            except Exception:
                cached_task_definition_arn = None

        if (
            not cached_task_definition_arn
            and configuration.match_latest_revision_in_family
        ):
            family_name = task_definition.get("family", ECS_DEFAULT_FAMILY)
            try:
                task_definition_from_family = self._retrieve_task_definition(
                    logger, ecs_client, family_name
                )
                if task_definition_from_family and self._task_definitions_equal(
                    task_definition, task_definition_from_family
                ):
                    cached_task_definition_arn = task_definition_from_family[
                        "taskDefinitionArn"
                    ]
            except Exception:
                cached_task_definition_arn = None

        if not cached_task_definition_arn:
            task_definition_arn = self._register_task_definition(
                logger, ecs_client, task_definition
            )
            new_task_definition_registered = True
        else:
            task_definition_arn = cached_task_definition_arn

        return task_definition_arn, new_task_definition_registered

    def _watch_task_and_get_exit_code(
        self,
        logger: logging.Logger,
        configuration: ECSJobConfiguration,
        task_arn: str,
        cluster_arn: str,
        task_definition: dict,
        deregister_task_definition: bool,
        ecs_client: _ECSClient,
    ) -> Optional[int]:
        """
        Wait for the task run to complete and retrieve the exit code of the Prefect
        container.
        """

        # Wait for completion and stream logs
        task = self._wait_for_task_finish(
            logger,
            configuration,
            task_arn,
            cluster_arn,
            task_definition,
            ecs_client,
        )

        if deregister_task_definition:
            ecs_client.deregister_task_definition(
                taskDefinition=task["taskDefinitionArn"]
            )

        container_name = (
            configuration.container_name
            or _container_name_from_task_definition(task_definition)
            or ECS_DEFAULT_CONTAINER_NAME
        )

        # Check the status code of the Prefect container
        container = _get_container(task["containers"], container_name)
        assert (
            container is not None
        ), f"'{container_name}' container missing from task: {task}"
        status_code = container.get("exitCode")
        self._report_container_status_code(logger, container_name, status_code)

        return status_code

    def _report_container_status_code(
        self, logger: logging.Logger, name: str, status_code: Optional[int]
    ) -> None:
        """
        Display a log for the given container status code.
        """
        if status_code is None:
            logger.error(
                f"Task exited without reporting an exit status for container {name!r}."
            )
        elif status_code == 0:
            logger.info(f"Container {name!r} exited successfully.")
        else:
            logger.warning(
                f"Container {name!r} exited with non-zero exit code {status_code}."
            )

    def _report_task_run_creation_failure(
        self, configuration: ECSJobConfiguration, task_run: dict, exc: Exception
    ) -> None:
        """
        Wrap common AWS task run creation failures with nicer user-facing messages.
        """
        # AWS generates exception types at runtime so they must be captured a bit
        # differently than normal.
        if "ClusterNotFoundException" in str(exc):
            cluster = task_run.get("cluster", "default")
            raise RuntimeError(
                f"Failed to run ECS task, cluster {cluster!r} not found. "
                "Confirm that the cluster is configured in your region."
            ) from exc
        elif (
            "No Container Instances" in str(exc) and task_run.get("launchType") == "EC2"
        ):
            cluster = task_run.get("cluster", "default")
            raise RuntimeError(
                f"Failed to run ECS task, cluster {cluster!r} does not appear to "
                "have any container instances associated with it. Confirm that you "
                "have EC2 container instances available."
            ) from exc
        elif (
            "failed to validate logger args" in str(exc)
            and "AccessDeniedException" in str(exc)
            and configuration.configure_cloudwatch_logs
        ):
            raise RuntimeError(
                "Failed to run ECS task, the attached execution role does not appear"
                " to have sufficient permissions. Ensure that the execution role"
                f" {configuration.execution_role!r} has permissions"
                " logs:CreateLogStream, logs:CreateLogGroup, and logs:PutLogEvents."
            )
        else:
            raise

    def _validate_task_definition(
        self, task_definition: dict, configuration: ECSJobConfiguration
    ) -> None:
        """
        Ensure that the task definition is compatible with the configuration.

        Raises `ValueError` on incompatibility. Returns `None` on success.
        """
        launch_type = configuration.task_run_request.get(
            "launchType", ECS_DEFAULT_LAUNCH_TYPE
        )
        if (
            launch_type != "EC2"
            and "FARGATE" not in task_definition["requiresCompatibilities"]
        ):
            raise ValueError(
                "Task definition does not have 'FARGATE' in 'requiresCompatibilities'"
                f" and cannot be used with launch type {launch_type!r}"
            )

        if launch_type == "FARGATE" or launch_type == "FARGATE_SPOT":
            # Only the 'awsvpc' network mode is supported when using FARGATE
            network_mode = task_definition.get("networkMode")
            if network_mode != "awsvpc":
                raise ValueError(
                    f"Found network mode {network_mode!r} which is not compatible with "
                    f"launch type {launch_type!r}. Use either the 'EC2' launch "
                    "type or the 'awsvpc' network mode."
                )

        if configuration.configure_cloudwatch_logs and not task_definition.get(
            "executionRoleArn"
        ):
            raise ValueError(
                "An execution role arn must be set on the task definition to use "
                "`configure_cloudwatch_logs` or `stream_logs` but no execution role "
                "was found on the task definition."
            )

    def _register_task_definition(
        self,
        logger: logging.Logger,
        ecs_client: _ECSClient,
        task_definition: dict,
    ) -> str:
        """
        Register a new task definition with AWS.

        Returns the ARN.
        """
        logger.info("Registering ECS task definition...")
        logger.debug(
            "Task definition request"
            f"{json.dumps(task_definition, indent=2, default=str)}"
        )
        response = ecs_client.register_task_definition(**task_definition)
        return response["taskDefinition"]["taskDefinitionArn"]

    def _retrieve_task_definition(
        self,
        logger: logging.Logger,
        ecs_client: _ECSClient,
        task_definition: str,
    ):
        """
        Retrieve an existing task definition from AWS.
        """
        if task_definition.startswith("arn:aws:ecs:"):
            logger.info(f"Retrieving ECS task definition {task_definition!r}...")
        else:
            logger.info(
                "Retrieving most recent active revision from "
                f"ECS task family {task_definition!r}..."
            )
        response = ecs_client.describe_task_definition(taskDefinition=task_definition)
        return response["taskDefinition"]

    def _wait_for_task_start(
        self,
        logger: logging.Logger,
        configuration: ECSJobConfiguration,
        task_arn: str,
        cluster_arn: str,
        ecs_client: _ECSClient,
        timeout: int,
    ) -> dict:
        """
        Waits for an ECS task run to reach a RUNNING status.

        If a STOPPED status is reached instead, an exception is raised indicating the
        reason that the task run did not start.
        """
        for task in self._watch_task_run(
            logger,
            configuration,
            task_arn,
            cluster_arn,
            ecs_client,
            until_status="RUNNING",
            timeout=timeout,
        ):
            # TODO: It is possible that the task has passed _through_ a RUNNING
            #       status during the polling interval. In this case, there is not an
            #       exception to raise.
            if task["lastStatus"] == "STOPPED":
                code = task.get("stopCode")
                reason = task.get("stoppedReason")
                # Generate a dynamic exception type from the AWS name
                raise type(code, (RuntimeError,), {})(reason)

        return task

    def _wait_for_task_finish(
        self,
        logger: logging.Logger,
        configuration: ECSJobConfiguration,
        task_arn: str,
        cluster_arn: str,
        task_definition: dict,
        ecs_client: _ECSClient,
    ):
        """
        Watch an ECS task until it reaches a STOPPED status.

        If configured, logs from the Prefect container are streamed to stderr.

        Returns a description of the task on completion.
        """
        can_stream_output = False
        container_name = (
            configuration.container_name
            or _container_name_from_task_definition(task_definition)
            or ECS_DEFAULT_CONTAINER_NAME
        )

        if configuration.stream_output:
            container_def = _get_container(
                task_definition["containerDefinitions"], container_name
            )
            if not container_def:
                logger.warning(
                    "Prefect container definition not found in "
                    "task definition. Output cannot be streamed."
                )
            elif not container_def.get("logConfiguration"):
                logger.warning(
                    "Logging configuration not found on task. "
                    "Output cannot be streamed."
                )
            elif not container_def["logConfiguration"].get("logDriver") == "awslogs":
                logger.warning(
                    "Logging configuration uses unsupported "
                    " driver {container_def['logConfiguration'].get('logDriver')!r}. "
                    "Output cannot be streamed."
                )
            else:
                # Prepare to stream the output
                log_config = container_def["logConfiguration"]["options"]
                logs_client = self._get_client(configuration, "logs")
                can_stream_output = True
                # Track the last log timestamp to prevent double display
                last_log_timestamp: Optional[int] = None
                # Determine the name of the stream as "prefix/container/run-id"
                stream_name = "/".join(
                    [
                        log_config["awslogs-stream-prefix"],
                        container_name,
                        task_arn.rsplit("/")[-1],
                    ]
                )
                self._logger.info(
                    f"Streaming output from container {container_name!r}..."
                )

        for task in self._watch_task_run(
            logger,
            configuration,
            task_arn,
            cluster_arn,
            ecs_client,
            current_status="RUNNING",
        ):
            if configuration.stream_output and can_stream_output:
                # On each poll for task run status, also retrieve available logs
                last_log_timestamp = self._stream_available_logs(
                    logger,
                    logs_client,
                    log_group=log_config["awslogs-group"],
                    log_stream=stream_name,
                    last_log_timestamp=last_log_timestamp,
                )

        return task

    def _stream_available_logs(
        self,
        logger: logging.Logger,
        logs_client: Any,
        log_group: str,
        log_stream: str,
        last_log_timestamp: Optional[int] = None,
    ) -> Optional[int]:
        """
        Stream logs from the given log group and stream since the last log timestamp.

        Will continue on paginated responses until all logs are returned.

        Returns the last log timestamp which can be used to call this method in the
        future.
        """
        last_log_stream_token = "NO-TOKEN"
        next_log_stream_token = None

        # AWS will return the same token that we send once the end of the paginated
        # response is reached
        while last_log_stream_token != next_log_stream_token:
            last_log_stream_token = next_log_stream_token

            request = {
                "logGroupName": log_group,
                "logStreamName": log_stream,
            }

            if last_log_stream_token is not None:
                request["nextToken"] = last_log_stream_token

            if last_log_timestamp is not None:
                # Bump the timestamp by one ms to avoid retrieving the last log again
                request["startTime"] = last_log_timestamp + 1

            try:
                response = logs_client.get_log_events(**request)
            except Exception:
                logger.error(
                    f"Failed to read log events with request {request}",
                    exc_info=True,
                )
                return last_log_timestamp

            log_events = response["events"]
            for log_event in log_events:
                # TODO: This doesn't forward to the local logger, which can be
                #       bad for customizing handling and understanding where the
                #       log is coming from, but it avoid nesting logger information
                #       when the content is output from a Prefect logger on the
                #       running infrastructure
                print(log_event["message"], file=sys.stderr)

                if (
                    last_log_timestamp is None
                    or log_event["timestamp"] > last_log_timestamp
                ):
                    last_log_timestamp = log_event["timestamp"]

            next_log_stream_token = response.get("nextForwardToken")
            if not log_events:
                # Stop reading pages if there was no data
                break

        return last_log_timestamp

    def _watch_task_run(
        self,
        logger: logging.Logger,
        configuration: ECSJobConfiguration,
        task_arn: str,
        cluster_arn: str,
        ecs_client: _ECSClient,
        current_status: str = "UNKNOWN",
        until_status: str = None,
        timeout: int = None,
    ) -> Generator[None, None, dict]:
        """
        Watches an ECS task run by querying every `poll_interval` seconds. After each
        query, the retrieved task is yielded. This function returns when the task run
        reaches a STOPPED status or the provided `until_status`.

        Emits a log each time the status changes.
        """
        last_status = status = current_status
        t0 = time.time()
        while status != until_status:
            tasks = ecs_client.describe_tasks(
                tasks=[task_arn], cluster=cluster_arn, include=["TAGS"]
            )["tasks"]

            if tasks:
                task = tasks[0]

                status = task["lastStatus"]
                if status != last_status:
                    logger.info(f"ECS task status is {status}.")

                yield task

                # No point in continuing if the status is final
                if status == "STOPPED":
                    break

                last_status = status

            else:
                # Intermittently, the task will not be described. We wat to respect the
                # watch timeout though.
                logger.debug("Task not found.")

            elapsed_time = time.time() - t0
            if timeout is not None and elapsed_time > timeout:
                raise RuntimeError(
                    f"Timed out after {elapsed_time}s while watching task for status "
                    f"{until_status or 'STOPPED'}."
                )
            time.sleep(configuration.task_watch_poll_interval)

    def _prepare_task_definition(
        self,
        configuration: ECSJobConfiguration,
        region: str,
    ) -> dict:
        """
        Prepare a task definition by inferring any defaults and merging overrides.
        """
        task_definition = copy.deepcopy(configuration.task_definition)

        # Configure the Prefect runtime container
        task_definition.setdefault("containerDefinitions", [])

        # Remove empty container definitions
        task_definition["containerDefinitions"] = [
            d for d in task_definition["containerDefinitions"] if d
        ]

        container_name = configuration.container_name
        if not container_name:
            container_name = (
                _container_name_from_task_definition(task_definition)
                or ECS_DEFAULT_CONTAINER_NAME
            )

        container = _get_container(
            task_definition["containerDefinitions"], container_name
        )
        if container is None:
            if container_name != ECS_DEFAULT_CONTAINER_NAME:
                raise ValueError(
                    f"Container {container_name!r} not found in task definition."
                )

            # Look for a container without a name
            for container in task_definition["containerDefinitions"]:
                if "name" not in container:
                    container["name"] = container_name
                    break
            else:
                container = {"name": container_name}
                task_definition["containerDefinitions"].append(container)

        # Image is required so make sure it's present
        container.setdefault("image", get_prefect_image_name())

        # Remove any keys that have been explicitly "unset"
        unset_keys = {key for key, value in configuration.env.items() if value is None}
        for item in tuple(container.get("environment", [])):
            if item["name"] in unset_keys or item["value"] is None:
                container["environment"].remove(item)

        if configuration.configure_cloudwatch_logs:
            container["logConfiguration"] = {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-create-group": "true",
                    "awslogs-group": "prefect",
                    "awslogs-region": region,
                    "awslogs-stream-prefix": configuration.name or "prefect",
                    **configuration.cloudwatch_logs_options,
                },
            }

        family = task_definition.get("family") or ECS_DEFAULT_FAMILY
        task_definition["family"] = slugify(
            family,
            max_length=255,
            regex_pattern=r"[^a-zA-Z0-9-_]+",
        )

        # CPU and memory are required in some cases, retrieve the value to use
        cpu = task_definition.get("cpu") or ECS_DEFAULT_CPU
        memory = task_definition.get("memory") or ECS_DEFAULT_MEMORY

        launch_type = configuration.task_run_request.get(
            "launchType", ECS_DEFAULT_LAUNCH_TYPE
        )

        if launch_type == "FARGATE" or launch_type == "FARGATE_SPOT":
            # Task level memory and cpu are required when using fargate
            task_definition["cpu"] = str(cpu)
            task_definition["memory"] = str(memory)

            # The FARGATE compatibility is required if it will be used as as launch type
            requires_compatibilities = task_definition.setdefault(
                "requiresCompatibilities", []
            )
            if "FARGATE" not in requires_compatibilities:
                task_definition["requiresCompatibilities"].append("FARGATE")

            # Only the 'awsvpc' network mode is supported when using FARGATE
            # However, we will not enforce that here if the user has set it
            task_definition.setdefault("networkMode", "awsvpc")

        elif launch_type == "EC2":
            # Container level memory and cpu are required when using ec2
            container.setdefault("cpu", cpu)
            container.setdefault("memory", memory)

            # Ensure set values are cast to integers
            container["cpu"] = int(container["cpu"])
            container["memory"] = int(container["memory"])

        # Ensure set values are cast to strings
        if task_definition.get("cpu"):
            task_definition["cpu"] = str(task_definition["cpu"])
        if task_definition.get("memory"):
            task_definition["memory"] = str(task_definition["memory"])

        return task_definition

    def _load_network_configuration(
        self, vpc_id: Optional[str], configuration: ECSJobConfiguration
    ) -> dict:
        """
        Load settings from a specific VPC or the default VPC and generate a task
        run request's network configuration.
        """
        ec2_client = self._get_client(configuration, "ec2")
        vpc_message = "the default VPC" if not vpc_id else f"VPC with ID {vpc_id}"

        if not vpc_id:
            # Retrieve the default VPC
            describe = {"Filters": [{"Name": "isDefault", "Values": ["true"]}]}
        else:
            describe = {"VpcIds": [vpc_id]}

        vpcs = ec2_client.describe_vpcs(**describe)["Vpcs"]
        if not vpcs:
            help_message = (
                "Pass an explicit `vpc_id` or configure a default VPC."
                if not vpc_id
                else "Check that the VPC exists in the current region."
            )
            raise ValueError(
                f"Failed to find {vpc_message}. "
                "Network configuration cannot be inferred. "
                + help_message
            )

        vpc_id = vpcs[0]["VpcId"]
        subnets = ec2_client.describe_subnets(
            Filters=[{"Name": "vpc-id", "Values": [vpc_id]}]
        )["Subnets"]
        if not subnets:
            raise ValueError(
                f"Failed to find subnets for {vpc_message}. "
                "Network configuration cannot be inferred."
            )

        return {
            "awsvpcConfiguration": {
                "subnets": [s["SubnetId"] for s in subnets],
                "assignPublicIp": "ENABLED",
                "securityGroups": [],
            }
        }

    def _custom_network_configuration(
        self,
        vpc_id: str,
        network_configuration: dict,
        configuration: ECSJobConfiguration,
    ) -> dict:
        """
        Load settings from a specific VPC or the default VPC and generate a task
        run request's network configuration.
        """
        ec2_client = self._get_client(configuration, "ec2")
        vpc_message = f"VPC with ID {vpc_id}"

        vpcs = ec2_client.describe_vpcs(VpcIds=[vpc_id]).get("Vpcs")

        if not vpcs:
            raise ValueError(
                f"Failed to find {vpc_message}. "
                + "Network configuration cannot be inferred. "
                + "Pass an explicit `vpc_id`."
            )

        vpc_id = vpcs[0]["VpcId"]
        subnets = ec2_client.describe_subnets(
            Filters=[{"Name": "vpc-id", "Values": [vpc_id]}]
        )["Subnets"]

        if not subnets:
            raise ValueError(
                f"Failed to find subnets for {vpc_message}. "
                + "Network configuration cannot be inferred."
            )

        subnet_ids = [subnet["SubnetId"] for subnet in subnets]

        config_subnets = network_configuration.get("subnets", [])
        if not all(conf_sn in subnet_ids for conf_sn in config_subnets):
            raise ValueError(
                f"Subnets {config_subnets} not found within {vpc_message}."
                + "Please check that VPC is associated with supplied subnets."
            )

        return {"awsvpcConfiguration": network_configuration}

    def _prepare_task_run_request(
        self,
        configuration: ECSJobConfiguration,
        task_definition: dict,
        task_definition_arn: str,
    ) -> dict:
        """
        Prepare a task run request payload.
        """
        task_run_request = deepcopy(configuration.task_run_request)

        task_run_request.setdefault("taskDefinition", task_definition_arn)
        assert task_run_request["taskDefinition"] == task_definition_arn

        if task_run_request.get("launchType") == "FARGATE_SPOT":
            # Should not be provided at all for FARGATE SPOT
            task_run_request.pop("launchType", None)

            # A capacity provider strategy is required for FARGATE SPOT
            task_run_request.setdefault(
                "capacityProviderStrategy",
                [{"capacityProvider": "FARGATE_SPOT", "weight": 1}],
            )

        overrides = task_run_request.get("overrides", {})
        container_overrides = overrides.get("containerOverrides", [])

        # Ensure the network configuration is present if using awsvpc for network mode
        if (
            task_definition.get("networkMode") == "awsvpc"
            and not task_run_request.get("networkConfiguration")
            and not configuration.network_configuration
        ):
            task_run_request["networkConfiguration"] = self._load_network_configuration(
                configuration.vpc_id, configuration
            )

        # Use networkConfiguration if supplied by user
        if (
            task_definition.get("networkMode") == "awsvpc"
            and configuration.network_configuration
            and configuration.vpc_id
        ):
            task_run_request["networkConfiguration"] = (
                self._custom_network_configuration(
                    configuration.vpc_id,
                    configuration.network_configuration,
                    configuration,
                )
            )

        # Ensure the container name is set if not provided at template time

        container_name = (
            configuration.container_name
            or _container_name_from_task_definition(task_definition)
            or ECS_DEFAULT_CONTAINER_NAME
        )

        if container_overrides and not container_overrides[0].get("name"):
            container_overrides[0]["name"] = container_name

        # Ensure configuration command is respected post-templating

        orchestration_container = _get_container(container_overrides, container_name)

        if orchestration_container:
            # Override the command if given on the configuration
            if configuration.command:
                orchestration_container["command"] = configuration.command

        # Clean up templated variable formatting

        for container in container_overrides:
            if isinstance(container.get("command"), str):
                container["command"] = shlex.split(container["command"])
            if isinstance(container.get("environment"), dict):
                container["environment"] = [
                    {"name": k, "value": v} for k, v in container["environment"].items()
                ]

            # Remove null values — they're not allowed by AWS
            container["environment"] = [
                item
                for item in container.get("environment", [])
                if item["value"] is not None
            ]

        if isinstance(task_run_request.get("tags"), dict):
            task_run_request["tags"] = [
                {"key": k, "value": v} for k, v in task_run_request["tags"].items()
            ]

        if overrides.get("cpu"):
            overrides["cpu"] = str(overrides["cpu"])

        if overrides.get("memory"):
            overrides["memory"] = str(overrides["memory"])

        # Ensure configuration tags and env are respected post-templating

        tags = [
            item
            for item in task_run_request.get("tags", [])
            if item["key"] not in configuration.labels.keys()
        ] + [
            {"key": k, "value": v}
            for k, v in configuration.labels.items()
            if v is not None
        ]

        # Slugify tags keys and values
        tags = [
            {
                "key": slugify(
                    item["key"],
                    regex_pattern=_TAG_REGEX,
                    allow_unicode=True,
                    lowercase=False,
                ),
                "value": slugify(
                    item["value"],
                    regex_pattern=_TAG_REGEX,
                    allow_unicode=True,
                    lowercase=False,
                ),
            }
            for item in tags
        ]

        if tags:
            task_run_request["tags"] = tags

        if orchestration_container:
            environment = [
                item
                for item in orchestration_container.get("environment", [])
                if item["name"] not in configuration.env.keys()
            ] + [
                {"name": k, "value": v}
                for k, v in configuration.env.items()
                if v is not None
            ]
            if environment:
                orchestration_container["environment"] = environment

        # Remove empty container overrides

        overrides["containerOverrides"] = [v for v in container_overrides if v]

        return task_run_request

    @retry(
        stop=stop_after_attempt(MAX_CREATE_TASK_RUN_ATTEMPTS),
        wait=wait_fixed(CREATE_TASK_RUN_MIN_DELAY_SECONDS)
        + wait_random(
            CREATE_TASK_RUN_MIN_DELAY_JITTER_SECONDS,
            CREATE_TASK_RUN_MAX_DELAY_JITTER_SECONDS,
        ),
    )
    def _create_task_run(self, ecs_client: _ECSClient, task_run_request: dict) -> str:
        """
        Create a run of a task definition.

        Returns the task run ARN.
        """
        return ecs_client.run_task(**task_run_request)["tasks"][0]

    def _task_definitions_equal(self, taskdef_1, taskdef_2) -> bool:
        """
        Compare two task definitions.

        Since one may come from the AWS API and have populated defaults, we do our best
        to homogenize the definitions without changing their meaning.
        """
        if taskdef_1 == taskdef_2:
            return True

        if taskdef_1 is None or taskdef_2 is None:
            return False

        taskdef_1 = copy.deepcopy(taskdef_1)
        taskdef_2 = copy.deepcopy(taskdef_2)

        for taskdef in (taskdef_1, taskdef_2):
            # Set defaults that AWS would set after registration
            container_definitions = taskdef.get("containerDefinitions", [])
            essential = any(
                container.get("essential") for container in container_definitions
            )
            if not essential:
                container_definitions[0].setdefault("essential", True)

            taskdef.setdefault("networkMode", "bridge")

        _drop_empty_keys_from_task_definition(taskdef_1)
        _drop_empty_keys_from_task_definition(taskdef_2)

        # Clear fields that change on registration for comparison
        for field in ECS_POST_REGISTRATION_FIELDS:
            taskdef_1.pop(field, None)
            taskdef_2.pop(field, None)

        return taskdef_1 == taskdef_2

    async def kill_infrastructure(
        self,
        configuration: ECSJobConfiguration,
        infrastructure_pid: str,
        grace_seconds: int = 30,
    ) -> None:
        """
        Kill a task running on ECS.

        Args:
            infrastructure_pid: A cluster and task arn combination. This should match a
                value yielded by `ECSWorker.run`.
        """
        if grace_seconds != 30:
            self._logger.warning(
                f"Kill grace period of {grace_seconds}s requested, but AWS does not "
                "support dynamic grace period configuration so 30s will be used. "
                "See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-agent-config.html for configuration of grace periods."  # noqa
            )
        cluster, task = parse_identifier(infrastructure_pid)
        await run_sync_in_worker_thread(self._stop_task, configuration, cluster, task)

    def _stop_task(
        self, configuration: ECSJobConfiguration, cluster: str, task: str
    ) -> None:
        """
        Stop a running ECS task.
        """
        if configuration.cluster is not None and cluster != configuration.cluster:
            raise InfrastructureNotAvailable(
                "Cannot stop ECS task: this infrastructure block has access to "
                f"cluster {configuration.cluster!r} but the task is running in cluster "
                f"{cluster!r}."
            )

        ecs_client = self._get_client(configuration, "ecs")
        try:
            ecs_client.stop_task(cluster=cluster, task=task)
        except Exception as exc:
            # Raise a special exception if the task does not exist
            if "ClusterNotFound" in str(exc):
                raise InfrastructureNotFound(
                    f"Cannot stop ECS task: the cluster {cluster!r} could not be found."
                ) from exc
            if "not find task" in str(exc) or "referenced task was not found" in str(
                exc
            ):
                raise InfrastructureNotFound(
                    f"Cannot stop ECS task: the task {task!r} could not be found in "
                    f"cluster {cluster!r}."
                ) from exc
            if "no registered tasks" in str(exc):
                raise InfrastructureNotFound(
                    f"Cannot stop ECS task: the cluster {cluster!r} has no tasks."
                ) from exc

            # Reraise unknown exceptions
            raise

Classes

job_configuration (BaseJobConfiguration) pydantic-model

Job configuration for an ECS worker.

Source code in prefect_aws/workers/ecs_worker.py
class ECSJobConfiguration(BaseJobConfiguration):
    """
    Job configuration for an ECS worker.
    """

    aws_credentials: Optional[AwsCredentials] = Field(default_factory=AwsCredentials)
    task_definition: Optional[Dict[str, Any]] = Field(
        template=_default_task_definition_template()
    )
    task_run_request: Dict[str, Any] = Field(
        template=_default_task_run_request_template()
    )
    configure_cloudwatch_logs: Optional[bool] = Field(default=None)
    cloudwatch_logs_options: Dict[str, str] = Field(default_factory=dict)
    network_configuration: Dict[str, Any] = Field(default_factory=dict)
    stream_output: Optional[bool] = Field(default=None)
    task_start_timeout_seconds: int = Field(default=300)
    task_watch_poll_interval: float = Field(default=5.0)
    auto_deregister_task_definition: bool = Field(default=False)
    vpc_id: Optional[str] = Field(default=None)
    container_name: Optional[str] = Field(default=None)
    cluster: Optional[str] = Field(default=None)
    match_latest_revision_in_family: bool = Field(default=False)

    @root_validator
    def task_run_request_requires_arn_if_no_task_definition_given(cls, values) -> dict:
        """
        If no task definition is provided, a task definition ARN must be present on the
        task run request.
        """
        if not values.get("task_run_request", {}).get(
            "taskDefinition"
        ) and not values.get("task_definition"):
            raise ValueError(
                "A task definition must be provided if a task definition ARN is not "
                "present on the task run request."
            )
        return values

    @root_validator
    def container_name_default_from_task_definition(cls, values) -> dict:
        """
        Infers the container name from the task definition if not provided.
        """
        if values.get("container_name") is None:
            values["container_name"] = _container_name_from_task_definition(
                values.get("task_definition")
            )

            # We may not have a name here still; for example if someone is using a task
            # definition arn. In that case, we'll perform similar logic later to find
            # the name to treat as the "orchestration" container.

        return values

    @root_validator(pre=True)
    def set_default_configure_cloudwatch_logs(cls, values: dict) -> dict:
        """
        Streaming output generally requires CloudWatch logs to be configured.

        To avoid entangled arguments in the simple case, `configure_cloudwatch_logs`
        defaults to matching the value of `stream_output`.
        """
        configure_cloudwatch_logs = values.get("configure_cloudwatch_logs")
        if configure_cloudwatch_logs is None:
            values["configure_cloudwatch_logs"] = values.get("stream_output")
        return values

    @root_validator
    def configure_cloudwatch_logs_requires_execution_role_arn(
        cls, values: dict
    ) -> dict:
        """
        Enforces that an execution role arn is provided (or could be provided by a
        runtime task definition) when configuring logging.
        """
        if (
            values.get("configure_cloudwatch_logs")
            and not values.get("execution_role_arn")
            # TODO: Does not match
            # Do not raise if they've linked to another task definition or provided
            # it without using our shortcuts
            and not values.get("task_run_request", {}).get("taskDefinition")
            and not (values.get("task_definition") or {}).get("executionRoleArn")
        ):
            raise ValueError(
                "An `execution_role_arn` must be provided to use "
                "`configure_cloudwatch_logs` or `stream_logs`."
            )
        return values

    @root_validator
    def cloudwatch_logs_options_requires_configure_cloudwatch_logs(
        cls, values: dict
    ) -> dict:
        """
        Enforces that an execution role arn is provided (or could be provided by a
        runtime task definition) when configuring logging.
        """
        if values.get("cloudwatch_logs_options") and not values.get(
            "configure_cloudwatch_logs"
        ):
            raise ValueError(
                "`configure_cloudwatch_log` must be enabled to use "
                "`cloudwatch_logs_options`."
            )
        return values

    @root_validator
    def network_configuration_requires_vpc_id(cls, values: dict) -> dict:
        """
        Enforces a `vpc_id` is provided when custom network configuration mode is
        enabled for network settings.
        """
        if values.get("network_configuration") and not values.get("vpc_id"):
            raise ValueError(
                "You must provide a `vpc_id` to enable custom `network_configuration`."
            )
        return values
Methods
cloudwatch_logs_options_requires_configure_cloudwatch_logs classmethod

Enforces that an execution role arn is provided (or could be provided by a runtime task definition) when configuring logging.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def cloudwatch_logs_options_requires_configure_cloudwatch_logs(
    cls, values: dict
) -> dict:
    """
    Enforces that an execution role arn is provided (or could be provided by a
    runtime task definition) when configuring logging.
    """
    if values.get("cloudwatch_logs_options") and not values.get(
        "configure_cloudwatch_logs"
    ):
        raise ValueError(
            "`configure_cloudwatch_log` must be enabled to use "
            "`cloudwatch_logs_options`."
        )
    return values
configure_cloudwatch_logs_requires_execution_role_arn classmethod

Enforces that an execution role arn is provided (or could be provided by a runtime task definition) when configuring logging.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def configure_cloudwatch_logs_requires_execution_role_arn(
    cls, values: dict
) -> dict:
    """
    Enforces that an execution role arn is provided (or could be provided by a
    runtime task definition) when configuring logging.
    """
    if (
        values.get("configure_cloudwatch_logs")
        and not values.get("execution_role_arn")
        # TODO: Does not match
        # Do not raise if they've linked to another task definition or provided
        # it without using our shortcuts
        and not values.get("task_run_request", {}).get("taskDefinition")
        and not (values.get("task_definition") or {}).get("executionRoleArn")
    ):
        raise ValueError(
            "An `execution_role_arn` must be provided to use "
            "`configure_cloudwatch_logs` or `stream_logs`."
        )
    return values
container_name_default_from_task_definition classmethod

Infers the container name from the task definition if not provided.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def container_name_default_from_task_definition(cls, values) -> dict:
    """
    Infers the container name from the task definition if not provided.
    """
    if values.get("container_name") is None:
        values["container_name"] = _container_name_from_task_definition(
            values.get("task_definition")
        )

        # We may not have a name here still; for example if someone is using a task
        # definition arn. In that case, we'll perform similar logic later to find
        # the name to treat as the "orchestration" container.

    return values
network_configuration_requires_vpc_id classmethod

Enforces a vpc_id is provided when custom network configuration mode is enabled for network settings.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def network_configuration_requires_vpc_id(cls, values: dict) -> dict:
    """
    Enforces a `vpc_id` is provided when custom network configuration mode is
    enabled for network settings.
    """
    if values.get("network_configuration") and not values.get("vpc_id"):
        raise ValueError(
            "You must provide a `vpc_id` to enable custom `network_configuration`."
        )
    return values
set_default_configure_cloudwatch_logs classmethod

Streaming output generally requires CloudWatch logs to be configured.

To avoid entangled arguments in the simple case, configure_cloudwatch_logs defaults to matching the value of stream_output.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator(pre=True)
def set_default_configure_cloudwatch_logs(cls, values: dict) -> dict:
    """
    Streaming output generally requires CloudWatch logs to be configured.

    To avoid entangled arguments in the simple case, `configure_cloudwatch_logs`
    defaults to matching the value of `stream_output`.
    """
    configure_cloudwatch_logs = values.get("configure_cloudwatch_logs")
    if configure_cloudwatch_logs is None:
        values["configure_cloudwatch_logs"] = values.get("stream_output")
    return values
task_run_request_requires_arn_if_no_task_definition_given classmethod

If no task definition is provided, a task definition ARN must be present on the task run request.

Source code in prefect_aws/workers/ecs_worker.py
@root_validator
def task_run_request_requires_arn_if_no_task_definition_given(cls, values) -> dict:
    """
    If no task definition is provided, a task definition ARN must be present on the
    task run request.
    """
    if not values.get("task_run_request", {}).get(
        "taskDefinition"
    ) and not values.get("task_definition"):
        raise ValueError(
            "A task definition must be provided if a task definition ARN is not "
            "present on the task run request."
        )
    return values
job_configuration_variables (BaseVariables) pydantic-model

Variables for templating an ECS job.

Source code in prefect_aws/workers/ecs_worker.py
class ECSVariables(BaseVariables):
    """
    Variables for templating an ECS job.
    """

    task_definition_arn: Optional[str] = Field(
        default=None,
        description=(
            "An identifier for an existing task definition to use. If set, options that"
            " require changes to the task definition will be ignored. All contents of "
            "the task definition in the job configuration will be ignored."
        ),
    )
    env: Dict[str, Optional[str]] = Field(
        title="Environment Variables",
        default_factory=dict,
        description=(
            "Environment variables to provide to the task run. These variables are set "
            "on the Prefect container at task runtime. These will not be set on the "
            "task definition."
        ),
    )
    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description=(
            "The AWS credentials to use to connect to ECS. If not provided, credentials"
            " will be inferred from the local environment following AWS's boto client's"
            " rules."
        ),
    )
    cluster: Optional[str] = Field(
        default=None,
        description=(
            "The ECS cluster to run the task in. An ARN or name may be provided. If "
            "not provided, the default cluster will be used."
        ),
    )
    family: Optional[str] = Field(
        default=None,
        description=(
            "A family for the task definition. If not provided, it will be inferred "
            "from the task definition. If the task definition does not have a family, "
            "the name will be generated. When flow and deployment metadata is "
            "available, the generated name will include their names. Values for this "
            "field will be slugified to match AWS character requirements."
        ),
    )
    launch_type: Optional[Literal["FARGATE", "EC2", "EXTERNAL", "FARGATE_SPOT"]] = (
        Field(
            default=ECS_DEFAULT_LAUNCH_TYPE,
            description=(
                "The type of ECS task run infrastructure that should be used. Note that"
                " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure"
                " the proper capacity provider strategy if set here."
            ),
        )
    )
    image: Optional[str] = Field(
        default=None,
        description=(
            "The image to use for the Prefect container in the task. If this value is "
            "not null, it will override the value in the task definition. This value "
            "defaults to a Prefect base image matching your local versions."
        ),
    )
    cpu: int = Field(
        title="CPU",
        default=None,
        description=(
            "The amount of CPU to provide to the ECS task. Valid amounts are "
            "specified in the AWS documentation. If not provided, a default value of "
            f"{ECS_DEFAULT_CPU} will be used unless present on the task definition."
        ),
    )
    memory: int = Field(
        default=None,
        description=(
            "The amount of memory to provide to the ECS task. Valid amounts are "
            "specified in the AWS documentation. If not provided, a default value of "
            f"{ECS_DEFAULT_MEMORY} will be used unless present on the task definition."
        ),
    )
    container_name: str = Field(
        default=None,
        description=(
            "The name of the container flow run orchestration will occur in. If not "
            f"specified, a default value of {ECS_DEFAULT_CONTAINER_NAME} will be used "
            "and if that is not found in the task definition the first container will "
            "be used."
        ),
    )
    task_role_arn: str = Field(
        title="Task Role ARN",
        default=None,
        description=(
            "A role to attach to the task run. This controls the permissions of the "
            "task while it is running."
        ),
    )
    execution_role_arn: str = Field(
        title="Execution Role ARN",
        default=None,
        description=(
            "An execution role to use for the task. This controls the permissions of "
            "the task when it is launching. If this value is not null, it will "
            "override the value in the task definition. An execution role must be "
            "provided to capture logs from the container."
        ),
    )
    vpc_id: Optional[str] = Field(
        title="VPC ID",
        default=None,
        description=(
            "The AWS VPC to link the task run to. This is only applicable when using "
            "the 'awsvpc' network mode for your task. FARGATE tasks require this "
            "network  mode, but for EC2 tasks the default network mode is 'bridge'. "
            "If using the 'awsvpc' network mode and this field is null, your default "
            "VPC will be used. If no default VPC can be found, the task run will fail."
        ),
    )
    configure_cloudwatch_logs: bool = Field(
        default=None,
        description=(
            "If enabled, the Prefect container will be configured to send its output "
            "to the AWS CloudWatch logs service. This functionality requires an "
            "execution role with logs:CreateLogStream, logs:CreateLogGroup, and "
            "logs:PutLogEvents permissions. The default for this field is `False` "
            "unless `stream_output` is set."
        ),
    )
    cloudwatch_logs_options: Dict[str, str] = Field(
        default_factory=dict,
        description=(
            "When `configure_cloudwatch_logs` is enabled, this setting may be used to"
            " pass additional options to the CloudWatch logs configuration or override"
            " the default options. See the [AWS"
            " documentation](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html#create_awslogs_logdriver_options)"  # noqa
            " for available options. "
        ),
    )

    network_configuration: Dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "When `network_configuration` is supplied it will override ECS Worker's"
            "awsvpcConfiguration that defined in the ECS task executing your workload. "
            "See the [AWS documentation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-ecs-service-awsvpcconfiguration.html)"  # noqa
            " for available options."
        ),
    )

    stream_output: bool = Field(
        default=None,
        description=(
            "If enabled, logs will be streamed from the Prefect container to the local "
            "console. Unless you have configured AWS CloudWatch logs manually on your "
            "task definition, this requires the same prerequisites outlined in "
            "`configure_cloudwatch_logs`."
        ),
    )
    task_start_timeout_seconds: int = Field(
        default=300,
        description=(
            "The amount of time to watch for the start of the ECS task "
            "before marking it as failed. The task must enter a RUNNING state to be "
            "considered started."
        ),
    )
    task_watch_poll_interval: float = Field(
        default=5.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an ECS task."
        ),
    )
    auto_deregister_task_definition: bool = Field(
        default=False,
        description=(
            "If enabled, any task definitions that are created by this block will be "
            "deregistered. Existing task definitions linked by ARN will never be "
            "deregistered. Deregistering a task definition does not remove it from "
            "your AWS account, instead it will be marked as INACTIVE."
        ),
    )
    match_latest_revision_in_family: bool = Field(
        default=False,
        description=(
            "If enabled, the most recent active revision in the task definition "
            "family will be compared against the desired ECS task configuration. "
            "If they are equal, the existing task definition will be used instead "
            "of registering a new one. If no family is specified the default family "
            f'"{ECS_DEFAULT_FAMILY}" will be used.'
        ),
    )
Attributes
auto_deregister_task_definition: bool pydantic-field

If enabled, any task definitions that are created by this block will be deregistered. Existing task definitions linked by ARN will never be deregistered. Deregistering a task definition does not remove it from your AWS account, instead it will be marked as INACTIVE.

aws_credentials: AwsCredentials pydantic-field

The AWS credentials to use to connect to ECS. If not provided, credentials will be inferred from the local environment following AWS's boto client's rules.

cloudwatch_logs_options: Dict[str, str] pydantic-field

When configure_cloudwatch_logs is enabled, this setting may be used to pass additional options to the CloudWatch logs configuration or override the default options. See the AWS documentation for available options.

cluster: str pydantic-field

The ECS cluster to run the task in. An ARN or name may be provided. If not provided, the default cluster will be used.

configure_cloudwatch_logs: bool pydantic-field

If enabled, the Prefect container will be configured to send its output to the AWS CloudWatch logs service. This functionality requires an execution role with logs:CreateLogStream, logs:CreateLogGroup, and logs:PutLogEvents permissions. The default for this field is False unless stream_output is set.

container_name: str pydantic-field

The name of the container flow run orchestration will occur in. If not specified, a default value of prefect will be used and if that is not found in the task definition the first container will be used.

cpu: int pydantic-field

The amount of CPU to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of 1024 will be used unless present on the task definition.

execution_role_arn: str pydantic-field

An execution role to use for the task. This controls the permissions of the task when it is launching. If this value is not null, it will override the value in the task definition. An execution role must be provided to capture logs from the container.

family: str pydantic-field

A family for the task definition. If not provided, it will be inferred from the task definition. If the task definition does not have a family, the name will be generated. When flow and deployment metadata is available, the generated name will include their names. Values for this field will be slugified to match AWS character requirements.

image: str pydantic-field

The image to use for the Prefect container in the task. If this value is not null, it will override the value in the task definition. This value defaults to a Prefect base image matching your local versions.

launch_type: Literal['FARGATE', 'EC2', 'EXTERNAL', 'FARGATE_SPOT'] pydantic-field

The type of ECS task run infrastructure that should be used. Note that 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure the proper capacity provider strategy if set here.

match_latest_revision_in_family: bool pydantic-field

If enabled, the most recent active revision in the task definition family will be compared against the desired ECS task configuration. If they are equal, the existing task definition will be used instead of registering a new one. If no family is specified the default family "prefect" will be used.

memory: int pydantic-field

The amount of memory to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of 2048 will be used unless present on the task definition.

network_configuration: Dict[str, Any] pydantic-field

When network_configuration is supplied it will override ECS Worker'sawsvpcConfiguration that defined in the ECS task executing your workload. See the AWS documentation for available options.

stream_output: bool pydantic-field

If enabled, logs will be streamed from the Prefect container to the local console. Unless you have configured AWS CloudWatch logs manually on your task definition, this requires the same prerequisites outlined in configure_cloudwatch_logs.

task_definition_arn: str pydantic-field

An identifier for an existing task definition to use. If set, options that require changes to the task definition will be ignored. All contents of the task definition in the job configuration will be ignored.

task_role_arn: str pydantic-field

A role to attach to the task run. This controls the permissions of the task while it is running.

task_start_timeout_seconds: int pydantic-field

The amount of time to watch for the start of the ECS task before marking it as failed. The task must enter a RUNNING state to be considered started.

task_watch_poll_interval: float pydantic-field

The amount of time to wait between AWS API calls while monitoring the state of an ECS task.

vpc_id: str pydantic-field

The AWS VPC to link the task run to. This is only applicable when using the 'awsvpc' network mode for your task. FARGATE tasks require this network mode, but for EC2 tasks the default network mode is 'bridge'. If using the 'awsvpc' network mode and this field is null, your default VPC will be used. If no default VPC can be found, the task run will fail.

Methods

kill_infrastructure async

Kill a task running on ECS.

Parameters:

Name Type Description Default
infrastructure_pid str

A cluster and task arn combination. This should match a value yielded by ECSWorker.run.

required
Source code in prefect_aws/workers/ecs_worker.py
async def kill_infrastructure(
    self,
    configuration: ECSJobConfiguration,
    infrastructure_pid: str,
    grace_seconds: int = 30,
) -> None:
    """
    Kill a task running on ECS.

    Args:
        infrastructure_pid: A cluster and task arn combination. This should match a
            value yielded by `ECSWorker.run`.
    """
    if grace_seconds != 30:
        self._logger.warning(
            f"Kill grace period of {grace_seconds}s requested, but AWS does not "
            "support dynamic grace period configuration so 30s will be used. "
            "See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-agent-config.html for configuration of grace periods."  # noqa
        )
    cluster, task = parse_identifier(infrastructure_pid)
    await run_sync_in_worker_thread(self._stop_task, configuration, cluster, task)
run async

Runs a given flow run on the current worker.

Source code in prefect_aws/workers/ecs_worker.py
async def run(
    self,
    flow_run: "FlowRun",
    configuration: ECSJobConfiguration,
    task_status: Optional[anyio.abc.TaskStatus] = None,
) -> BaseWorkerResult:
    """
    Runs a given flow run on the current worker.
    """
    ecs_client = await run_sync_in_worker_thread(
        self._get_client, configuration, "ecs"
    )

    logger = self.get_flow_run_logger(flow_run)

    (
        task_arn,
        cluster_arn,
        task_definition,
        is_new_task_definition,
    ) = await run_sync_in_worker_thread(
        self._create_task_and_wait_for_start,
        logger,
        ecs_client,
        configuration,
        flow_run,
    )

    # The task identifier is "{cluster}::{task}" where we use the configured cluster
    # if set to preserve matching by name rather than arn
    # Note "::" is used despite the Prefect standard being ":" because ARNs contain
    # single colons.
    identifier = (
        (configuration.cluster if configuration.cluster else cluster_arn)
        + "::"
        + task_arn
    )

    if task_status:
        task_status.started(identifier)

    status_code = await run_sync_in_worker_thread(
        self._watch_task_and_get_exit_code,
        logger,
        configuration,
        task_arn,
        cluster_arn,
        task_definition,
        is_new_task_definition and configuration.auto_deregister_task_definition,
        ecs_client,
    )

    return ECSWorkerResult(
        identifier=identifier,
        # If the container does not start the exit code can be null but we must
        # still report a status code. We use a -1 to indicate a special code.
        status_code=status_code if status_code is not None else -1,
    )

ECSWorkerResult (BaseWorkerResult) pydantic-model

The result of an ECS job.

Source code in prefect_aws/workers/ecs_worker.py
class ECSWorkerResult(BaseWorkerResult):
    """
    The result of an ECS job.
    """

Functions

parse_identifier

Splits identifier into its cluster and task components, e.g. input "cluster_name::task_arn" outputs ("cluster_name", "task_arn").

Source code in prefect_aws/workers/ecs_worker.py
def parse_identifier(identifier: str) -> ECSIdentifier:
    """
    Splits identifier into its cluster and task components, e.g.
    input "cluster_name::task_arn" outputs ("cluster_name", "task_arn").
    """
    cluster, task = identifier.split("::", maxsplit=1)
    return ECSIdentifier(cluster, task)