Workflow automation with Prefect#

Prefect is a workflow management system you can use to automate your data pipelines. Prefect is built on top of Dask, so you can execute workflows in parallel and use Coiled to execute those workflows on the cloud.

Note

This example uses Prefect 1, if you’re looking for an example using Prefect 2, see our Prefect 2 example. You might also find Prefect’s migration guide helpful.

In this example, we’ll cover two main ways to use Coiled with Prefect:

  1. Use the LocalExecutor to run Dask computations on a Coiled cluster

  2. Use the DaskExecutor to execute Prefect tasks in parallel on a Coiled cluster

Before you start#

You’ll first need to create consistent local and remote software environments with dask, coiled, prefect, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the tutorial on setting up a custom software environment.

First, you will install coiled-runtime, a Dask meta-package, and prefect. Save the following file as environment.yml, replacing <x.x.x> with the versions you would like to use. You can get the most up-to-date version of coiled-runtime from the latest tag in the public coiled-runtime repository.

channels:
  - conda-forge
dependencies:
  - prefect=<x.x.x>
  - coiled-runtime=<x.x.x>
  - python=3.9

Next, create a local software environment using the environment.yml file:

$ conda env create -f environment.yml -n prefect-example
$ conda activate prefect-example

Lastly, create a remote software environment using the environment.yml file:

$ coiled env create -n prefect-example --conda environment.yml

Terminology#

In this example, you’ll see three key Prefect terms: Flows, Tasks, Executors. A Prefect flow is a collection of individual steps, or tasks. A task is similar to a function; it accepts arguments, does something with them, and optionally returns a result. You can create a Prefect task using the @task decorator. Executors represent the logic for how and where a Flow should run. The default executor is the LocalExecutor.

Note

For demonstration purposes, these examples use flow.run() to run the Flow. Though this works for simple flows, for running and monitoring many flows, Prefect recommends using Prefect Cloud or Prefect Server (see the Prefect documentation on orchestration).

Using the LocalExecutor#

In this example, you’ll use the default LocalExecutor. This is often the preferred method of scaling Dask computations you have tested locally to the cloud. Prefect will manage running your Flow locally, however, individual tasks relying on Dask will be run on the cloud with Coiled. This can be helpful if only some of your tasks are memory- or compute- bound. You can use a Coiled cluster for a subset of your tasks, then proceed with running subsequent tasks locally.

Conceptual diagram using Coiled to run a Dask cluster selectively for Prefect tasks.

The example below uses Prefect to define three Tasks:

  1. load_data uses Dask to lazily read in a random timeseries of data from dask.datasets.

  2. summarize uses Dask to compute aggregations of multiple columns.

  3. log_summary uses the Prefect logger to log the result of summarize.

import coiled
import dask
import prefect
from dask.distributed import Client
from prefect import Flow, task


@task
def load_data():
    """Load some data"""
    return dask.datasets.timeseries("2000", "2005", partition_freq="2w").persist()


@task
def summarize(df):
    """Compute a summary table"""
    return df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()


@task
def log_summary(df):
    """Log summary result"""
    logger = prefect.context.get("logger")
    logger.info(df)

The three tasks are put together to build a pipeline using the Flow context:

with Flow(name="timeseries") as flow:
    with coiled.Cluster(
        "prefect-example", n_workers=5, software="prefect-example"
    ) as cluster:
        # These tasks rely on a Coiled cluster to run,
        # so you can create them inside the context manager
        client = Client(cluster)
        df = load_data()
        summary = summarize(df)
    # This task doesn't rely on the Coiled cluster to run
    # so it can be outside the context manager
    log_summary(summary)

# run the flow
flow.run()

# To use with Prefect Cloud or Prefect Server:
# Register the flow under your project
# flow.register(project_name="<project-name>")

The tasks load_data and summarize are computationally expensive, and therefore run from within the Coiled cluster context. The last task log_summary does not require any computation, therefore it is created outside the cluster context.

Now, when you run this flow with flow.run(), load_data and summarize will run your Dask computations the cloud while log_summary will run locally. Click here to download the complete example.

Using the DaskExecutor#

With Prefect’s DaskExecutor, you can run an entire Flow on a Coiled cluster. This is helpful if you have “mapped” tasks, where a task is mapped over an iterable input (see the Prefect documentation on mapping).

Conceptual diagram using Coiled to run an entire Prefect flow.

In this example, you’ll use Prefect to define the following Flow:

from prefect import Flow, task
from prefect.executors import DaskExecutor


@task
def inc(x):
    return x + 1


@task
def double(x):
    return x * 2


@task
def add(x, y):
    return x + y


@task
def sum_all(z):
    return sum(z)


with Flow("inc-double-add-sum") as flow:
    a_range = range(10)
    a = inc.map(a_range)
    b = double.map(a_range)
    c = add.map(x=a, y=b)
    total_sum = sum_all(c)

The inc and double tasks are mapped over inputs, in this case, a sequence of integers 0 through 9. Then, add is mapped over the results from inc and double. Lastly sum_all reduces the sequence of integers to a single value by summing all elements. There is parallelism in these tasks, since inc, double, and add can be evaluated independently.

Now that you have defined the Flow, you can use the DaskExecutor to take advantage of this parallelism:

coiled_executor = DaskExecutor(
    # tell the DaskExecutor to run on Coiled
    cluster_class="coiled.Cluster",
    # Coiled-specific keyword arguments
    cluster_kwargs={
        "n_workers": 5,
        # replace "account-name" with your account name
        "software": "prefect-example",
        # set shutdown_on_close to False to re use the cluster
        "shutdown_on_close": True,
        # name of the cluster, for easy reference
        "name": "prefect-executor",
    },
)

# run the flow
flow.run(executor=coiled_executor)

# To use with Prefect Cloud or Prefect Server:
# Register the flow under your project
# flow.register(project_name="<project-name>")

By setting the cluster_class argument to use “coiled.Cluster”, you are able to use Coiled to run this Flow on the cloud. For demonstration purposes, shutdown_on_close=True, however, in practice you may want to reuse the same cluster across flows (see Reusing clusters). Click here to download the complete example.

Key takeaways#

In the first example, you used Prefect and Coiled to automate your data pipeline workflow and run it on the cloud. You used Prefect’s LocalExecutor to manage the client-side interaction. You used Dask to manage reading in and calculating some summary statistics for a large Parquet dataset of ~84 million rows and Coiled to manage deployment to the cloud.

In the second example, you configured Prefect’s DaskExecutor to run the “inc-double-add-sum” Flow on the cloud using Coiled. By using the DaskExecutor, you were able to take advantage of the parallelism in this Flow using a common “map/reduce” framework.

Next Steps#

You can check out this blog post to see how to use Prefect’s ResourceManager to dynamically select whether to run your tasks locally or in the cloud.

Watch the video tutorial below on using Prefect with Dask and Coiled to see how to take advantage of Prefect’s many features, such as automatically retrying task execution, setting up automatic event notifications via the Slack integration, and monitoring it all with Prefect Cloud.