Schedule and deploy metadata ingestion flows
Schedule your OpenMetadata ingestion flows with Prefect
Ingesting your data via manually executed scripts is great for initial exploration, but in order to build a reliable metadata platform, you need to run those workflows on a regular cadence. That’s where you can leverage Prefect schedules and deployments.
Here is how you can add a `DeploymentSpec` to your flow to ensure that your metadata gets refreshed every 15 minutes:
```python
# ingestion_flow.py
from datetime import timedelta

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner
from prefect.orion.schemas.schedules import IntervalSchedule
from prefect_openmetadata.flows import ingest_metadata

config = """See previous examples"""

DeploymentSpec(
    name="openmetadata-dev",
    flow=ingest_metadata,
    parameters=dict(config=config),
    flow_runner=SubprocessFlowRunner(),
    schedule=IntervalSchedule(interval=timedelta(minutes=15)),
)
```
Here is an explanation of the `DeploymentSpec` arguments:
- `name` - specifies the name of the deployment; you could use it to differentiate between deployments for development and production environments.
- `flow` - points to the flow object, i.e. the flow function name.
- `flow_runner` - specifies how the flow run should be deployed; this allows you to deploy the flow run as a Docker container, a Kubernetes job, or as a local subprocess. For example, you can deploy it as a subprocess running in a Conda virtual environment named "openmetadata" using the syntax `SubprocessFlowRunner(condaenv="openmetadata")`.
- `schedule` - allows you to choose and customize your desired schedule class; in this example, we are using a simple `IntervalSchedule` triggering a new flow run every 15 minutes. With the asynchronous scheduling service in Prefect 2.0, you could even schedule your flow to run every 10 seconds if you need your metadata to be always up-to-date. A cron-based variant is sketched right after this list.
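For reference, here is a sketch of how the same deployment could look with a Conda-based flow runner and a cron-based schedule instead of an interval. The file name `ingestion_flow_prod.py`, the deployment name `openmetadata-prod`, the cron expression, and the timezone are illustrative placeholders - adjust them to your setup:

```python
# ingestion_flow_prod.py -- illustrative variant of the deployment above
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner
from prefect.orion.schemas.schedules import CronSchedule
from prefect_openmetadata.flows import ingest_metadata

config = """See previous examples"""

DeploymentSpec(
    # a separate deployment name, e.g. for a production environment
    name="openmetadata-prod",
    flow=ingest_metadata,
    parameters=dict(config=config),
    # run each flow run as a subprocess in a Conda environment named "openmetadata"
    flow_runner=SubprocessFlowRunner(condaenv="openmetadata"),
    # cron-based schedule: every 15 minutes, evaluated in UTC
    schedule=CronSchedule(cron="*/15 * * * *", timezone="UTC"),
)
```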
To deploy this scheduled workflow to Prefect, run the following command from your CLI:
```bash
prefect deployment create ingestion_flow.py
```
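To confirm that the deployment was registered, and optionally trigger an ad-hoc run outside the schedule, you can use the deployment CLI. Note that the deployment reference below is an assumption: it uses the default flow name `ingest-metadata` derived from the `ingest_metadata` function, which may differ in your setup:

```bash
# list registered deployments to confirm the creation succeeded
prefect deployment ls

# optionally trigger a one-off run (assumes the default flow name "ingest-metadata")
prefect deployment run ingest-metadata/openmetadata-dev
```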
Deploy your execution layer to run your flows
So far, we’ve looked at how you can create and schedule your workflow, but where does this code actually run? This is where the concepts of storage, work queues, and agents become important. But don’t worry - to get started, all you need is to run one CLI command for each of those concepts.
1) Storage
Storage is used to tell Prefect where your workflow code lives. To configure storage, run:
```bash
prefect storage create
```
The CLI will guide you through selecting the storage of your choice - to get started, you can pick Local Storage and point it at a path in your file system. You can then set it as your default storage.
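As a quick sanity check, you can list the configured storage options to confirm your default is set (CLI output details may vary across Prefect 2.0 beta releases):

```bash
# list configured storage and show which one is the default
prefect storage ls
```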
2) Work Queue
Work queues collect scheduled runs and agents pick those up from the queue. To create a default work queue, run:
```bash
prefect work-queue create default
```
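To double-check that the queue was created, you can list all work queues:

```bash
# list all work queues and their status
prefect work-queue ls
```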
3) Agent
Agents are lightweight processes that poll their work queues for scheduled runs and execute the workflows on the infrastructure you specified in the `DeploymentSpec`’s `flow_runner`. To start an agent that polls the default work queue, run:

```bash
prefect agent start default
```
That’s all you need! Once you have executed those three commands, your scheduled deployments (such as the one we defined in `ingestion_flow.py` above) will start running on schedule, and Prefect will ensure that your metadata stays up-to-date.
You can observe the state of your metadata ingestion workflows from the Prefect Orion UI. The UI will also include detailed logs showing which metadata got updated to ensure your data platform remains healthy and observable.
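If you haven’t launched the UI yet, you can start the Orion API server and UI locally; assuming a default local installation, it is served at http://127.0.0.1:4200:

```bash
# start the Orion API server and UI (served at http://127.0.0.1:4200 by default)
prefect orion start
```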
Using Prefect 2.0 in the Cloud
If you want to move beyond this local installation, you can deploy Prefect 2.0 to run your OpenMetadata ingestion workflows by:
- self-hosting the orchestration layer - see the list of resources on Prefect Discourse,
- or signing up for Prefect Cloud 2.0 - the following page will walk you through the process (a minimal CLI login sketch is shown below).
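For reference, here is a minimal sketch of connecting your local environment to Prefect Cloud 2.0 after signing up; `<YOUR_API_KEY>` is a placeholder for an API key generated in the Cloud UI:

```bash
# authenticate the local Prefect CLI against Prefect Cloud 2.0
prefect cloud login --key <YOUR_API_KEY>
```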
For various deployment options of OpenMetadata, check the “Deploy” section of this documentation.
Questions about using OpenMetadata with Prefect
If you have any questions about configuring Prefect, post your question on Prefect Discourse or in the Prefect Community Slack.
And if you need support for OpenMetadata, get in touch on OpenMetadata Slack.