Skip to content

Examples Catalog

Below is a list of examples for prefect-dask.

Task Runners Module

import time

from prefect import flow, task

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9

Switching to a DaskTaskRunner:

import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
Using a temporary local dask cluster:
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner)
def my_flow():
    ...

Using a temporary cluster running elsewhere. Any Dask cluster class should work, here we use dask-cloudprovider:

DaskTaskRunner(
    cluster_class="dask_cloudprovider.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
    },
)

Connecting to an existing dask cluster:

DaskTaskRunner(address="192.0.2.255:8786")

Utils Module

Use get_dask_client to distribute work across workers.

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

dask_flow()
Use get_async_dask_client to distribute work across workers.
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()

asyncio.run(dask_flow())