Prefect for Workflow Automation

Prefect automates data engineering workflows. Coiled helps scale Prefect on cloud resources. It’s easy to use both together.

Prefect in a nutshell

Prefect is a popular workflow management system. At its core, Prefect has the concept of a Task, a Flow, and an Executor:

  • A Task is an individual step in a Prefect workflow

  • A Flow is a collection of tasks that represent the entire workflow

  • An Executor is a class that’s responsible for running a Flow

Using Prefect’s DaskExecutor you can run workflows on a Dask cluster, including Coiled clusters.
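
For instance, a minimal flow with a single task, run on a temporary local Dask cluster, might look like the following sketch (the inc task here is purely illustrative and uses the same Prefect 0.x imports as the full example below):

import prefect
from prefect.engine.executors import DaskExecutor

@prefect.task
def inc(x):
    # A Task: one individual step in the workflow
    return x + 1

with prefect.Flow(name="example-flow") as flow:
    # A Flow: collects tasks into a complete workflow
    result = inc(1)

# An Executor: runs the flow, here on a temporary local Dask cluster
flow.run(executor=DaskExecutor())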

Prefect + Coiled

The example below uses Prefect and Coiled to read NYC Taxi data and find some of the most generous tippers in historical data. It does this by reading a CSV file on Amazon S3, breaking it into many small pieces (one DataFrame for each hour of data), cleaning the data, and then finding and logging rows with exceptionally generous tips.

We highlight a few of the features that Prefect provides:

  1. We intentionally add some random failures into the cleaning process to show off how Prefect can provide automatic retry logic

  2. We return lists of objects to show how Prefect can map over collections of outputs

  3. At the end of the computation we send a report to a public Slack channel (sign up for the Coiled Community Slack and then navigate to the prefect-example channel to see results)

  4. We scale out easily by connecting to a Dask cluster, in this case provided by Coiled.

import datetime
import random

import pandas as pd
import prefect
import s3fs
from prefect.engine.executors import DaskExecutor
from prefect.utilities.notifications import slack_notifier

# Input parameter for CSV filename on Amazon S3
filename = prefect.Parameter(
    name="filename",
    default="s3://nyc-tlc/trip data/yellow_tripdata_2020-01.csv",
)

# Setup for Slack notifications
handler = slack_notifier(
    backend_info=False,
    only_states=[prefect.engine.state.Success, prefect.engine.state.Failed],
)

prefect.context.secrets = {
    "SLACK_WEBHOOK_URL": "https://hooks.slack.com/services/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"
}


# Our normal Prefect flow
@prefect.task
def download(filename):
    # Read taxi data from S3 and emit chunks for each hour of data
    s3 = s3fs.S3FileSystem(anon=True)
    with s3.open(filename) as f:
        df = pd.read_csv(f, nrows=10_000)
    times = pd.DatetimeIndex(df.tpep_pickup_datetime)
    return [chunk for _, chunk in df.groupby([times.hour, times.date])]


@prefect.task(max_retries=5, retry_delay=datetime.timedelta(seconds=2))
def clean(df):
    # Fail randomly to demonstrate Prefect's automatic retry logic
    if random.random() < 0.2:
        raise Exception("Random failure")

    # Keep only the rides where a tip was given
    df = df[df.tip_amount > 0]
    return df


@prefect.task
def best(df):
    # Find and log the row with the largest tip as a fraction of the total fare
    if not df.empty:
        best_tip_idx = (df.tip_amount / df.total_amount).argmax()
        row = df.iloc[best_tip_idx]
        prefect.context.logger.info("Best row: %s", row)
        return row

with prefect.Flow(
    name="Most Generous NYC Taxi Riders",
    state_handlers=[handler],
) as flow:
    # Download the data, then map the clean and best tasks over the hourly chunks
    dataframes = download(filename)
    cleaned = clean.map(dataframes)
    best.map(cleaned)


# This section connects Prefect to Coiled
# Comment this out in order to run locally
import coiled

executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "jrbourbeau/prefect",
        "shutdown_on_close": False,
        "name": "prefect-play",
    },
)

flow.run(
    executor=executor,
)


How Coiled helps

Coiled comes into play in these lines:

import coiled

executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "jrbourbeau/prefect",
        "shutdown_on_close": False,
        "name": "prefect-play",
    },
)

where we set up a DaskExecutor so Prefect can run our workflow on a coiled.Cluster with the provided arguments. This lets us easily scale the resources available to Prefect for running our workflow.

Best practices

Software environments

You will want to make sure that the Coiled cluster running on the cloud has Prefect installed, along with any other libraries you need to complete the work, like pandas or s3fs in this case. For more information, see the documentation on constructing software environments.
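
As a rough sketch, a suitable software environment might be created like this (the environment name and package list here are assumptions for illustration, not requirements of this example):

import coiled

# Hypothetical software environment containing the packages this flow needs
coiled.create_software_environment(
    name="prefect-example",
    conda={
        "channels": ["conda-forge"],
        "dependencies": ["python=3.8", "prefect", "dask", "pandas", "s3fs"],
    },
)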

Reusing clusters

It’s also common to run several flows one after the other. For example, we may want to run this same flow on many months of data, rather than just the one file in this example.

In this case, spinning up a new Coiled cluster for every flow may be cumbersome (it takes about a minute to start a cluster). Instead, we recommend using the shutdown_on_close=False keyword, along with a specific name for your cluster, like "prefect-play" or "production". This tells Coiled that you want to reuse a specific cluster for many different workflows.
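
For example, with the named, persistent cluster from the executor defined above, you could run the same flow over several months of data while reusing one cluster (a sketch; the month list and filenames are illustrative):

# Reuse the named cluster configured in the executor above for several flow runs
for month in ["2020-01", "2020-02", "2020-03"]:
    flow.run(
        executor=executor,
        parameters={
            "filename": f"s3://nyc-tlc/trip data/yellow_tripdata_{month}.csv"
        },
    )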

By default, Coiled will automatically shut down your cluster after twenty minutes of inactivity. If you run your flow again after this period, that's okay; you will just have to wait through the one-minute cluster startup time again.