prefect_dask.utils

Utils to use alongside prefect-dask.

Functions

get_async_dask_client async

Yields a temporary asynchronous dask client; this is useful for parallelizing operations on dask collections, such as a dask.DataFrame or dask.Bag.

Without invoking this, workers do not automatically get a client to connect to the full cluster. Therefore, the work is attempted serially within the worker itself, potentially overwhelming that single worker.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | `Optional[Union[int, float, str, timedelta]]` | Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the `distributed.comm.timeouts.connect` configuration value. | `None` |
| client_kwargs | `Dict[str, Any]` | Additional keyword arguments to pass to `distributed.Client`, overriding any keyword arguments inherited from the task runner. | `{}` |
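The timeout accepts any of the annotated forms. A minimal sketch, where the helper name timeout_forms is purely illustrative:

```python
from datetime import timedelta

from prefect_dask import get_async_dask_client

async def timeout_forms():
    # Each of these expresses the same 30-second connection timeout.
    async with get_async_dask_client(timeout=30) as client:
        ...
    async with get_async_dask_client(timeout="30s") as client:
        ...
    async with get_async_dask_client(timeout=timedelta(seconds=30)) as client:
        ...
```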

Yields:

| Type | Description |
| --- | --- |
| `AsyncGenerator[Client, None]` | A temporary asynchronous dask client. |

Examples:

Use get_async_dask_client to distribute work across workers.

```python
import asyncio
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = await client.compute(df.describe())
    return summary_df

@flow(task_runner=DaskTaskRunner())
async def dask_flow():
    prefect_future = await compute_task.submit()
    return await prefect_future.result()

asyncio.run(dask_flow())
```
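Several collections can also be computed concurrently on the same temporary client, since the asynchronous client returns awaitable futures. A minimal sketch, where the task name multi_compute_task is purely illustrative:

```python
import asyncio
import dask
from prefect import task
from prefect_dask import get_async_dask_client

@task
async def multi_compute_task():
    async with get_async_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # The async client returns awaitable futures, so both
        # aggregations run on the cluster concurrently.
        mean_df, max_df = await asyncio.gather(
            client.compute(df.mean()), client.compute(df.max())
        )
    return mean_df, max_df
```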

Source code in prefect_dask/utils.py
@asynccontextmanager
async def get_async_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> AsyncGenerator[Client, None]:
    """
    Yields a temporary asynchronous dask client; this is useful
    for parallelizing operations on dask collections,
    such as a `dask.DataFrame` or `dask.Bag`.

    Without invoking this, workers do not automatically get a client to connect
    to the full cluster. Therefore, the work is attempted serially within the
    worker itself, potentially overwhelming that single worker.

    Args:
        timeout: Timeout after which to error out; has no effect in
            flow run contexts because the client has already started;
            defaults to the `distributed.comm.timeouts.connect`
            configuration value.
        client_kwargs: Additional keyword arguments to pass to
            `distributed.Client`, overriding any keyword arguments
            inherited from the task runner.

    Yields:
        A temporary asynchronous dask client.

    Examples:
        Use `get_async_dask_client` to distribute work across workers.
        ```python
        import asyncio
        import dask
        from prefect import flow, task
        from prefect_dask import DaskTaskRunner, get_async_dask_client

        @task
        async def compute_task():
            async with get_async_dask_client(timeout="120s") as client:
                df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
                summary_df = await client.compute(df.describe())
            return summary_df

        @flow(task_runner=DaskTaskRunner())
        async def dask_flow():
            prefect_future = await compute_task.submit()
            return await prefect_future.result()

        asyncio.run(dask_flow())
        ```
    """
    client_kwargs = _generate_client_kwargs(
        async_client=True, timeout=timeout, **client_kwargs
    )
    async with Client(**client_kwargs) as client:
        yield client

get_dask_client

Yields a temporary synchronous dask client; this is useful for parallelizing operations on dask collections, such as a dask.DataFrame or dask.Bag.

Without invoking this, workers do not automatically get a client to connect to the full cluster. Therefore, the work is attempted serially within the worker itself, potentially overwhelming that single worker.

When in an async context, we recommend using get_async_dask_client instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | `Optional[Union[int, float, str, timedelta]]` | Timeout after which to error out; has no effect in flow run contexts because the client has already started. Defaults to the `distributed.comm.timeouts.connect` configuration value. | `None` |
| client_kwargs | `Dict[str, Any]` | Additional keyword arguments to pass to `distributed.Client`, overriding any keyword arguments inherited from the task runner. | `{}` |
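Any extra keyword arguments are forwarded to `distributed.Client`. A minimal sketch passing the standard Client keyword set_as_default, where the task name inspect_cluster is purely illustrative:

```python
from prefect import task
from prefect_dask import get_dask_client

@task
def inspect_cluster():
    # set_as_default is forwarded to distributed.Client and overrides
    # any value inherited from the task runner.
    with get_dask_client(timeout="120s", set_as_default=False) as client:
        return client.scheduler_info()
```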

Yields:

| Type | Description |
| --- | --- |
| `Client` | A temporary synchronous dask client. |

Examples:

Use get_dask_client to distribute work across workers.

```python
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = client.compute(df.describe()).result()
    return summary_df

@flow(task_runner=DaskTaskRunner())
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

dask_flow()
```
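The synchronous client can likewise batch several computations in one round trip. A minimal sketch, where the task name multi_compute_task is purely illustrative:

```python
import dask
from prefect import task
from prefect_dask import get_dask_client

@task
def multi_compute_task():
    with get_dask_client(timeout="120s") as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # client.compute on a list returns one future per collection;
        # client.gather blocks until all of them have finished.
        futures = client.compute([df.mean(), df.max()])
        mean_df, max_df = client.gather(futures)
    return mean_df, max_df
```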

Source code in prefect_dask/utils.py
@contextmanager
def get_dask_client(
    timeout: Optional[Union[int, float, str, timedelta]] = None,
    **client_kwargs: Dict[str, Any],
) -> Generator[Client, None, None]:
    """
    Yields a temporary synchronous dask client; this is useful
    for parallelizing operations on dask collections,
    such as a `dask.DataFrame` or `dask.Bag`.

    Without invoking this, workers do not automatically get a client to connect
    to the full cluster. Therefore, the work is attempted serially within the
    worker itself, potentially overwhelming that single worker.

    When in an async context, we recommend using `get_async_dask_client` instead.

    Args:
        timeout: Timeout after which to error out; has no effect in
            flow run contexts because the client has already started;
            defaults to the `distributed.comm.timeouts.connect`
            configuration value.
        client_kwargs: Additional keyword arguments to pass to
            `distributed.Client`, overriding any keyword arguments
            inherited from the task runner.

    Yields:
        A temporary synchronous dask client.

    Examples:
        Use `get_dask_client` to distribute work across workers.
        ```python
        import dask
        from prefect import flow, task
        from prefect_dask import DaskTaskRunner, get_dask_client

        @task
        def compute_task():
            with get_dask_client(timeout="120s") as client:
                df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
                summary_df = client.compute(df.describe()).result()
            return summary_df

        @flow(task_runner=DaskTaskRunner())
        def dask_flow():
            prefect_future = compute_task.submit()
            return prefect_future.result()

        dask_flow()
        ```
    """
    client_kwargs = _generate_client_kwargs(
        async_client=False, timeout=timeout, **client_kwargs
    )
    with Client(**client_kwargs) as client:
        yield client