Skip to content

Examples Catalog

Below is a list of examples for prefect-databricks.

Credentials Module

Gets a Databricks REST AsyncClient.

from prefect import flow
from prefect_databricks import DatabricksCredentials

@flow
def example_get_client_flow():
    """Build Databricks credentials in code and return their REST client.

    Returns:
        The client object produced by ``DatabricksCredentials.get_client()``.
    """
    # Placeholder values: substitute your own workspace hostname and a real
    # access token (ideally loaded from a secret store, not hard-coded).
    databricks_instance = "dbc-abc123.cloud.databricks.com"
    token = "consumer_key"
    # ``databricks_instance`` is a required field of the credentials block;
    # constructing the block with only a token fails validation.
    databricks_credentials = DatabricksCredentials(
        databricks_instance=databricks_instance,
        token=token,
    )
    client = databricks_credentials.get_client()
    return client

example_get_client_flow()

Flows Module

Waits for a jobs run to complete.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion

@flow
def jobs_runs_wait_for_completion_flow():
    """Wait on jobs run 45429 using the credentials block "BLOCK_NAME".

    Returns:
        Whatever ``jobs_runs_wait_for_completion`` returns for the run.
    """
    creds_block = DatabricksCredentials.load("BLOCK_NAME")
    # Timing settings, both expressed in seconds.
    timing = {
        "max_wait_seconds": 30 * 60,  # 30 minutes
        "poll_frequency_seconds": 2 * 60,  # 2 minutes
    }
    return jobs_runs_wait_for_completion(
        multi_task_jobs_run_id=45429,
        databricks_credentials=creds_block,
        run_name="my_run_name",
        **timing,
    )

Submits an existing job by its ID and waits for the run to complete.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
def submit_existing_job(block_name: str, job_id):
    """Submit a run of an existing Databricks job and wait for it to finish.

    Args:
        block_name: Name of the saved ``DatabricksCredentials`` block to load.
        job_id: Identifier of the job, forwarded unchanged as ``job_id``.

    Returns:
        The result of ``jobs_runs_submit_by_id_and_wait_for_completion``.
    """
    databricks_credentials = DatabricksCredentials.load(block_name)

    return jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )


# Placeholder: replace with the numeric ID of a job that exists in your
# workspace. It must be defined before the call below, otherwise the
# snippet stops with a NameError.
db_job_id = 123
submit_existing_job(block_name="db-creds", job_id=db_job_id)

Rest Module

Lists jobs on the Databricks instance.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.rest import execute_endpoint
@flow
def example_execute_endpoint_flow():
    """Call the ``/2.1/jobs/list`` endpoint and return ``response.json()``.

    Credentials come from the saved block named "my-block".
    """
    credentials = DatabricksCredentials.load("my-block")
    # Query-string parameters sent along with the request.
    query = {"limit": 5, "offset": None, "expand_tasks": True}
    jobs_response = execute_endpoint("/2.1/jobs/list", credentials, params=query)
    return jobs_response.json()