Examples Catalog
Below is a list of examples for prefect-databricks.
Credentials Module
Gets a Databricks REST AsyncClient.
from prefect import flow
from prefect_databricks import DatabricksCredentials
# Example: build Databricks credentials from a token and return the client
# produced by DatabricksCredentials.get_client().
@flow
def example_get_client_flow():
    # "consumer_key" is the sample token value; presumably it should be
    # replaced with a real Databricks token before use.
    credentials = DatabricksCredentials(token="consumer_key")
    return credentials.get_client()


# Invoke the flow immediately so the example runs end to end.
example_get_client_flow()
Flows Module
Waits for a Databricks jobs run to complete.
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion
# Example: load a saved credentials block and call
# jobs_runs_wait_for_completion for jobs run 45429.
@flow
def jobs_runs_wait_for_completion_flow():
    # Timing settings passed to the wait flow, with their values in minutes.
    wait_settings = {
        "max_wait_seconds": 1800,  # 30 minutes
        "poll_frequency_seconds": 120,  # 2 minutes
    }
    creds = DatabricksCredentials.load("BLOCK_NAME")
    outcome = jobs_runs_wait_for_completion(
        multi_task_jobs_run_id=45429,
        databricks_credentials=creds,
        run_name="my_run_name",
        **wait_settings,
    )
    return outcome
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
jobs_runs_submit_by_id_and_wait_for_completion,
)
# Example: submit an existing Databricks job by its ID via
# jobs_runs_submit_by_id_and_wait_for_completion and return the result.
@flow
def submit_existing_job(block_name: str, job_id):
    # block_name: name of the saved DatabricksCredentials block to load.
    # job_id: ID of the job to submit; forwarded unchanged to the sub-flow.
    databricks_credentials = DatabricksCredentials.load(block_name)
    # The return value is whatever the submit-and-wait flow returns.
    return jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        job_id=job_id,
    )


# Placeholder so the example does not fail with a NameError; replace it with
# the ID of an existing job in your Databricks workspace.
db_job_id = 12345

submit_existing_job(block_name="db-creds", job_id=db_job_id)
Rest Module
Lists jobs on the Databricks instance.
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.rest import execute_endpoint
# Example: call the "/2.1/jobs/list" endpoint through execute_endpoint and
# return the decoded JSON body of the response.
@flow
def example_execute_endpoint_flow():
    creds = DatabricksCredentials.load("my-block")
    # Query parameters sent with the request.
    query = {
        "limit": 5,
        "offset": None,
        "expand_tasks": True,
    }
    jobs_response = execute_endpoint("/2.1/jobs/list", creds, params=query)
    return jobs_response.json()