Skip to content

Ingestion workflow module

Extension to the OpenMetadata Workflow class

PrefectOpenMetadataIngestion

Bases: Workflow

OpenMetadata ingestion workflow that adds a method for logging the workflow status to the Prefect backend.

Parameters:

Name Type Description Default
config OpenMetadataWorkflowConfig

the OpenMetadata workflow configuration object

required
Source code in prefect_openmetadata/ingestion_workflow.py (lines 11–38)
class PrefectOpenMetadataIngestion(Workflow):
    """
    OpenMetadata ingestion workflow that adds a method
    for logging the workflow status to the Prefect backend.

    Args:
         config: the OpenMetadata workflow configuration object
             (``OpenMetadataWorkflowConfig``)
    """

    # Log format strings for the optional workflow components, keyed by
    # the attribute name under which the component is stored on ``self``.
    _OPTIONAL_COMPONENT_LOG_FORMATS = (
        ("stage", "Stage Status: %s"),
        ("sink", "Sink Status: %s"),
        ("bulk_sink", "Bulk Sink Status: %s"),
    )

    def __init__(self, config: OpenMetadataWorkflowConfig):
        """
        Args:
            config: the OpenMetadata workflow configuration object
                (``OpenMetadataWorkflowConfig``)
        """
        # Must be spelled ``__init__``: a method named ``__int__`` is the
        # int() conversion hook and is never invoked on construction.
        super().__init__(config=config)

    def log_flow_status(self) -> None:
        """
        Log workflow status to the Prefect API backend.

        The source status is always logged. The stage, sink and bulk sink
        statuses are logged only for components that are present on this
        workflow (attribute defined and not ``None``).
        """
        logger = get_run_logger()
        logger.info("Source Status: %s", self.source.get_status().as_string())
        for attr_name, log_format in self._OPTIONAL_COMPONENT_LOG_FORMATS:
            # Optional components may be missing entirely or set to None;
            # skip both cases instead of raising AttributeError.
            component = getattr(self, attr_name, None)
            if component is not None:
                logger.info(log_format, component.get_status().as_string())

__int__(config)

Parameters:

Name Type Description Default
config OpenMetadataWorkflowConfig

the OpenMetadata workflow configuration object

required
Source code in prefect_openmetadata/ingestion_workflow.py (lines 20–25)
def __init__(self, config: OpenMetadataWorkflowConfig):
    """
    Args:
        config: the OpenMetadata workflow configuration object
            (``OpenMetadataWorkflowConfig``)
    """
    # Must be spelled ``__init__``: a method named ``__int__`` is the
    # int() conversion hook and is never invoked on construction.
    super().__init__(config=config)

log_flow_status()

Log workflow status to the Prefect API backend

Source code in prefect_openmetadata/ingestion_workflow.py (lines 27–38)
def log_flow_status(self) -> None:
    """
    Log workflow status to the Prefect API backend.

    The source status is always logged. The stage, sink and bulk sink
    statuses are logged only for components that are present on this
    workflow (attribute defined and not ``None``).
    """
    logger = get_run_logger()
    logger.info("Source Status: %s", self.source.get_status().as_string())
    for attr_name, log_format in (
        ("stage", "Stage Status: %s"),
        ("sink", "Sink Status: %s"),
        ("bulk_sink", "Bulk Sink Status: %s"),
    ):
        # Optional components may be missing entirely or set to None;
        # skip both cases instead of raising AttributeError.
        component = getattr(self, attr_name, None)
        if component is not None:
            logger.info(log_format, component.get_status().as_string())