Skip to content

Profiler workflow module

Extension to the OpenMetadata ProfilerWorkflow class

PrefectOpenMetadataProfiler

Bases: ProfilerWorkflow

OpenMetadata profiler workflow that adds a method for logging the workflow status to the Prefect backend.

Parameters:

Name Type Description Default
config

string with a YAML or JSON configuration file

required
Source code in prefect_openmetadata/profiler_workflow.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class PrefectOpenMetadataProfiler(ProfilerWorkflow):
    """
    OpenMetadata profiler workflow that adds a method
    for logging the workflow status to the Prefect backend.

    Args:
        config: workflow configuration, forwarded unchanged to
            ``ProfilerWorkflow``
    """

    # NOTE: this was previously spelled ``__int__`` (the int-conversion hook),
    # so it was never invoked on construction.
    def __init__(self, config: OpenMetadataWorkflowConfig):
        """
        Initialise the underlying ``ProfilerWorkflow``.

        Args:
            config: workflow configuration, forwarded unchanged to
                ``ProfilerWorkflow``
        """
        super().__init__(config=config)

    def log_flow_status(self) -> None:
        """
        Log workflow status to the Prefect API backend.

        Emits one info record each for the source status and the processor
        status, plus one for the sink status when the workflow has a sink.
        """
        logger = get_run_logger()
        logger.info("Source Status: %s", self.source_status.as_string())
        logger.info("Processor Status: %s", self.processor.get_status().as_string())
        # Only report the sink when this workflow instance actually has one.
        if hasattr(self, "sink"):
            logger.info("Sink Status: %s", self.sink.get_status().as_string())

__int__(config)

Parameters:

Name Type Description Default
config OpenMetadataWorkflowConfig

string with a YAML or JSON configuration file

required
Source code in prefect_openmetadata/profiler_workflow.py
20
21
22
23
24
25
def __init__(self, config: OpenMetadataWorkflowConfig):
    """
    Initialise the underlying ``ProfilerWorkflow``.

    This was previously spelled ``__int__`` (the int-conversion hook),
    so it was never invoked on construction.

    Args:
        config: workflow configuration, forwarded unchanged to
            ``ProfilerWorkflow``
    """
    super().__init__(config=config)

log_flow_status()

Log workflow status to the Prefect API backend

Source code in prefect_openmetadata/profiler_workflow.py
27
28
29
30
31
32
33
34
35
def log_flow_status(self) -> None:
    """
    Log workflow status to the Prefect API backend.

    Emits one info record each for the source status and the processor
    status, plus one for the sink status when the workflow has a sink.
    """
    logger = get_run_logger()
    logger.info("Source Status: %s", self.source_status.as_string())
    logger.info("Processor Status: %s", self.processor.get_status().as_string())
    # Only report the sink when this workflow instance actually has one.
    if hasattr(self, "sink"):
        logger.info("Sink Status: %s", self.sink.get_status().as_string())