[!NOTE] Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.
prefect-databricks
The prefect-databricks collection makes it easy to coordinate Databricks jobs with other tools in your data stack using Prefect. Check out the examples below to get started!
Getting Started
Integrate with Prefect flows
Using Prefect with Databricks allows you to define and orchestrate complex data workflows that take advantage of the scalability and performance of Databricks.
This can be especially useful for data-intensive tasks such as ETL (extract, transform, load) pipelines, machine learning training and inference, and real-time data processing.
Below is an example of how you can incorporate Databricks notebooks within your Prefect flows.
Be sure to install prefect-databricks and save a credentials block to run the examples below!
If you don't have an existing notebook ready on Databricks, you can copy the following and name it example.ipynb. This notebook accepts a name parameter from the flow and simply prints a message.
```python
# dbutils is provided automatically inside Databricks notebooks
name = dbutils.widgets.get("name")
message = f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"
print(message)
```
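Because `dbutils` only exists inside a Databricks notebook, the cell above raises a `NameError` if you run it anywhere else. If you want to check the message logic locally first, a minimal sketch could factor it into a function and fall back to a default name. The `build_message` helper and the `"Marvin"` fallback are hypothetical, for illustration only, and are not part of prefect-databricks.

```python
def build_message(name: str) -> str:
    """Format the greeting printed by the example notebook (hypothetical helper)."""
    return f"Don't worry {name}, I got your request! Welcome to prefect-databricks!"


try:
    # dbutils is injected into Databricks notebooks; it is undefined elsewhere.
    name = dbutils.widgets.get("name")  # noqa: F821
except NameError:
    # Hypothetical local fallback so the cell can also run outside Databricks.
    name = "Marvin"

print(build_message(name))
```

Inside Databricks the widget value is used; a missing widget there raises a Databricks error rather than `NameError`, so the fallback does not hide that mistake.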
Here, the flow launches a new cluster to run example.ipynb and waits for the notebook run to complete. Replace the placeholders and run.
```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
def jobs_runs_submit_flow(block_name: str, notebook_path: str, **base_parameters):
    databricks_credentials = DatabricksCredentials.load(block_name)

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task",
    )

    # submit a one-time run and block until it finishes
    run = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
    )
    return run


if __name__ == "__main__":
    jobs_runs_submit_flow(
        block_name="BLOCK-NAME-PLACEHOLDER",
        notebook_path="/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example.ipynb",
        name="Marvin",
    )
```
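The flow above submits the run and then waits for it to finish. Conceptually, waiting means polling the run's life-cycle state until it reaches a terminal value. The sketch below illustrates that loop with a stand-in `get_state` callable; it is not the library's implementation, `wait_for_run` is a hypothetical name, and the terminal-state set is taken from the Databricks Jobs API `RunLifeCycleState` values.

```python
import time
from typing import Callable

# Life-cycle states after which a Databricks run will not change again.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def wait_for_run(
    get_state: Callable[[], str],
    poll_seconds: float = 10.0,
    max_polls: int = 90,
) -> str:
    """Poll get_state() until it returns a terminal state, then return that state."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError(f"Run did not reach a terminal state after {max_polls} polls")


# Usage with a fake state source standing in for the Jobs API.
states = iter(["PENDING", "RUNNING", "RUNNING", "TERMINATED"])
print(wait_for_run(lambda: next(states), poll_seconds=0))  # TERMINATED
```

Note that `TERMINATED` only means the run stopped; the Jobs API reports success or failure separately in the run's `result_state`.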
Upon execution, the notebook run should output:
Don't worry Marvin, I got your request! Welcome to prefect-databricks!
Input dictionaries in place of models
Instead of using the built-in models, you can also pass an equivalent dictionary with the same fields.
For example, the following are equivalent:
```python
auto_scale = AutoScale(min_workers=1, max_workers=2)
auto_scale = {"min_workers": 1, "max_workers": 2}
```
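Applied to the flow above, the same cluster, notebook, and task settings can be written entirely as dictionaries. This sketch assumes the dictionary keys mirror the model field names, which also match the fields of the Databricks Jobs API `runs/submit` request; `build_submit_payload` is a hypothetical helper, not part of prefect-databricks.

```python
import json


def build_submit_payload(notebook_path: str, base_parameters: dict) -> dict:
    """Build the example's run settings as plain dicts (hypothetical helper)."""
    new_cluster = {
        "aws_attributes": {
            "availability": "SPOT",
            "zone_id": "us-west-2a",
            "ebs_volume_type": "GENERAL_PURPOSE_SSD",
            "ebs_volume_count": 3,
            "ebs_volume_size": 100,
        },
        "autoscale": {"min_workers": 1, "max_workers": 2},
        "node_type_id": "m4.large",
        "spark_version": "10.4.x-scala2.12",
        "spark_conf": {"spark.speculation": True},
    }
    task = {
        "task_key": "prefect-task",
        "new_cluster": new_cluster,
        "notebook_task": {
            "notebook_path": notebook_path,
            "base_parameters": base_parameters,
        },
    }
    return {"run_name": "prefect-job", "tasks": [task]}


payload = build_submit_payload(
    "/Users/<EMAIL_ADDRESS_PLACEHOLDER>/example.ipynb", {"name": "Marvin"}
)
print(json.dumps(payload, indent=2))
```

Under that assumption, `run_name=payload["run_name"]` and `tasks=payload["tasks"]` should be accepted by `jobs_runs_submit_and_wait_for_completion` in place of the model instances.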
If you have an existing Databricks job, you can run it using jobs_runs_submit_by_id_and_wait_for_completion:
```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
def existing_job_submit(databricks_credentials_block_name: str, job_id: int):
    databricks_credentials = DatabricksCredentials.load(
        name=databricks_credentials_block_name
    )
    # trigger a run of the existing job and wait for it to finish
    run = jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )
    return run


if __name__ == "__main__":
    existing_job_submit(
        databricks_credentials_block_name="db-creds",
        job_id=123456789,  # placeholder: replace with your job's numeric ID
    )
```
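Running an existing job corresponds to the Databricks Jobs API `run-now` endpoint. As a rough illustration of that REST call, not the collection's own code, the sketch below builds (but does not send) the request with only the standard library. It assumes `databricks_instance` is the workspace hostname and uses the Jobs API 2.1 path; `build_run_now_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_run_now_request(
    databricks_instance: str, token: str, job_id: int
) -> urllib.request.Request:
    """Build a POST to the Jobs API run-now endpoint (hypothetical helper)."""
    url = f"https://{databricks_instance}/api/2.1/jobs/run-now"
    body = json.dumps({"job_id": job_id}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",  # personal access token
            "Content-Type": "application/json",
        },
    )


request = build_run_now_request("dbc-example.cloud.databricks.com", "TOKEN-PLACEHOLDER", 123456789)
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` would actually trigger the run; the Prefect flow additionally handles authentication from the saved block and waits for completion.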
Resources
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
Note that the tasks within this collection were created by a code generator from the service's OpenAPI spec.
The service's REST API documentation can be found here.
Installation
Install prefect-databricks with pip:
```shell
pip install prefect-databricks
```
Requires an installation of Python 3.7+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.
Saving Credentials to Block
To use the load method on Blocks, you must already have a block document saved through code or through the UI.
Below is a walkthrough on saving block documents through code; simply create a short script, replacing the placeholders.
- Head over to Databricks.
- Log in to your Databricks account and select a workspace.
- On the right side of the top nav bar, click your account name -> User Settings.
- Click Access tokens -> Generate new token -> Generate, and copy the token.
- Note down your Databricks instance from the browser URL, formatted like https://<DATABRICKS-INSTANCE>.cloud.databricks.com/
- Create a short script, replacing the placeholders.
```python
from prefect_databricks import DatabricksCredentials

credentials = DatabricksCredentials(
    databricks_instance="DATABRICKS-INSTANCE-PLACEHOLDER",
    token="TOKEN-PLACEHOLDER",
)
credentials.save("BLOCK_NAME-PLACEHOLDER")
```
Congrats! You can now easily load the saved block, which holds your credentials:
```python
from prefect_databricks import DatabricksCredentials

credentials = DatabricksCredentials.load("BLOCK_NAME-PLACEHOLDER")
```
Registering blocks
Register blocks in this module to view and edit them on Prefect Cloud:
```shell
prefect block register -m prefect_databricks
```
Feedback
If you encounter any bugs while using prefect-databricks, feel free to open an issue in the prefect-databricks repository.
If you have any questions or issues while using prefect-databricks, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to star or watch prefect-databricks for updates too!
Contributing
If you'd like to help fix an issue or add a feature to prefect-databricks, please propose changes through a pull request from a fork of the repository.
Here are the steps:
- Fork the repository
- Clone the forked repository
- Install the repository and its dependencies: `pip install -e ".[dev]"`
- Make desired changes
- Add tests
- Add an entry to CHANGELOG.md
- Install `pre-commit` to perform quality checks prior to commit: `pre-commit install`
- `git commit`, `git push`, and create a pull request