Workflow automation with Prefect#
Prefect is a workflow management system you can use to automate your data pipelines. Prefect integrates with Dask through the prefect-dask package, so you can execute workflows in parallel and use Coiled to run those workflows in the cloud. You can download this Jupyter notebook to follow along.
Note
This example uses Prefect 2.
If you’re looking for an example using Prefect 1, see our Workflow automation with Prefect 1. You might also find Prefect’s migration guide helpful.
Before you start#
1. Create your Python environment#
You’ll first need to install the necessary packages:
$ pip install coiled dask prefect prefect-dask
You can install these in a new virtual environment or in whatever environment you’re already using for your project. When you create a cluster, Coiled will automatically replicate your local environment in your cluster (see Package Synchronization).
2. Set up Prefect Cloud#
Coiled works best when used with Prefect Cloud, a coordination-as-a-service platform. You can follow these instructions from Prefect on how to get started for free.
Terminology#
In this example, you’ll see three key Prefect terms: flows, tasks, and task runners. A flow is a collection of individual steps, or tasks. A task is similar to a function; it accepts arguments, does something with them, and optionally returns a result. Task runners are optional and allow you to choose whether Prefect executes tasks sequentially, concurrently, or in parallel.
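As a rough analogy only (this is standard-library Python, not Prefect’s API), the difference between sequential and concurrent execution can be sketched with concurrent.futures. The function slow_square and the pool size are illustrative assumptions; the point is that the task code stays the same while the runner decides how the calls are scheduled.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Hypothetical stand-in for a task: wait briefly, then return a result."""
    time.sleep(0.05)
    return x * x


inputs = [1, 2, 3, 4]

# Sequential: each call finishes before the next one starts.
sequential_results = [slow_square(x) for x in inputs]

# Concurrent: calls overlap in a thread pool; map() still yields
# results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    concurrent_results = list(pool.map(slow_square, inputs))

print(sequential_results == concurrent_results)
```

Both approaches produce the same results; only the scheduling differs, which is the choice a task runner makes for a Prefect flow.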
Using the DaskTaskRunner#
With Prefect’s DaskTaskRunner, you can use the dask.distributed scheduler to decide where and when to run your Prefect tasks. In the following example, you’ll define a few functions which will be mapped across a sequence of integers, and at the end you’ll sum the results into a single value.
In the inc_double_add_sum flow, the inc and double tasks are mapped over a_range. Then, add is mapped over the results from inc and double, and sum_all reduces the sequence of integers to a single value by summing all elements.
You’ll first define the following tasks with the @task decorator:
from prefect import flow, task

@task
def inc(x):
    return x + 1

@task
def double(x):
    return x * 2

@task
def add(x, y):
    return x + y

@task
def sum_all(z):
    return sum(z)
And combine these tasks into a flow with the @flow decorator:
from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-inc-double-add-sum",
    },
)

@flow(task_runner=coiled_runner)
def inc_double_add_sum(a_range):
    # Map inc and double over every element of a_range
    a = inc.map(a_range)
    b = double.map(a_range)
    # Add the paired results element by element
    c = add.map(x=a, y=b)
    # Reduce the sequence to a single value
    return sum_all(c)
By specifying cluster_class="coiled.Cluster" in the DaskTaskRunner, you’ll be running these tasks in parallel in the cloud.
Let’s run our flow, mapping the tasks over the integers 0 through 8:
inc_double_add_sum(range(9))
You should see an output similar to the following:
16:08:03.776 | INFO | prefect.engine - Created flow run 'nebulous-mammoth' for flow 'inc-double-add-sum'
16:08:03.779 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with `coiled._beta.cluster.Cluster`

16:09:16.490 | INFO | prefect.task_runner.dask - The Dask dashboard is available at https://cluster-yoazq.dask.host?token=se1qiHezcg26aJjE
16:09:17.682 | INFO | Flow run 'nebulous-mammoth' - Created task run 'inc-5' for task 'inc'
16:09:18.027 | INFO | Flow run 'nebulous-mammoth' - Submitted task run 'inc-5' for execution.
16:09:18.030 | INFO | Flow run 'nebulous-mammoth' - Created task run 'double-7' for task 'double'
...
16:09:18.862 | INFO | Flow run 'nebulous-mammoth' - Created task run 'sum_all-0' for task 'sum_all'
16:09:18.864 | INFO | Flow run 'nebulous-mammoth' - Executing 'sum_all-0' immediately...
16:09:31.102 | INFO | Task run 'sum_all-0' - Finished in state Completed()
16:09:31.652 | INFO | Flow run 'nebulous-mammoth' - Finished in state Completed()
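As a sanity check on what the flow computes, the same logic can be sketched in plain Python with no Prefect or Dask involved. The function names ending in _plain are hypothetical stand-ins for the tasks above, and the built-in map only mimics how .map pairs up elements; it says nothing about how Prefect schedules the work.

```python
def inc_plain(x):
    return x + 1


def double_plain(x):
    return x * 2


def add_plain(x, y):
    return x + y


def inc_double_add_sum_plain(a_range):
    # Apply inc and double to every element.
    a = list(map(inc_plain, a_range))
    b = list(map(double_plain, a_range))
    # Pair the two result lists element by element and add them.
    c = list(map(add_plain, a, b))
    # Reduce to a single value.
    return sum(c)


expected = inc_double_add_sum_plain(range(9))
print(expected)  # 117
```

For range(9), each element x contributes (x + 1) + 2 * x = 3 * x + 1, so the total is 3 * 36 + 9 = 117.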
Cluster options#
We ran this flow with mostly default Coiled cluster options, but you can pass any Coiled cluster keyword argument to cluster_kwargs (see Creating clusters). For example, to adjust the default behavior of shutting down the cluster once the flow is completed, you can use "shutdown_on_close": False:
from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-inc-double-add-sum",
        "shutdown_on_close": False,
    },
)
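Because cluster_kwargs is an ordinary dictionary, it can be assembled programmatically before it is handed to DaskTaskRunner. The helper below is a hypothetical sketch, not part of Coiled or Prefect. It assumes n_workers as an example of another coiled.Cluster keyword argument, so check Creating clusters for the options your version supports.

```python
def build_cluster_kwargs(name, keep_running=False, n_workers=None):
    """Hypothetical helper: return a dict for DaskTaskRunner(cluster_kwargs=...)."""
    kwargs = {"name": name}
    if keep_running:
        # Leave the cluster up after the flow finishes.
        kwargs["shutdown_on_close"] = False
    if n_workers is not None:
        # Assumed example of an additional coiled.Cluster keyword argument.
        kwargs["n_workers"] = n_workers
    return kwargs


cluster_kwargs = build_cluster_kwargs(
    "prefect-inc-double-add-sum", keep_running=True, n_workers=4
)
print(cluster_kwargs)
```

The resulting dictionary can then be passed as cluster_kwargs=cluster_kwargs alongside cluster_class="coiled.Cluster".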
You can also connect to an existing cluster by using the same Coiled cluster name or the scheduler address (see Reusing clusters). For example:
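import coiled
from prefect_dask import DaskTaskRunner

# Assumes a cluster with this name is already running, so coiled.Cluster attaches to it
cluster = coiled.Cluster(name="prefect-inc-double-add-sum")

coiled_runner = DaskTaskRunner(
    address=cluster.scheduler_address,
    client_kwargs={"security": cluster.security},
)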
Computing Dask Collections#
In some cases, you may want to compute a Dask collection (e.g. Dask DataFrame) from within a Prefect task. You can do this by creating a Coiled cluster from within a Prefect task. For example:
import coiled
import dask
from prefect import flow, get_run_logger, task

@task
def groupby_timeseries():
    with coiled.Cluster(name="prefect-task") as cluster:
        # connect to Dask client
        cluster.get_client()
        # load dataset
        df = dask.datasets.timeseries(
            "2000", "2005", partition_freq="2w"
        ).persist()
        # perform a groupby with an aggregation
        return df.groupby("name").aggregate(
            {"x": "sum", "y": "max"}
        ).compute()

@flow
def timeseries():
    summary = groupby_timeseries.submit()
    logger = get_run_logger()
    logger.info(summary.result())
We can then run the timeseries flow, and groupby_timeseries will be run with Dask on a Coiled cluster.
timeseries()
You should see an output similar to the following:
16:47:56.388 | INFO | prefect.engine - Created flow run 'incredible-cow' for flow 'timeseries'
16:47:57.551 | INFO | Flow run 'incredible-cow' - Created task run 'groupby_timeseries-0' for task 'groupby_timeseries'
16:47:57.554 | INFO | Flow run 'incredible-cow' - Submitted task run 'groupby_timeseries-0' for execution.
16:49:28.860 | INFO | Task run 'groupby_timeseries-0' - Finished in state Completed()
16:49:28.861 | INFO | Flow run 'incredible-cow' - x y
name
Alice -1396.923700 1.000000
Xavier -333.098275 1.000000
Bob -326.340375 1.000000
Charlie 1153.724366 1.000000
Zelda -1759.322480 0.999998
...
16:49:29.127 | INFO | Flow run 'incredible-cow' - Finished in state Completed('All states completed.')
You’ll also be able to see your Dask computation, including the individual Dask tasks within the Prefect task, in the Coiled web application (see Analytics).

Figure: Dask tasks in the Coiled web UI
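To make the aggregation in groupby_timeseries concrete, here is a standard-library sketch of what grouping by name and taking the sum of x and the maximum of y computes. The rows are made-up illustrative values, not output from dask.datasets.timeseries, and the real computation runs in Dask across partitions rather than in a single loop.

```python
# Made-up rows with the same column names as the timeseries dataset.
rows = [
    {"name": "Alice", "x": 0.5, "y": 0.25},
    {"name": "Bob", "x": -0.25, "y": 0.75},
    {"name": "Alice", "x": 0.25, "y": 0.5},
    {"name": "Bob", "x": 0.5, "y": -0.5},
]

summary = {}
for row in rows:
    # Start each group with a zero sum and a lowest-possible maximum.
    agg = summary.setdefault(row["name"], {"x": 0.0, "y": float("-inf")})
    agg["x"] += row["x"]               # "x": "sum"
    agg["y"] = max(agg["y"], row["y"])  # "y": "max"

print(summary)
```

Each key of summary corresponds to one row of the table logged by the flow, with one column per aggregated field.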
Next steps#
To learn more about Dask and Coiled, you can watch this demo from PrefectLive.