Prefect#

Prefect is a workflow management system you can use to automate your data pipelines. Prefect integrates with Dask through the prefect-dask collection, so you can execute workflows in parallel and use Coiled to run them on the cloud. You can download this Jupyter notebook to follow along.

Note

This example uses Prefect 2.

If you’re looking for an example using Prefect 1, see our Workflow automation with Prefect 1. You might also find Prefect’s migration guide helpful.

Before you start#

1. Create your Python environment#

You’ll first need to install the necessary packages:

$ conda install -c conda-forge coiled dask prefect prefect-dask

You can install these in a new virtual environment or in whatever environment you’re already using for your project. When you create a cluster, Coiled will automatically replicate your local environment in your cluster (see Manage Software).
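If you use pip instead of conda, the equivalent install from PyPI is:

$ pip install coiled dask prefect prefect-dask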

2. Set up Prefect Cloud#

Coiled works best when used with Prefect Cloud, a coordination-as-a-service platform. You can follow these instructions from Prefect on how to get started for free.
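Once you have an account, you can authenticate your local environment from the command line with Prefect’s CLI; it will prompt you to log in or paste an API key:

$ prefect cloud login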

Terminology#

In this example, you’ll see three key Prefect terms: flows, tasks, and task runners. A flow is a collection of individual steps, or tasks. A task is similar to a function; it accepts arguments, does something with them, and optionally returns a result. Task runners are optional and allow you to choose whether Prefect executes tasks sequentially, concurrently, or in parallel.
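As a minimal sketch of how these pieces fit together (the shout task and greet flow here are illustrative, using Prefect 2’s built-in ConcurrentTaskRunner):

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def shout(word):
    # a task: a unit of work, similar to a plain function
    print(word.upper())


@flow(task_runner=ConcurrentTaskRunner())
def greet():
    # a flow: a collection of task runs; the task runner decides
    # how submitted tasks execute (here, concurrently)
    shout.submit("hello")
    shout.submit("world")


greet()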

Using the DaskTaskRunner#

With Prefect’s DaskTaskRunner, you can use the dask.distributed scheduler to decide where and when to run your Prefect tasks. In the following example, you’ll define a few tasks, map them across a sequence of integers, and then sum the results into a single value.

In the inc_double_add_sum flow, the inc and double tasks are mapped over a_range. Then, add is mapped over the results from inc and double, and sum_all reduces the sequence of integers to a single value by summing all elements.

You’ll first define the following tasks with the @task decorator:

from prefect import flow, task


@task
def inc(x):
    return x + 1


@task
def double(x):
    return x * 2


@task
def add(x, y):
    return x + y


@task
def sum_all(z):
    return sum(z)

And combine these tasks into a flow with the @flow decorator:

from prefect_dask import DaskTaskRunner


coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-inc-double-add-sum",
    },
)

@flow(task_runner=coiled_runner)
def inc_double_add_sum(a_range):
    a = inc.map(a_range)
    b = double.map(a_range)
    c = add.map(x=a, y=b)
    return sum_all(c)

By specifying cluster_class="coiled.Cluster" in the DaskTaskRunner, you’ll be running these tasks in parallel in the cloud.
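For comparison, a DaskTaskRunner created with no arguments runs everything locally, which can be handy for testing a flow before moving it to Coiled:

from prefect_dask import DaskTaskRunner

# with no cluster_class, prefect-dask creates a temporary local
# Dask cluster for the duration of the flow run
local_runner = DaskTaskRunner()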

Let’s run our flow, mapping the tasks over the sequence of integers 0 through 8:

inc_double_add_sum(range(9))

You should see an output similar to the following:

16:08:03.776 | INFO    | prefect.engine - Created flow run 'nebulous-mammoth' for flow 'inc-double-add-sum'
16:08:03.779 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `coiled.v2.cluster.Cluster`
[Coiled cluster widget: package sync progress (scanning conda and pip packages, validating environment) and cluster status (dashboard link, region, instance types, number of workers)]
16:09:16.490 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at https://cluster-yoazq.dask.host?token=se1qiHezcg26aJjE
16:09:17.682 | INFO    | Flow run 'nebulous-mammoth' - Created task run 'inc-5' for task 'inc'
16:09:18.027 | INFO    | Flow run 'nebulous-mammoth' - Submitted task run 'inc-5' for execution.
16:09:18.030 | INFO    | Flow run 'nebulous-mammoth' - Created task run 'double-7' for task 'double'
...
16:09:18.862 | INFO    | Flow run 'nebulous-mammoth' - Created task run 'sum_all-0' for task 'sum_all'
16:09:18.864 | INFO    | Flow run 'nebulous-mammoth' - Executing 'sum_all-0' immediately...
16:09:31.102 | INFO    | Task run 'sum_all-0' - Finished in state Completed()
16:09:31.652 | INFO    | Flow run 'nebulous-mammoth' - Finished in state Completed()

Cluster options#

We ran this flow with mostly default Coiled cluster options, but you can pass any Coiled cluster keyword argument to cluster_kwargs (see Manage Hardware). For example, by default the cluster is shut down once the flow completes; to keep it running, you can use "shutdown_on_close": False:

from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-inc-double-add-sum",
        "shutdown_on_close": False,
    },
)
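Other cluster keyword arguments work the same way. For instance, here is a sketch that also sets the number of workers and the cloud region (the values are illustrative):

from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-inc-double-add-sum",
        "n_workers": 10,        # start the cluster with 10 workers
        "region": "us-east-2",  # run in a specific cloud region
    },
)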

You can also connect to an existing cluster by using the same Coiled cluster name or the scheduler address (see Reuse). For example:

import coiled
from prefect_dask import DaskTaskRunner

# attach to a cluster you've already created; reusing the name
# connects to the running cluster
cluster = coiled.Cluster(name="prefect-inc-double-add-sum")

coiled_runner = DaskTaskRunner(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security},
)
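If you don’t have the cluster object in hand, passing the name of a still-running cluster through cluster_kwargs reconnects to it instead of provisioning a new one:

from prefect_dask import DaskTaskRunner

# reusing the name of a running Coiled cluster attaches to it
# rather than creating a new cluster
coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={"name": "prefect-inc-double-add-sum"},
)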

Computing Dask Collections#

In some cases, you may want to compute a Dask collection (e.g. Dask DataFrame) from within a Prefect task. You can do this by creating a Coiled cluster from within a Prefect task. For example:

import coiled
import dask
from prefect import flow, get_run_logger, task


@task
def groupby_timeseries():
    with coiled.Cluster(name="prefect-task") as cluster:

        # connect a Dask client; it becomes the default client
        # for the computations below
        client = cluster.get_client()

        # load a synthetic timeseries dataset into cluster memory
        df = dask.datasets.timeseries(
            "2000", "2005", partition_freq="2w"
        ).persist()

        # perform a groupby with an aggregation
        return df.groupby("name").aggregate(
            {"x": "sum", "y": "max"}
        ).compute()


@flow
def timeseries():
    summary = groupby_timeseries.submit()
    logger = get_run_logger()
    logger.info(summary.result())

We can then run the timeseries flow; the groupby_timeseries task will run with Dask on a Coiled cluster.

timeseries()

You should see an output similar to the following:

16:47:56.388 | INFO    | prefect.engine - Created flow run 'incredible-cow' for flow 'timeseries'
16:47:57.551 | INFO    | Flow run 'incredible-cow' - Created task run 'groupby_timeseries-0' for task 'groupby_timeseries'
16:47:57.554 | INFO    | Flow run 'incredible-cow' - Submitted task run 'groupby_timeseries-0' for execution.
16:49:28.860 | INFO    | Task run 'groupby_timeseries-0' - Finished in state Completed()
16:49:28.861 | INFO    | Flow run 'incredible-cow' -                     x         y
name
Alice    -1396.923700  1.000000
Xavier    -333.098275  1.000000
Bob       -326.340375  1.000000
Charlie   1153.724366  1.000000
Zelda    -1759.322480  0.999998
...
16:49:29.127 | INFO    | Flow run 'incredible-cow' - Finished in state Completed('All states completed.')

You’ll also be able to see your Dask computation, including the individual Dask tasks within the Prefect task, in the Coiled web application.

[Screenshot: Dask tasks in the Coiled web UI]

Next steps#

To learn more about Dask and Coiled, you can watch this demo from PrefectLive.