Examples Catalog
Below is a list of examples for `prefect-dask`.
Task Runners Module
```python
import time

from prefect import flow, task


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)

# outputs
#0
#1
#2
#3
#4
#5
#6
#7
#8
#9
```
Switching to a `DaskTaskRunner`:
```python
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=DaskTaskRunner())
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)

# outputs
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```
By default, `DaskTaskRunner` creates a temporary local Dask cluster for the duration of the flow run:

```python
from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
def my_flow():
    ...
```
Using a temporary cluster running elsewhere. Any Dask cluster class should work; here we use one from dask-cloudprovider:
```python
DaskTaskRunner(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "prefecthq/prefect:latest",
        "n_workers": 5,
    },
)
```
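The temporary cluster can also be scaled adaptively. A minimal sketch, assuming the default local cluster; the worker bounds below are illustrative placeholders, not recommendations:

```python
from prefect_dask import DaskTaskRunner

# Sketch: adapt_kwargs enables Dask adaptive scaling on the temporary cluster.
# Tune the bounds for your own workload.
DaskTaskRunner(
    cluster_kwargs={"n_workers": 1},            # start with a single worker
    adapt_kwargs={"minimum": 1, "maximum": 8},  # let Dask scale up to 8 workers
)
```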
Connecting to an existing Dask cluster:

```python
DaskTaskRunner(address="192.0.2.255:8786")
```
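Extra options for the underlying `dask.distributed.Client` can be passed through `client_kwargs`. A sketch, with a placeholder address and an illustrative timeout:

```python
from prefect_dask import DaskTaskRunner

# Sketch: client_kwargs is forwarded to dask.distributed.Client when the
# runner connects; the address and timeout here are placeholders.
DaskTaskRunner(
    address="192.0.2.255:8786",
    client_kwargs={"timeout": "60s"},
)
```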
Utils Module
Use `get_dask_client` within a task to distribute work across Dask workers:
```python
import dask
from prefect import flow, task

from prefect_dask import DaskTaskRunner, get_dask_client


@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df


@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


dask_flow()
```
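The client returned by `get_dask_client` is a regular `dask.distributed.Client`, so plain functions can also be fanned out with `client.map`. A minimal sketch under the same runner setup; `inc` and `fan_out` are illustrative names:

```python
from prefect import flow, task

from prefect_dask import DaskTaskRunner, get_dask_client


def inc(x):
    # plain Python function executed on the Dask workers
    return x + 1


@task
def fan_out(numbers):
    with get_dask_client() as client:
        futures = client.map(inc, numbers)  # one Dask future per input
        return client.gather(futures)       # block until all results arrive


@flow(task_runner=DaskTaskRunner())
def fan_out_flow():
    return fan_out.submit(list(range(10))).result()


fan_out_flow()
```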
Use `get_async_dask_client` within an async task to distribute work across Dask workers:
```python
import asyncio

import dask
from prefect import flow, task

from prefect_dask import DaskTaskRunner, get_async_dask_client


@task
async def compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df


@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()


asyncio.run(dask_flow())
```
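With the asynchronous client, individual Dask futures can also be awaited directly. A short sketch under the same assumptions; `double` and the flow/task names are illustrative:

```python
import asyncio

from prefect import flow, task

from prefect_dask import DaskTaskRunner, get_async_dask_client


def double(x):
    # plain function executed on a Dask worker
    return x * 2


@task
async def single_submit():
    async with get_async_dask_client() as client:
        future = client.submit(double, 21)  # schedule one call on a worker
        return await future                 # awaiting the future yields its result


@flow(task_runner=DaskTaskRunner())
async def async_flow():
    prefect_future = await single_submit.submit()
    return await prefect_future.result()


asyncio.run(async_flow())
```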