Flows & tasks

Prefect flows and tasks for OpenMetadata workflows include:

  • metadata ingestion workflow: information about your tables, views, schemas, etc.
  • usage ingestion workflow: query and audit logs showing usage patterns
  • profiler workflow: profile data and run data validation tests

Follow the main documentation for guidance on:

  • installing and configuring Prefect and OpenMetadata,
  • running metadata ingestion flows locally and on a schedule.

OpenMetadataFailedConnection

Bases: Exception

Exception for failed connection attempts

Source code in prefect_openmetadata/flows.py
class OpenMetadataFailedConnection(Exception):
    """Exception for failed connection attempts"""
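
A caller may want to catch this exception rather than let it end the run. The following is a minimal sketch of that pattern: it defines a local stand-in for `OpenMetadataFailedConnection` so that it runs without the package installed, and `check_connection` and `safe_check` are hypothetical helpers, not part of prefect_openmetadata.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("connection_check")


class OpenMetadataFailedConnection(Exception):
    """Local stand-in mirroring the exception class documented above."""


def check_connection(reachable: bool) -> None:
    """Hypothetical helper: raises when the target system is unreachable."""
    if not reachable:
        raise OpenMetadataFailedConnection("could not reach the service")


def safe_check(reachable: bool) -> bool:
    """Return True on success; log the error and return False on failure."""
    try:
        check_connection(reachable)
    except OpenMetadataFailedConnection as exc:
        logger.error("Test connection failed: %s", exc)
        return False
    logger.info("Connection successful!")
    return True
```

In real code, the class would be imported from `prefect_openmetadata.flows` instead of being redefined.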

ingest_metadata(config, is_json=False)

Ingests raw metadata about tables, dashboards, users, topics, pipelines, etc. The same flow is used for usage ingestion.

Parameters:

Name     Type  Description                                                 Default
config   str   configuration spec, by default in YAML, optionally in JSON  required
is_json  bool  flag whether config is a JSON spec rather than YAML         False

Examples:

Flow ingesting metadata using Prefect:

from prefect_openmetadata.flows import ingest_metadata

config = """See an example in the section: Run ingestion flow"""

if __name__ == "__main__":
    ingest_metadata(config)
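
When the spec lives in a file, the `is_json` flag can be derived from the file extension. The sketch below assumes that convention (`.json` means JSON, anything else is treated as YAML); `read_config` and `run_from_file` are hypothetical helpers, not part of prefect_openmetadata.

```python
from pathlib import Path
from typing import Tuple


def read_config(path: str) -> Tuple[str, bool]:
    """Return the file contents and whether they should be parsed as JSON."""
    config_path = Path(path)
    # Assumption: a ".json" suffix marks a JSON spec; everything else is YAML.
    is_json = config_path.suffix.lower() == ".json"
    return config_path.read_text(encoding="utf-8"), is_json


def run_from_file(path: str) -> None:
    """Run the ingestion flow; needs prefect-openmetadata installed."""
    # Imported lazily so read_config stays usable without the package.
    from prefect_openmetadata.flows import ingest_metadata

    config, is_json = read_config(path)
    ingest_metadata(config, is_json=is_json)
```

Calling `run_from_file` with the path to your own spec file runs the flow with the matching parser.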

Source code in prefect_openmetadata/flows.py
@flow
def ingest_metadata(config: str, is_json: bool = False) -> None:
    """
    Ingests raw metadata about tables, dashboards, users, topics, pipelines, etc.
    The same flow is used for usage ingestion.

    Args:
        config: configuration spec, by default in YAML, optionally in JSON
        is_json: flag whether `config` is a JSON spec rather than YAML

    Examples:
        Flow ingesting metadata using Prefect:
        ```python
        from prefect_openmetadata.flows import ingest_metadata

        config = \"""See an example in the section: Run ingestion flow\"""

        if __name__ == "__main__":
            ingest_metadata(config)
        ```
    """
    om_workflow_config = json.loads(config) if is_json else yaml.safe_load(config)
    om_workflow_model = OpenMetadataWorkflowConfig.parse_obj(om_workflow_config)
    run_ingestion_task(om_workflow_model)

make_test_connection(conn_config) async

Task making a test connection to the specified OpenMetadata connection

Parameters:

Name         Type                          Description                          Default
conn_config  TestServiceConnectionRequest  connection spec as a pydantic model  required

Source code in prefect_openmetadata/flows.py
@task
async def make_test_connection(conn_config: TestServiceConnectionRequest) -> None:
    """
    Task making a test connection to the specified OpenMetadata connection

    Args:
        conn_config: connection spec as a pydantic model
    """
    logger = get_run_logger()
    connection = get_connection(conn_config.connection.config)
    try:
        test_connection(connection)
        logger.info("Connection successful!")
    except OpenMetadataFailedConnection as exc:
        logger.error("Test connection failed")
        raise exc

profile_metadata(config, is_json=False)

Profiles metadata about tables, dashboards, users, topics, pipelines, etc.

Parameters:

Name     Type  Description                                                 Default
config   str   configuration spec, by default in YAML, optionally in JSON  required
is_json  bool  flag whether config is a JSON spec rather than YAML         False

Examples:

Flow profiling metadata using Prefect:

from prefect_openmetadata.flows import profile_metadata

config = """See an example in the section: Run profiling flow"""

if __name__ == "__main__":
    profile_metadata(config)

Source code in prefect_openmetadata/flows.py
@flow
def profile_metadata(config: str, is_json: bool = False) -> None:
    """
    Profiles metadata about tables, dashboards, users, topics, pipelines, etc.

    Args:
        config: configuration spec, by default in YAML, optionally in JSON
        is_json: flag whether `config` is a JSON spec rather than YAML

    Examples:
        Flow profiling metadata using Prefect:
        ```python
        from prefect_openmetadata.flows import profile_metadata

        config = \"""See an example in the section: Run profiling flow\"""

        if __name__ == "__main__":
            profile_metadata(config)
        ```
    """
    om_workflow_config = json.loads(config) if is_json else yaml.safe_load(config)
    om_workflow_model = OpenMetadataWorkflowConfig.parse_obj(om_workflow_config)
    run_profiling_task(om_workflow_model)

run_ingestion_task(om_workflow_model) async

Task ingesting metadata into OpenMetadata backend

Parameters:

Name               Type                        Description                         Default
om_workflow_model  OpenMetadataWorkflowConfig  ingestion spec as a pydantic model  required

Source code in prefect_openmetadata/flows.py
@task
async def run_ingestion_task(om_workflow_model: OpenMetadataWorkflowConfig) -> None:
    """
    Task ingesting metadata into OpenMetadata backend

    Args:
        om_workflow_model: ingestion spec as a pydantic model
    """
    workflow = PrefectOpenMetadataIngestion(om_workflow_model)
    workflow.execute()
    workflow.log_flow_status()
    workflow.raise_from_status()
    workflow.stop()
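
The task calls `execute()`, `log_flow_status()`, `raise_from_status()` and `stop()` in that order, so `stop()` is not reached when `raise_from_status()` raises. The sketch below shows one possible variation that moves `stop()` into a `finally` clause. `StubWorkflow` and `run_workflow` are hypothetical stand-ins that only record calls; this is not how the library itself is implemented.

```python
from typing import List


class StubWorkflow:
    """Hypothetical stand-in for a workflow object; records calls only."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.calls: List[str] = []

    def execute(self) -> None:
        self.calls.append("execute")

    def log_flow_status(self) -> None:
        self.calls.append("log_flow_status")

    def raise_from_status(self) -> None:
        self.calls.append("raise_from_status")
        if self.fail:
            raise RuntimeError("workflow reported failures")

    def stop(self) -> None:
        self.calls.append("stop")


def run_workflow(workflow: StubWorkflow) -> None:
    """Same call order as the task above, but stop() always runs."""
    try:
        workflow.execute()
        workflow.log_flow_status()
        workflow.raise_from_status()
    finally:
        workflow.stop()
```

With this variation the error still propagates to the caller, and the workflow is stopped in both the success and the failure case.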

run_profiling_task(om_workflow_model) async

Task profiling a given OpenMetadata source

Parameters:

Name               Type                        Description                         Default
om_workflow_model  OpenMetadataWorkflowConfig  profiling spec as a pydantic model  required

Source code in prefect_openmetadata/flows.py
@task
async def run_profiling_task(om_workflow_model: OpenMetadataWorkflowConfig) -> None:
    """
    Task profiling a given OpenMetadata source

    Args:
        om_workflow_model: profiling spec as a pydantic model
    """
    workflow = PrefectOpenMetadataProfiler(om_workflow_model)
    workflow.execute()
    workflow.log_flow_status()
    workflow.raise_from_status()
    workflow.stop()

validate_connection(conn_config, is_json=False)

Makes a SQLAlchemy connection based on a given JSON or YAML config for testing. See the OpenMetadata schema definitions for the fields required to connect to your system and crawl its metadata.

Requires the SQLAlchemy package for the relevant connector to be installed; for example, a Snowflake connection requires pip install snowflake-sqlalchemy.

Parameters:

Name         Type  Description                                                         Default
conn_config  str   connection spec as a string, by default in YAML, optionally in JSON  required
is_json      bool  flag whether conn_config is a JSON spec rather than YAML             False

Examples:

Flow testing connection using Prefect:

from prefect_openmetadata.flows import validate_connection

config = """
connection:
  config:
    type: Snowflake
    username: DEMO
    password: xxx
    account: xxx.us-east-2.aws
    database: YOUR_DB
    warehouse: COMPUTE_WH
connectionType: Database
"""

if __name__ == "__main__":
    validate_connection(config)
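
The same connection spec can be passed as JSON by setting `is_json=True`. The sketch below builds the JSON string from a dictionary with the same fields and placeholder values as the YAML example; `run_validation` is a hypothetical wrapper that keeps the flow import out of module scope.

```python
import json

# Same fields as the YAML example above; all values are placeholders.
conn_spec = {
    "connection": {
        "config": {
            "type": "Snowflake",
            "username": "DEMO",
            "password": "xxx",
            "account": "xxx.us-east-2.aws",
            "database": "YOUR_DB",
            "warehouse": "COMPUTE_WH",
        }
    },
    "connectionType": "Database",
}

# Serialize the spec; the flow expects a string, not a dictionary.
json_config = json.dumps(conn_spec, indent=2)


def run_validation() -> None:
    """Run the flow; needs prefect-openmetadata and snowflake-sqlalchemy."""
    from prefect_openmetadata.flows import validate_connection

    validate_connection(json_config, is_json=True)
```

Building the string with `json.dumps` avoids hand-written quoting errors and keeps the spec easy to modify in code.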

Source code in prefect_openmetadata/flows.py
@flow
def validate_connection(conn_config: str, is_json: bool = False):
    """
    Makes a SQLAlchemy connection based on a given JSON or YAML config for testing.
    Go to the [OpenMetadata schema definitions](https://github.com/open-metadata/OpenMetadata/tree/main/catalog-rest-service/src/main/resources/json/schema/entity/services/connections),
    to inspect required fields to connect with your desired system and crawl its metadata.

    Requires installing the required sqlalchemy subpackage for the relevant connector
    e.g. Snowflake connection requires `pip install snowflake-sqlalchemy`

    Args:
        conn_config: connection spec as a string, by default in YAML, optionally in JSON
        is_json: flag whether `conn_config` is a JSON spec rather than YAML

    Examples:
        Flow testing connection using Prefect:
        ```python
        from prefect_openmetadata.flows import validate_connection

        config = \"""
        connection:
          config:
            type: Snowflake
            username: DEMO
            password: xxx
            account: xxx.us-east-2.aws
            database: YOUR_DB
            warehouse: COMPUTE_WH
        connectionType: Database
        \"""

        if __name__ == "__main__":
            validate_connection(config)
        ```
    """  # noqa
    conn_spec_dict = json.loads(conn_config) if is_json else yaml.safe_load(conn_config)
    conn_model = TestServiceConnectionRequest.parse_obj(conn_spec_dict)
    make_test_connection(conn_model)