Prefect#

Deploy Prefect workflows on the cloud

Prefect is a popular workflow management system that makes it straightforward to run production jobs, and it integrates easily with Coiled.

Example#

This workflow finds all new files in an S3 bucket and processes them in parallel every day.

# workflow.py
import datetime
from typing import List

import coiled, prefect, s3fs

# Step 1: Find new data files to be processed.
@prefect.task
def list_new_files() -> List[str]:
    return s3fs.S3FileSystem().ls("my-bucket")

# Step 2: Define function to process file.  Specify hardware for function.
@prefect.task
@coiled.function(memory="64 GiB", region="us-east-2")
def process_file(file):
    return ...

# Step 3: Define workflow to process all new files.
@prefect.flow
def workflow():
    files = list_new_files()
    futures = process_file.map(files)  # Process each file in parallel

# Step 4: Run the workflow daily.
if __name__ == "__main__":
    workflow.serve(interval=datetime.timedelta(days=1))  # Run daily

Deploy the workflow on Coiled (it runs on a small VM):

# python workflow.py                                     # Run locally
coiled prefect serve --region us-east-2 workflow.py      # Run on the cloud

See Schedule Python Jobs with Prefect and Coiled for a real-world example of running a daily job on Coiled.

When to use Prefect#

Prefect’s intuitive API makes writing workflows straightforward, especially for Python developers.

Sometimes you want to run your workflow on a VM in the cloud. Common reasons include:

  1. You want an always-on machine

  2. You want to run close to your data

  3. You want specific hardware (e.g. GPU, lots of memory)

  4. You want many machines

Deploy Prefect Workflows#

Deploying Prefect workflows is easy with the coiled prefect serve CLI command. Instead of running python workflow.py to serve your flow locally, run coiled prefect serve workflow.py to serve your flow on the cloud with the infrastructure of your choosing.

# python workflow.py                                     # Runs locally
coiled prefect serve workflow.py                         # Runs on the cloud
coiled prefect serve --vm-type t3.medium workflow.py     # Runs on the cloud
coiled prefect serve --region us-west-2 workflow.py      # Runs on the cloud

For long-running or regularly scheduled jobs, we recommend deploying your Prefect workflow on a small, inexpensive instance type (for example, an always-on t3.medium on AWS costs <$1/day) and dispatching specific tasks that need significant resources to separate, ephemeral VMs using Task Resources.
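As a sketch of this pattern (the file name and resource values here are illustrative, not prescriptive), the flow itself stays on the small VM while the decorated task runs on its own larger, ephemeral VM:

# workflow.py -- served with: coiled prefect serve --vm-type t3.medium workflow.py
import coiled
from prefect import flow, task

@task
@coiled.function(memory="128 GiB")  # Heavy task gets its own ephemeral VM
def heavy_task(path):
    ...  # Large-scale work happens here, on the big VM

@flow
def my_flow():
    heavy_task("s3://my-bucket/big-file.parquet")  # Flow itself stays on the t3.medium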

Features#

coiled prefect serve benefits from the standard Coiled features:

  • Easy-to-use API

  • Creates and manages VMs for you in the cloud

  • Automatically synchronizes your local software

  • Gives access to any cloud hardware in any region

Task Resources#

coiled prefect serve manages running your Prefect flow on the cloud.

Sometimes there are specific tasks within a flow that need to either scale up (a large, expensive VM) or scale out (many VMs so you can run many tasks concurrently). For these cases, we recommend using the @coiled.function decorator to mark the resources your task needs. Coiled will then run your task on the cloud infrastructure of your choosing.

See the Serverless Functions documentation for more details about using @coiled.function.

Scaling Up#

To farm off a task to a separate large cloud VM, simply decorate it with @coiled.function, like so:

# workflow.py
import coiled
from prefect import flow, task

@task
@coiled.function(memory="64 GB", region="us-west-2")
def expensive_function(num):
    import time
    time.sleep(30)  # Simulates large-scale work
    return num**2

@flow
def workflow(num):
    squared_num = expensive_function(num)
    print(f"{squared_num = }")

if __name__ == "__main__":
    workflow.serve(
        name="workflow",
        parameters={"num": 2},
    )

This will run expensive_function on a VM with 64 GB of memory in us-west-2.

Parallelism#

For running the same task across many inputs in parallel, use the .map method on @coiled.function-decorated tasks, like so:

# workflow.py
import coiled
from prefect import flow, task

@task
@coiled.function(memory="64 GB", region="us-west-2")
def expensive_function(num):
    import time
    time.sleep(30)  # Simulates large-scale work
    return num**2

@flow
def map_flow(nums):
    squared_nums = expensive_function.map(nums)  # Runs in parallel
    print(f"{list(squared_nums) = }")

if __name__ == "__main__":
    map_flow.serve(
        name="map",
        parameters={"nums": list(range(100))},
    )

This will run expensive_function across all the input values, adaptively scaling out across multiple VMs as needed.

Authentication#

@coiled.function-decorated tasks require the Prefect flow running your tasks to be connected to a Coiled account. If your flow is deployed using coiled prefect serve, or running on a machine where you’ve already run coiled login, then no further setup is needed.
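If the machine running your flow hasn't been authenticated yet, a one-time login (assuming the Coiled CLI is installed) looks like:

coiled login   # Connects this machine to your Coiled account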

For other deployment configurations we recommend using the Coiled Prefect Block to handle connecting your flow to your Coiled account with the following steps:

  1. Create a Coiled API token.

  2. Store your Coiled API token in Prefect using the coiled.prefect.Credentials block (only needs to be done once).

    from coiled.prefect import Credentials
    Credentials(token="<api-token-here>").save("<block-name>")
    
  3. Authenticate any Prefect flow with your Coiled account.

    from coiled.prefect import Credentials
    from prefect import flow
    
    @flow
    def my_flow():
        Credentials.load("<block-name>").login()
        ...
    

Now any @coiled.function-decorated Prefect tasks your flow uses will be able to connect to your Coiled account.
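Putting the pieces together, a minimal sketch (the block name, resource values, and task body are illustrative) looks like:

# workflow.py
import coiled
from coiled.prefect import Credentials
from prefect import flow, task

@task
@coiled.function(memory="64 GB")
def process(x):
    return x + 1

@flow
def my_flow():
    # Connect this flow run to Coiled before any @coiled.function tasks execute
    Credentials.load("<block-name>").login()
    return process(41)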

API#

coiled prefect serve#

Start server for Prefect deployment (using my_flow.serve() API)

coiled prefect serve [OPTIONS] [COMMAND]...

Options

--cluster-name <cluster_name>#

Name of Coiled cluster

--account, --workspace <account>#

Coiled workspace (uses default workspace if not specified). Note: --account is deprecated, please use --workspace instead.

--software <software>#

Software environment name to use. If neither software nor container is specified, all the currently-installed Python packages are replicated on the VM using package sync.

--container <container>#

Container image to use. If neither software nor container is specified, all the currently-installed Python packages are replicated on the VM using package sync.

--vm-type <vm_type>#

VM type to use. Specify multiple times to provide multiple options.

--gpu#

Have a GPU available.

--region <region>#

The cloud provider region in which to run the flow.

--disk-size <disk_size>#

Use larger-than-default disk on VM, specified in GiB.

-f, --file <file>#

Local files required to run command.

-e, --env <env>#

Environment variables securely transmitted to the Prefect flow environment. Format is KEY=val; multiple variables can be set by repeating --env.

-t, --tag <tag>#

Tags. Format is KEY=val; multiple tags can be set by repeating --tag.

Arguments

COMMAND#

Optional argument(s)
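For example, a sketch combining several of these options (the file names and values here are illustrative):

coiled prefect serve \
    --vm-type t3.medium \
    --region us-east-1 \
    --file config.yaml \
    --env PREFECT_LOGGING_LEVEL=DEBUG \
    workflow.py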