Workflow automation with Prefect 2#

Prefect is a workflow management system you can use to automate your data pipelines. Prefect integrates with Dask to execute workflows in parallel, and you can use Coiled to run those workflows on the cloud.

In this example, you’ll learn two main ways to use Coiled with Prefect:

  1. Let Dask determine when and where to run the computation within each task and use Prefect to manage your flow and launch tasks sequentially. This method is recommended when working with Dask collections (e.g. Dask DataFrame).

  2. Execute Prefect tasks with the DaskTaskRunner, using Dask to determine when and where to run all tasks in your flow.

Before you start#

1. Create your Python environment#

You’ll first need to create consistent local and remote software environments with dask, coiled, prefect, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the tutorial on setting up a custom software environment.

First, you will install coiled-runtime, a Dask meta-package, and prefect. Save the following file as environment.yml, replacing <x.x.x> with the version you would like to use. You can get the most up-to-date version of coiled-runtime from the latest tag.

channels:
  - conda-forge
dependencies:
  - coiled-runtime=<x.x.x>
  - python=3.9
  - pip
  - pip:
    - prefect>=2.0
    - prefect-dask>=0.2.1

Next, create a local environment using the environment.yml file:

$ conda env create -f environment.yml -n prefect2-example
$ conda activate prefect2-example

Lastly, create a remote software environment using the environment.yml file:

$ coiled env create -n prefect2-example --conda environment.yml

2. Set up Prefect Cloud#

Coiled works best when used with Prefect Cloud, a coordination-as-a-service platform. You can follow these instructions from Prefect on how to get started for free.
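
For example, one way to authenticate your local environment from the command line (assuming you have already created an API key in the Prefect Cloud UI):

$ prefect cloud login --key <your-api-key>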

Terminology#

In this example, you’ll see three key Prefect terms: flows, tasks, and task runners. A flow is a collection of individual steps, or tasks. A task is similar to a function; it accepts arguments, does something with them, and optionally returns a result. Task runners are optional and allow you to choose whether Prefect executes tasks sequentially, concurrently, or in parallel.
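
As a minimal sketch of how these pieces fit together (the task and flow names here are illustrative, not part of the examples below):

from prefect import flow, task


@task
def say_hello(name):
    """A task is a function wrapped with @task"""
    return f"Hello, {name}!"


@flow
def greeting_flow():
    """A flow groups tasks; calling a task directly runs it and returns its result"""
    print(say_hello("world"))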

Working with Dask collections#

In this example, you’ll perform some common data manipulation operations on a Dask DataFrame and log a summary result (download the complete example).

You’ll use Prefect to define three tasks with the @task decorator:

  1. load_data uses Dask to lazily read in a random timeseries of data from dask.datasets.

  2. summarize uses Dask to compute aggregations of multiple columns.

  3. log_summary uses the Prefect logger to log the result of summarize.

import dask
from dask.distributed import Client
from prefect import flow, get_run_logger, task

import coiled


@task
def load_data():
    """Load some data"""
    return dask.datasets.timeseries("2000", "2005", partition_freq="2w").persist()


@task
def summarize(df):
    """Compute a summary table"""
    return df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()


@task
def log_summary(df):
    """Log summary result"""
    logger = get_run_logger()
    logger.info(df)

The three tasks are put together to build a flow:



@flow
def timeseries_flow():
    df = load_data()
    summary_df = summarize(df)
    log_summary(summary_df)

Now you can run your flow within the Coiled cluster context:



if __name__ == "__main__":

    with coiled.Cluster(
        name="prefect-timeseries", n_workers=5, software="prefect2-example"
    ) as cluster:
        # Connect Dask to the Coiled cluster, then run the flow on it
        with Client(cluster):
            timeseries_flow()

You’ve just used Prefect to execute each task in your flow, while letting Dask determine the best way to perform the individual computations within each task, i.e. reading in a dataset and calculating summary statistics. You used Coiled to manage deploying all of this in your cloud provider account.

Executing Prefect tasks with Dask#

With Prefect’s DaskTaskRunner, you can use the dask.distributed scheduler to decide where and when to run your Prefect tasks (download the complete example).

In this example, you’ll define the following tasks with the @task decorator:


@task
def inc(x):
    return x + 1


@task
def double(x):
    return x * 2


@task
def add(x, y):
    return x + y


@task
def sum_all(z):
    return sum(z)

And combine these tasks into a flow with the @flow decorator, passing the coiled_runner task runner (defined in the next step):


@flow(task_runner=coiled_runner)
def inc_double_add_sum(a_range):
    a = inc.map(a_range)
    b = double.map(a_range)
    c = add.map(x=a, y=b)
    return sum_all(c)

The inc and double tasks are mapped over a_range, in this case a sequence of integers 0 through 9. Then, add is mapped over the results from inc and double. Lastly, sum_all reduces the sequence of integers to a single value by summing all the elements. There is parallelism in these tasks: inc and double can be evaluated independently, and each mapped add call depends only on its corresponding inc and double results.

Now that you have defined the flow, you can use the DaskTaskRunner with Coiled to run this flow in the cloud:


from prefect_dask import DaskTaskRunner

coiled_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "n_workers": 5,
        "software": "prefect2-example",
        "name": "prefect-dask-runner",
    },
)
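
With the flow and task runner defined, a minimal run might look like this (a sketch; list(range(10)) supplies the integers 0 through 9 described above):


if __name__ == "__main__":
    # inc and double map over the input in parallel on the Coiled cluster
    inc_double_add_sum(list(range(10)))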

You can also connect to an existing cluster, either by reusing the same Coiled cluster name (see our page on Reusing clusters) or by passing the scheduler address. For example, connecting by address:

import coiled
from prefect_dask import DaskTaskRunner

cluster = coiled.Cluster(...)

task_runner = DaskTaskRunner(
    address=cluster.scheduler_address, client_kwargs={"security": cluster.security}
)
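
Alternatively, here is a sketch of reusing a running cluster by name (assuming the "prefect-dask-runner" cluster from above is still running; shutdown_on_close=False is a coiled.Cluster option that keeps the cluster alive after the flow finishes):

task_runner = DaskTaskRunner(
    cluster_class="coiled.Cluster",
    cluster_kwargs={
        "name": "prefect-dask-runner",  # same name reattaches to the running cluster
        "shutdown_on_close": False,  # keep the cluster running for later reuse
    },
)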

Computing Dask collections#

Note

Dask and Prefect both use the term “task” to refer to the concept of work to be done. In this explanation, it’s helpful to conceptualize a Prefect task as containing individual Dask tasks.

In some cases, you may want to use Prefect’s DaskTaskRunner for tasks that themselves operate on Dask collections. You can do this by opening a Dask client on the worker with get_dask_client or get_async_dask_client and computing the collection from within the task. See the Prefect documentation on distributing Dask collections across workers for more information. For example (download the complete example):

import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client


@task
def compute_task():
    with get_dask_client() as client:  # noqa
        df = dask.datasets.timeseries("2000", "2001", partition_freq="4w")
        summary_df = df.describe().compute()
    return summary_df


@flow(
    task_runner=DaskTaskRunner(
        cluster_class="coiled.Cluster",
        cluster_kwargs={
            "name": "",
            "n_workers": 4,
            "software": "prefect2-example",
        },
    )
)
def dask_flow():
    prefect_future = compute_task.submit()
    return prefect_future.result()


if __name__ == "__main__":
    dask_flow()

It’s worth noting that get_dask_client is a utility function wrapping distributed.worker_client() with separate_thread=False. It creates a Dask client on the worker, allowing you to distribute work across all the workers in your cluster. There is some overhead to using get_dask_client, so it is best suited to longer-running tasks (see the Dask documentation on submitting tasks from a worker).
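
For comparison, here is a rough sketch of the equivalent pattern using distributed.worker_client directly (illustrative only; get_dask_client also handles Prefect-specific setup for you):


from distributed import worker_client


def compute_inside_task():
    # Only valid within a task already running on a Dask worker
    with worker_client(separate_thread=False) as client:
        ...  # submit more work or compute Dask collections here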

Next steps#

Coiled and Prefect are working together to create additional resources for using Prefect 2. See this GitHub issue to follow our progress.