Workflow automation (Prefect)

Prefect automates data engineering workflows, and Coiled helps scale Prefect on cloud resources. The two are easy to use together.

Prefect in a nutshell

Prefect is a popular workflow management system. At its core, Prefect has three concepts: the Task, the Flow, and the Executor (see the minimal sketch after the list below):

  • A Task is an individual step in a Prefect workflow

  • A Flow is a collection of tasks that represent the entire workflow

  • An Executor is a class that’s responsible for running a Flow

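To make these concepts concrete, here is a minimal sketch of a one-task flow. It uses the same Prefect API as the examples on this page; note that executor import paths vary between Prefect versions:

import prefect
from prefect.engine.executors import LocalExecutor


# A Task: one step in the workflow
@prefect.task
def add(x, y):
    return x + y


# A Flow: the collection of tasks that make up the workflow
with prefect.Flow(name="minimal-example") as flow:
    total = add(1, 2)

# An Executor: responsible for actually running the Flow's tasks
flow.run(executor=LocalExecutor())
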
In the following sections, we’ll explore different ways that Coiled and Prefect can work together. At a more granular level, we’ll call Coiled from within one or more Prefect tasks. At a higher level, we’ll configure Prefect’s Dask executor to run all of the tasks in a flow on a Coiled cluster.

Using Coiled with Prefect

There are different ways of using Coiled with Prefect. Two common methods are:

  • Calling Coiled from within a Prefect task
    • Prefect tasks execute on existing compute resources until Coiled is needed

    • A Coiled cluster runs only when needed

  • Running all Prefect Tasks on Coiled
    • Prefect tasks execute on worker nodes in a Coiled cluster

    • All Prefect tasks within a flow run on a Coiled cluster

We’ll explore these two methods in more detail in the following sections.

Calling Coiled from a Prefect Task

If you only need Coiled for a few large distributed computations, you can create a Coiled cluster and run those computations from within a Prefect task, then proceed to subsequent Prefect tasks without it.

[Diagram: calling Coiled from within a Prefect task (coiled-prefect-task.png)]

The example below defines two Prefect tasks: one task that uses Coiled to handle a large computation, and a second task that takes the result from the first and performs a simpler computation.

import coiled
import dask.dataframe as dd
import prefect
from dask.distributed import Client


# Prefect task that uses Coiled for a large computation
@prefect.task
def transform():
    # Create and connect to Coiled cluster
    cluster = coiled.Cluster(n_workers=10, name="prefect-task")
    client = Client(cluster)
    print("Dashboard:", client.dashboard_link)

    # Read CSV data from S3
    df = dd.read_csv(
        "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
        dtype={
            "payment_type": "UInt8",
            "VendorID": "UInt8",
            "passenger_count": "UInt8",
            "RatecodeID": "UInt8",
        },
        storage_options={"anon": True},
        blocksize="16 MiB",
    ).persist()

    # Compute result
    result = df.groupby("passenger_count").tip_amount.mean().compute()
    return result


# Prefect task without Coiled
@prefect.task
def clean(series):
    # Filter out rides with zero passengers
    series = series[series.index > 0]
    # Round tip amount to 2 decimal places
    series = round(series, 2)
    return series


with prefect.Flow(
    name="NYC Taxi Riders Tip Amounts",
) as flow:
    series = transform()
    cleaned = clean(series)

flow.run()

Coiled comes into play in these lines:

import coiled
import dask.dataframe as dd
from dask.distributed import Client

[...]

cluster = coiled.Cluster(n_workers=10, name="prefect-task")
client = Client(cluster)

df = dd.read_csv([...]).persist()

[...]

result = df.groupby("passenger_count").tip_amount.mean().compute()

where we create a Coiled cluster, run a large computation, and then continue on to subsequent Prefect tasks without Coiled. This lets you scale up the resources available to specific Prefect tasks without provisioning a cluster for the entire workflow.
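
As a variation, you can use the cluster and client as context managers so that both are cleaned up even if the computation inside the task raises. This is a sketch, assuming your version of coiled.Cluster supports the context-manager protocol (Dask cluster and client objects generally do):

import coiled
import prefect
from dask.distributed import Client


@prefect.task
def transform():
    # The context managers close the client and shut down the cluster
    # when the block exits, even if the computation fails
    with coiled.Cluster(n_workers=10, name="prefect-task") as cluster:
        with Client(cluster) as client:
            ...  # run your Dask computations here and return the result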

Running all Prefect Tasks on Coiled

Using Prefect’s DaskExecutor, you can run all of the tasks in a flow on a Dask cluster, including a Coiled cluster.

[Diagram: running all Prefect tasks on a Coiled cluster (coiled-prefect-executor.png)]

The example below uses Prefect and Coiled to read NYC Taxi data and find some of the most generous tippers in the historical record. It does this by reading a CSV file on Amazon S3, breaking it into many small pieces (one DataFrame for each hour of data), cleaning the data, and then finding and logging the rows with exceptional tip values.

We highlight a few of the features that Prefect provides:

  1. We intentionally add some random failures into the cleaning process to show off Prefect’s automatic retry logic

  2. We return lists of objects to show how Prefect can map over collections of outputs

  3. At the end of the computation, we send a report to a public Slack channel (sign up for the Coiled Community Slack and then navigate to the prefect-example channel to see results)

  4. We scale easily by connecting to a Dask cluster, in this case provided by Coiled

import datetime
import random

import pandas as pd
import prefect
import s3fs
from prefect.engine.executors import DaskExecutor
from prefect.utilities.notifications import slack_notifier

# Input parameter for CSV filename on Amazon S3
filename = prefect.Parameter(
    name="filename",
    default="s3://nyc-tlc/trip data/yellow_tripdata_2020-01.csv",
)

# Setup for Slack notifications
handler = slack_notifier(
    backend_info=False,
    only_states=[prefect.engine.state.Success, prefect.engine.state.Failed],
)

prefect.context.secrets = {
    "SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/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"
}


# Our normal Prefect flow
@prefect.task
def download(filename):
    # Read a sample of taxi data from S3 and emit one chunk per hour of data
    s3 = s3fs.S3FileSystem(anon=True)
    with s3.open(filename) as f:
        df = pd.read_csv(f, nrows=10_000)
    times = pd.DatetimeIndex(df.tpep_pickup_datetime)
    return [chunk for _, chunk in df.groupby([times.hour, times.date])]


@prefect.task(max_retries=5, retry_delay=datetime.timedelta(seconds=2))
def clean(df):
    # Simulate a flaky step to demonstrate Prefect's automatic retries
    if random.random() < 0.2:
        raise Exception("Random failure")

    # Keep only the rides where a tip was given
    df = df[df.tip_amount > 0]
    return df


@prefect.task
def best(df):
    if not df.empty:
        # Find the ride with the largest tip relative to the total fare
        best_tip_idx = (df.tip_amount / df.total_amount).argmax()
        row = df.iloc[best_tip_idx]
        prefect.context.logger.info("Best row: %s", row)
        return row


with prefect.Flow(
    name="Most Generous NYC Taxi Riders",
    state_handlers=[handler],
) as flow:
    dataframes = download(filename)
    cleaned = clean.map(dataframes)
    best.map(cleaned)


# This section connects Prefect to Coiled
# Comment this out in order to run locally
import coiled

executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "jrbourbeau/prefect",
        "shutdown_on_close": False,
        "name": "prefect-executor",
    },
)

flow.run(
    executor=executor,
)

Coiled comes into play in these lines:

import coiled

executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "jrbourbeau/prefect",
        "shutdown_on_close": False,
        "name": "prefect-executor",
    },
)

where we set up a DaskExecutor so Prefect can run our workflow on a coiled.Cluster created with the provided arguments. This lets us easily scale the resources available to Prefect when running our workflow.

Best practices

Software environments

You will want to make sure that the Coiled cluster running in the cloud has Prefect installed, along with any other libraries needed to complete the work, such as pandas or s3fs in this case. For more information, see the documentation on constructing software environments.
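
For example, you could build an environment like the jrbourbeau/prefect one used above with coiled.create_software_environment. This is a sketch; the environment name and package list are illustrative:

import coiled

# Build a software environment containing Prefect, Dask, and the
# other libraries this example needs (names and pins are illustrative)
coiled.create_software_environment(
    name="prefect",
    pip=["prefect", "dask[distributed]", "pandas", "s3fs"],
)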

Reusing clusters

It’s also common to run several flows one after the other. For example, we may want to run this same flow on many months of data, rather than just the one file used in this example.

In this case, spinning up a new Coiled cluster for every flow may be cumbersome (it takes about a minute to start a cluster). Instead, we recommend using the shutdown_on_close=False keyword, along with a specific name for your cluster, like prefect-task or production. This tells Coiled that you want to reuse a specific cluster across many different workflows, as in the sketch below.
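
For instance, a flow that runs repeatedly might attach to a long-lived cluster like this (the name production is just an example):

import coiled

# The first call creates the cluster; later calls with the same name
# reconnect to the running cluster instead of starting a new one
cluster = coiled.Cluster(
    n_workers=10,
    name="production",
    shutdown_on_close=False,
)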

Idle timeouts

By default, Coiled will automatically shut down your cluster after 20 minutes of inactivity. You can run your Prefect flow again after this period, but you will need to wait for a new Coiled cluster to be provisioned.

Alternatively, you can specify a custom idle timeout when creating a cluster. Refer to the documentation on customizing clusters for more information on how to set idle timeouts.
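
As a sketch, one way to pass a longer idle timeout through to the Dask scheduler is the scheduler_options keyword; the exact option names supported may vary between Coiled versions, so check the customizing clusters documentation:

import coiled

# Keep the cluster alive through longer gaps between flow runs
# (assumes scheduler_options with idle_timeout is supported)
cluster = coiled.Cluster(
    n_workers=10,
    scheduler_options={"idle_timeout": "2 hours"},
)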