prefect-monte-carlo
Welcome!
A collection of Prefect tasks and flows to interact with Monte Carlo from workflows.
Getting Started
Python setup
Requires an installation of Python 3.7+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
Installation
Install prefect-monte-carlo
with pip
:
pip install prefect-monte-carlo
Then, register this collection's blocks to view them on Prefect Cloud:
prefect block register -m prefect_monte_carlo
Note, to use the load
method on Blocks, you must already have a block document saved through code or saved through the UI.
Write and run a flow
Execute a query against the Monte Carlo GraphQL API
from prefect import flow
from prefect_monte_carlo.graphql import execute_graphql_operation
from prefect_monte_carlo.credentials import MonteCarloCredentials
@flow
def example_execute_query():
monte_carlo_credentials = MonteCarloCredentials.load("my-mc-creds")
result = execute_graphql_operation(
monte_carlo_credentials=monte_carlo_credentials,
operation="query getUser { getUser { email firstName lastName }}",
)
Create or update Monte Carlo lineage
from prefect import flow
from prefect.context import get_run_context
from prefect_monte_carlo.credentials import MonteCarloCredentials
from prefect_monte_carlo.lineage import create_or_update_lineage, MonteCarloLineageNode
@flow
def monte_carlo_orchestrator():
current_flow_run_name = get_run_context().flow_run.name
source = MonteCarloLineageNode(
node_name="source_dataset",
object_id="source_dataset",
object_type="table",
resource_name="some_resource_name",
tags=[{"propertyName": "dataset_owner", "propertyValue": "owner_name"}],
)
destination = MonteCarloLineageNode(
node_name="destination_dataset",
object_id="destination_dataset",
object_type="table",
resource_name="some_resource_name",
tags=[{"propertyName": "dataset_owner", "propertyValue": "owner_name"}],
)
# `create_or_update_lineage` is a flow, so this will be a subflow run
# `extra_tags` are added to both the `source` and `destination` nodes
create_or_update_lineage(
monte_carlo_credentials=MonteCarloCredentials.load("my-mc-creds)
source=source,
destination=destination,
expire_at=datetime.now() + timedelta(days=10),
extra_tags=[{"propertyName": "flow_run_name", "propertyValue": current_flow_run_name}]
)
Conditionally execute a flow based on a Monte Carlo circuit breaker rule
from prefect import flow
from prefect_monte_carlo.circuit_breakers import skip_if_circuit_breaker_flipped
from prefect_monte_carlo.credentials import MonteCarloCredentials
my_mc_creds = MonteCarloCredentials.load("my-mc-creds")
rule_name = "myRule"
@flow
@skip_if_circuit_breaker_flipped(
monte_carlo_credentials=my_mc_creds
rule_name=rule_name,
)
def conditional_flow():
logger = get_run_logger()
logger.info("If you see this, your circuit breaker rule was not breached!")
Resources
If you encounter any bugs while using prefect-monte-carlo
, feel free to open an issue in the prefect-monte-carlo repository.
If you have any questions or issues while using prefect-monte-carlo
, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to star or watch prefect-monte-carlo
for updates too!
Contributing
If you'd like to help contribute to fix an issue or add a feature to prefect-monte-carlo
, please propose changes through a pull request from a fork of the repository.
Here are the steps:
- Fork the repository
- Clone the forked repository
- Install the repository and its dependencies:
pip install -e ".[dev]"
- Make desired changes
- Add tests
- Insert an entry to CHANGELOG.md
- Install
pre-commit
to perform quality checks prior to commit:
pre-commit install
git commit
, git push
, and create a pull request