Skip to content

prefect_aws.glue_job

Integrations with the AWS Glue Job.

Classes

GlueJobBlock (JobBlock) pydantic-model

Execute a job to the AWS Glue Job service.

Attributes:

Name Type Description
job_name str

The name of the job definition to use.

arguments Optional[dict]

The job arguments associated with this run. For this job run, they replace the default arguments set in the job definition itself. You can specify arguments here that your own job-execution script consumes, as well as arguments that Glue itself consumes. Job arguments may be logged. Do not pass plaintext secrets as arguments. Retrieve secrets from a Glue Connection, Secrets Manager or other secret management mechanism if you intend to keep them within the Job. doc

job_watch_poll_interval float

The amount of time to wait between AWS API calls while monitoring the state of a Glue Job. default is 60s because of jobs that use AWS Glue versions 2.0 and later have a 1-minute minimum. AWS Glue Pricing

Examples:

Start a job to AWS Glue Job.

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
    ).trigger()

    return glue_job_run.wait_for_completion()


example_run_glue_job()

Source code in prefect_aws/glue_job.py
class GlueJobBlock(JobBlock):
    """Execute a job to the AWS Glue Job service.

    Attributes:
        job_name: The name of the job definition to use.
        arguments: The job arguments associated with this run.
            For this job run, they replace the default arguments set in the job
            definition itself.
            You can specify arguments here that your own job-execution script consumes,
            as well as arguments that Glue itself consumes.
            Job arguments may be logged. Do not pass plaintext secrets as arguments.
            Retrieve secrets from a Glue Connection, Secrets Manager or other secret
            management mechanism if you intend to keep them within the Job.
            [doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)
        job_watch_poll_interval: The amount of time to wait between AWS API
            calls while monitoring the state of a Glue Job.
            default is 60s because of jobs that use AWS Glue versions 2.0 and later
            have a 1-minute minimum.
            [AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls)

    Example:
        Start a job to AWS Glue Job.
        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.glue_job import GlueJobBlock


        @flow
        def example_run_glue_job():
            aws_credentials = AwsCredentials(
                aws_access_key_id="your_access_key_id",
                aws_secret_access_key="your_secret_access_key"
            )
            glue_job_run = GlueJobBlock(
                job_name="your_glue_job_name",
                arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
            ).trigger()

            return glue_job_run.wait_for_completion()


        example_run_glue_job()
        ```
    """

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    arguments: Optional[dict] = Field(
        default=None,
        title="AWS Glue Job Arguments",
        description="The job arguments associated with this run.",
    )
    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an Glue Job."
        ),
    )

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    async def trigger(self) -> GlueJobRun:
        """trigger for GlueJobRun"""
        client = self._get_client()
        job_run_id = self._start_job(client)
        return GlueJobRun(
            job_name=self.job_name,
            job_id=job_run_id,
            job_watch_poll_interval=self.job_watch_poll_interval,
        )

    def _start_job(self, client: _GlueJobClient) -> str:
        """
        Start the AWS Glue Job
        [doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)
        """
        self.logger.info(
            f"starting job {self.job_name} with arguments {self.arguments}"
        )
        try:
            response = client.start_job_run(
                JobName=self.job_name,
                Arguments=self.arguments,
            )
            job_run_id = str(response["JobRunId"])
            self.logger.info(f"job started with job run id: {job_run_id}")
            return job_run_id
        except Exception as e:
            self.logger.error(f"failed to start job: {e}")
            raise RuntimeError

    def _get_client(self) -> _GlueJobClient:
        """
        Retrieve a Glue Job Client
        """
        boto_session = self.aws_credentials.get_boto3_session()
        return boto_session.client("glue")

Attributes

arguments: dict pydantic-field

The job arguments associated with this run.

aws_credentials: AwsCredentials pydantic-field

The AWS credentials to use to connect to Glue.

job_name: str pydantic-field required

The name of the job definition to use.

job_watch_poll_interval: float pydantic-field

The amount of time to wait between AWS API calls while monitoring the state of an Glue Job.

Methods

__json_encoder__ special staticmethod

partial(func, args, *keywords) - new function with partial application of the given arguments and keywords.

trigger async

trigger for GlueJobRun

Source code in prefect_aws/glue_job.py
async def trigger(self) -> GlueJobRun:
    """trigger for GlueJobRun"""
    client = self._get_client()
    job_run_id = self._start_job(client)
    return GlueJobRun(
        job_name=self.job_name,
        job_id=job_run_id,
        job_watch_poll_interval=self.job_watch_poll_interval,
    )

GlueJobRun (JobRun, BaseModel) pydantic-model

Execute a Glue Job

Source code in prefect_aws/glue_job.py
class GlueJobRun(JobRun, BaseModel):
    """Execute a Glue Job"""

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    job_id: str = Field(
        ...,
        title="AWS Glue Job ID",
        description="The ID of the job run.",
    )

    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an Glue Job."
        ),
    )

    _error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"]

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    client: _GlueJobClient = Field(default=None, description="")

    async def fetch_result(self) -> str:
        """fetch glue job state"""
        job = self._get_job_run()
        return job["JobRun"]["JobRunState"]

    def wait_for_completion(self) -> None:
        """
        Wait for the job run to complete and get exit code
        """
        self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
        while True:
            job = self._get_job_run()
            job_state = job["JobRun"]["JobRunState"]
            if job_state in self._error_states:
                # Generate a dynamic exception type from the AWS name
                self.logger.error(f"job failed: {job['JobRun']['ErrorMessage']}")
                raise RuntimeError(job["JobRun"]["ErrorMessage"])
            elif job_state == "SUCCEEDED":
                self.logger.info(f"job succeeded: {self.job_id}")
                break

            time.sleep(self.job_watch_poll_interval)

    def _get_job_run(self):
        """get glue job"""
        return self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)

Attributes

aws_credentials: AwsCredentials pydantic-field

The AWS credentials to use to connect to Glue.

job_id: str pydantic-field required

The ID of the job run.

job_name: str pydantic-field required

The name of the job definition to use.

job_watch_poll_interval: float pydantic-field

The amount of time to wait between AWS API calls while monitoring the state of an Glue Job.

Methods

fetch_result async

fetch glue job state

Source code in prefect_aws/glue_job.py
async def fetch_result(self) -> str:
    """fetch glue job state"""
    job = self._get_job_run()
    return job["JobRun"]["JobRunState"]
wait_for_completion

Wait for the job run to complete and get exit code

Source code in prefect_aws/glue_job.py
def wait_for_completion(self) -> None:
    """
    Wait for the job run to complete and get exit code
    """
    self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
    while True:
        job = self._get_job_run()
        job_state = job["JobRun"]["JobRunState"]
        if job_state in self._error_states:
            # Generate a dynamic exception type from the AWS name
            self.logger.error(f"job failed: {job['JobRun']['ErrorMessage']}")
            raise RuntimeError(job["JobRun"]["ErrorMessage"])
        elif job_state == "SUCCEEDED":
            self.logger.info(f"job succeeded: {self.job_id}")
            break

        time.sleep(self.job_watch_poll_interval)