Workflow automation with Prefect 2
Prefect is a workflow management system you can use to automate your data pipelines. Prefect integrates with Dask through the prefect-dask collection, so you can execute workflows in parallel and use Coiled to run those workflows on the cloud.
In this example, you’ll learn two main ways to use Coiled with Prefect:
1. Let Dask determine when and where to run the computation within each task and use Prefect to manage your flow and launch tasks sequentially. This method is recommended when working with Dask collections (e.g. Dask DataFrame).
2. Execute Prefect tasks with the DaskTaskRunner, using Dask to determine when and where to run all tasks in your flow.
Before you start
1. Create your Python environment
You’ll first need to create consistent local and remote software environments with dask, coiled, prefect, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the tutorial on setting up a custom software environment.

First, you will install coiled-runtime, a Dask meta-package, and prefect.
Save the following file as environment.yml, replacing <x.x.x> with the versions you would like to use. You can get the most up-to-date version of coiled-runtime from the latest tag.
channels:
- conda-forge
dependencies:
- coiled-runtime=<x.x.x>
- python=3.9
- pip
- pip:
- prefect>=2.0
- prefect-dask>=0.2.1
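If you’re not sure which coiled-runtime versions are available, one way to check (assuming you have conda installed) is to query the conda-forge channel:

$ conda search -c conda-forge coiled-runtime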
Next, create a local environment using the environment.yml file:
$ conda env create -f environment.yml -n prefect2-example
$ conda activate prefect2-example
Lastly, create a remote software environment using the environment.yml file:
$ coiled env create -n prefect2-example --conda environment.yml
2. Set up Prefect Cloud
Coiled works best when used with Prefect Cloud, a coordination-as-a-service platform. You can follow these instructions from Prefect on how to get started for free.
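For example, after creating an API key in the Prefect Cloud UI, you can authenticate your local environment from the command line (a minimal sketch; prompts may differ between Prefect versions):

$ prefect cloud login --key <your-api-key>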
Terminology
In this example, you’ll see three key Prefect terms: flows, tasks, and task runners. A flow is a collection of individual steps, or tasks. A task is similar to a function; it accepts arguments, does something with them, and optionally returns a result. Task runners are optional and allow you to choose whether Prefect executes tasks sequentially, concurrently, or in parallel.
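To make these terms concrete, here is a minimal sketch of all three pieces together (the names greet and greeting_flow are illustrative and not part of the examples below):

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def greet(name):
    # A task: a function-like unit of work
    return f"Hello, {name}!"

@flow(task_runner=SequentialTaskRunner())
def greeting_flow():
    # A flow: a collection of tasks, here executed sequentially
    print(greet("world"))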
Working with Dask collections
In this example, you’ll perform some common data manipulation operations on a Dask DataFrame and log a summary result (download the complete example).

You’ll use Prefect to define three tasks with the @task decorator:

- load_data uses Dask to lazily read in a random timeseries of data from dask.datasets.
- summarize uses Dask to compute aggregations of multiple columns.
- log_summary uses the Prefect logger to log the result of summarize.
import dask
from dask.distributed import Client
from prefect import flow, get_run_logger, task

import coiled

@task
def load_data():
    """Load some data"""
    return dask.datasets.timeseries("2000", "2005", partition_freq="2w").persist()

@task
def summarize(df):
    """Compute a summary table"""
    return df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()

@task
def log_summary(df):
    """Log summary result"""
    logger = get_run_logger()
    logger.info(df)
The three tasks are put together to build a flow:
@flow
def timeseries_flow():
    df = load_data()
    summary_df = summarize(df)
    log_summary(summary_df)
Now you can run your flow within the Coiled cluster context:
if __name__ == "__main__":
    with coiled.Cluster(
        name="prefect-timeseries", n_workers=5, software="prefect2-example"
    ) as cluster:
        # Connect Dask to the Coiled cluster, then run the flow
        with Client(cluster):
            timeseries_flow()
You’ve just used Prefect to execute each task in your flow, letting Dask determine the best way to perform the individual computations within each task, i.e. reading in a dataset and calculating summary statistics. You used Coiled to manage the deployment of all of this in your cloud provider account.
Executing Prefect tasks with Dask

With Prefect’s DaskTaskRunner, you can use the dask.distributed scheduler to decide where and when to run your Prefect tasks (download the complete example).

In this example, you’ll define the following tasks with the @task decorator:
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def inc(x):
    return x + 1

@task
def double(x):
    return x * 2

@task
def add(x, y):
    return x + y

@task
def sum_all(z):
    return sum(z)
And combine these tasks into a flow with the @flow decorator:
@flow(task_runner=coiled_runner)
def inc_double_add_sum(a_range):
    a = inc.map(a_range)
    b = double.map(a_range)
    c = add.map(x=a, y=b)
    return sum_all(c)
The inc and double tasks are mapped over a_range, in this case a sequence of integers 0 through 9. Then, add is mapped over the results from inc and double. Lastly, sum_all reduces the sequence of integers to a single value by summing all elements. There is parallelism in these tasks, since inc, double, and add can be evaluated independently.
Now that you have defined the flow, you can use the DaskTaskRunner with Coiled to run this flow in the cloud:
coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 5,
        "software": "prefect2-example",
        "name": "prefect-dask-runner",
    },
)
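With the runner defined, a minimal way to run the flow over the integers 0 through 9 (assuming the tasks, flow, and runner above are in the same script) would be:

if __name__ == "__main__":
    inc_double_add_sum(range(10))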
You can also choose to connect to an already existing cluster by using the same Coiled cluster name (see our page on Reusing clusters) or by using the scheduler address. For example:
import coiled
from prefect_dask import DaskTaskRunner

cluster = coiled.Cluster(...)

task_runner = DaskTaskRunner(
    address=cluster.scheduler_address, client_kwargs={"security": cluster.security}
)
Computing Dask collections
Note
Dask and Prefect both use the term “task” to refer to the concept of work to be done. In this explanation, it’s helpful to conceptualize a Prefect task as containing individual Dask tasks.
In some cases, you may want to use Prefect’s DaskTaskRunner for tasks containing Dask collections. You can do this by submitting Dask tasks to the Dask worker with get_dask_client or get_async_dask_client. See the Prefect documentation on distributing Dask collections across workers for more information. For example (download the complete example):
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def compute_task():
    with get_dask_client() as client:  # noqa
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df

@flow(
    task_runner=DaskTaskRunner(
        cluster_class="coiled.Cluster",
        cluster_kwargs={
            "name": "prefect-dask-collections",  # placeholder cluster name; choose your own
            "n_workers": 4,
            "software": "prefect2-example",
        },
    )
)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()

if __name__ == "__main__":
    dask_flow()
It’s worth noting that get_dask_client is a utility function around distributed.worker_client() with separate_thread=False. It will create a Dask client on a worker and allow you to distribute that work across all workers in your cluster. There is some overhead to using get_dask_client, so it is better suited to longer-running tasks (see the Dask documentation on submitting tasks from a worker).
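If your tasks are defined as coroutines, get_async_dask_client works analogously; here is a minimal sketch (the task name compute_task_async is illustrative), where the computation is awaited through the client:

import dask
from prefect import task
from prefect_dask import get_async_dask_client

@task
async def compute_task_async():
    async with get_async_dask_client() as client:
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        # With the async client, submit the computation and await the result
        summary_df = await client.compute(df.describe())
    return summary_df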
Next steps

Coiled and Prefect are working together to create additional resources for using Prefect 2. See this GitHub issue to follow our progress.