Workflow automation with Prefect#
Prefect is a workflow management system you can use to automate your data pipelines. Prefect integrates closely with Dask, so you can execute workflows in parallel and use Coiled to run those workflows on the cloud.
Note
This example uses Prefect 1.
In this example, we’ll cover two main ways to use Coiled with Prefect:
- Use the LocalExecutor to run Dask computations on a Coiled cluster
- Use the DaskExecutor to execute Prefect tasks in parallel on a Coiled cluster
Before you start#
You’ll first need to install the necessary packages. For the purposes of this example, we’ll do this in a new virtual environment, but you could also install them in whatever environment you’re already using for your project.
$ conda create -n prefect-example -c conda-forge python=3.9 prefect coiled dask
$ conda activate prefect-example
You could also use pip, or any other package manager you prefer; conda isn’t required.
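For example, if you prefer pip, installing the same packages (pinning Prefect to the 1.x API used in this example) might look like:
$ python -m pip install "prefect<2" coiled dask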
When you create a cluster, Coiled will automatically replicate your local prefect-example environment in your cluster (see Package Synchronization).
Terminology#
In this example, you’ll see three key Prefect terms: Flows, Tasks, and Executors.
A Prefect flow is a collection of individual steps, or tasks. A task is similar to a function; it accepts arguments, does something with them, and optionally returns a result. You can create a Prefect task using the @task decorator.
Executors represent the logic for how and where a Flow should run. The default executor is the LocalExecutor.
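To make these terms concrete, here is a minimal sketch (the task and flow names are made up for illustration) that defines one task, builds it into a flow, and runs it with the default LocalExecutor:
from prefect import Flow, task


@task
def say_hello(name):
    """A task is just a decorated function"""
    return f"Hello, {name}!"


# A flow collects tasks into a pipeline
with Flow("hello-example") as flow:
    greeting = say_hello("world")

# Running the flow without specifying an executor uses the LocalExecutor
flow.run()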
Note
For demonstration purposes, these examples use flow.run() to run the Flow. Though this works for simple flows, for running and monitoring many flows, Prefect recommends using Prefect Cloud or Prefect Server (see the Prefect documentation on orchestration).
Using the LocalExecutor#
In this example, you’ll use the default LocalExecutor. This is often the preferred method of scaling Dask computations you have tested locally to the cloud. Prefect will manage running your Flow locally; however, individual tasks relying on Dask will run on the cloud with Coiled. This can be helpful if only some of your tasks are memory- or compute-bound. You can use a Coiled cluster for a subset of your tasks, then proceed with running subsequent tasks locally.

The example below uses Prefect to define three Tasks:
- load_data uses Dask to lazily read in a random timeseries of data from dask.datasets.
- summarize uses Dask to compute aggregations of multiple columns.
- log_summary uses the Prefect logger to log the result of summarize.
import dask
import prefect
from dask.distributed import Client
from prefect import Flow, task

import coiled


@task
def load_data():
    """Load some data"""
    return dask.datasets.timeseries("2000", "2005", partition_freq="2w").persist()


@task
def summarize(df):
    """Compute a summary table"""
    return df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()


@task
def log_summary(df):
    """Log summary result"""
    logger = prefect.context.get("logger")
    logger.info(df)
The three tasks are put together to build a pipeline using the Flow context:
with Flow(name="timeseries") as flow:
    with coiled.Cluster("prefect-example", n_workers=5) as cluster:
        # These tasks rely on a Coiled cluster to run,
        # so you can create them inside the context manager
        client = Client(cluster)
        df = load_data()
        summary = summarize(df)
    # This task doesn't rely on the Coiled cluster to run,
    # so it can be outside the context manager
    log_summary(summary)

# run the flow
flow.run()

# To use with Prefect Cloud or Prefect Server:
# Register the flow under your project
# flow.register(project_name="<project-name>")
The tasks load_data and summarize are computationally expensive, and therefore run from within the Coiled cluster context. The last task, log_summary, does not require any computation, so it is created outside the cluster context. Now, when you run this flow with flow.run(), load_data and summarize will run your Dask computations on the cloud while log_summary will run locally.
Using the DaskExecutor#
With Prefect’s DaskExecutor, you can run an entire Flow on a Coiled cluster. This is helpful if you have “mapped” tasks, where a task is mapped over an iterable input (see the Prefect documentation on mapping).

In this example, you’ll use Prefect to define the following Flow:
from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def inc(x):
    return x + 1


@task
def double(x):
    return x * 2


@task
def add(x, y):
    return x + y


@task
def sum_all(z):
    return sum(z)


with Flow("inc-double-add-sum") as flow:
    a_range = range(10)
    a = inc.map(a_range)
    b = double.map(a_range)
    c = add.map(x=a, y=b)
    total_sum = sum_all(c)
The inc and double tasks are mapped over inputs, in this case, a sequence of integers 0 through 9. Then, add is mapped over the results from inc and double. Lastly, sum_all reduces the sequence of integers to a single value by summing all elements. There is parallelism in these tasks, since inc, double, and add can be evaluated independently.
Now that you have defined the Flow, you can use the DaskExecutor to take advantage of this parallelism:
coiled_executor = DaskExecutor(
    # tell the DaskExecutor to run on Coiled
    cluster_class="coiled.Cluster",
    # Coiled-specific keyword arguments
    cluster_kwargs={
        "n_workers": 5,
        # name of the software environment to use
        "software": "prefect-example",
        # shut down the cluster when the flow finishes;
        # set shutdown_on_close to False to reuse the cluster
        "shutdown_on_close": True,
        # name of the cluster, for easy reference
        "name": "prefect-executor",
    },
)

# run the flow
flow.run(executor=coiled_executor)

# To use with Prefect Cloud or Prefect Server:
# Register the flow under your project
# flow.register(project_name="<project-name>")
By setting the cluster_class argument to “coiled.Cluster”, you are able to use Coiled to run this Flow on the cloud. For demonstration purposes, shutdown_on_close=True; however, in practice you may want to reuse the same cluster across flows (see Reusing clusters).
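If you do want to reuse a cluster, a rough sketch could look like the following (the reusable_executor name is just for illustration, and this assumes Coiled reconnects to an already-running cluster when given the same name):
reusable_executor = DaskExecutor(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 5,
        "software": "prefect-example",
        # keep the cluster running after the flow finishes
        "shutdown_on_close": False,
        # reusing the same name lets later flows connect to the same cluster
        "name": "prefect-executor",
    },
)

flow.run(executor=reusable_executor)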
Key takeaways#
In the first example, you used Prefect and Coiled to automate your data pipeline workflow and run it on the cloud. You used Prefect’s LocalExecutor to manage the client-side interaction, Dask to manage reading in and calculating summary statistics for a large synthetic timeseries dataset, and Coiled to manage deployment to the cloud.
In the second example, you configured Prefect’s DaskExecutor to run the “inc-double-add-sum” Flow on the cloud using Coiled. By using the DaskExecutor, you were able to take advantage of the parallelism in this Flow using a common “map/reduce” framework.
Next Steps#
You can check out this blog post to see how to use Prefect’s ResourceManager to dynamically select whether to run your tasks locally or in the cloud.
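As a rough sketch of that idea (not the blog post’s exact code; the DaskCluster class and on_cloud flag below are made up for illustration), a Prefect 1 resource manager can provide a Dask client backed by either a local cluster or a Coiled cluster and clean it up when the block exits:
import coiled
from dask.distributed import Client, LocalCluster
from prefect import resource_manager


@resource_manager
class DaskCluster:
    """Provide a Dask client backed by a local or Coiled cluster."""

    def __init__(self, on_cloud=False, n_workers=5):
        self.on_cloud = on_cloud
        self.n_workers = n_workers

    def setup(self):
        # choose where the Dask computations should run
        if self.on_cloud:
            cluster = coiled.Cluster("prefect-example", n_workers=self.n_workers)
        else:
            cluster = LocalCluster(n_workers=self.n_workers)
        return Client(cluster)

    def cleanup(self, client):
        # close the client (and its cluster) when the block exits
        client.close()
Inside a Flow context, with DaskCluster(on_cloud=True) as client: would then make the Coiled-backed client available to the tasks created in that block.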
Watch the video tutorial below on using Prefect with Dask and Coiled to see how to take advantage of Prefect’s many features, such as automatically retrying task execution, setting up automatic event notifications via the Slack integration, and monitoring it all with Prefect Cloud.