Prefect#
Deploy Prefect workflows on the cloud
Prefect is a popular workflow management system that makes it easy to run production jobs. It is easy to use with Coiled.
Example#
This workflow finds all new files in an S3 bucket and processes them in parallel every day.
# workflow.py
import datetime
from typing import List
import coiled, prefect, s3fs
# Step 1: Find new data files to be processed.
@prefect.task
def list_new_files() -> List[str]:
    return s3fs.S3FileSystem().ls("my-bucket")
# Step 2: Define function to process file. Specify hardware for function.
@prefect.task
@coiled.function(memory="64 GiB", region="us-east-2")
def process_file(file):
    return ...
# Step 3: Define workflow to process all new files.
@prefect.flow
def workflow():
    files = list_new_files()
    futures = process_file.map(files)  # Process each file in parallel
# Step 4: Run the workflow daily.
if __name__ == "__main__":
    workflow.serve(interval=datetime.timedelta(days=1))  # Run daily
Deploy workflow on Coiled (runs on a small VM)
# python workflow.py # Run locally
coiled prefect serve --region us-east-2 workflow.py # Run on the cloud
See Schedule Python Jobs with Prefect and Coiled for a real-world example of running a daily job on Coiled.
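As written, the example's first step lists every object in the bucket. One way to restrict it to files that have not been handled yet is to compare the listing against a record of processed paths. The sketch below is only an illustration: the helper `select_new_files` and the idea of keeping a set of processed paths are assumptions, not part of Coiled or Prefect.

```python
from typing import Iterable, List, Set


def select_new_files(all_files: Iterable[str], processed: Set[str]) -> List[str]:
    """Return the files that are not yet processed, preserving listing order."""
    return [path for path in all_files if path not in processed]


# Hypothetical bucket listing and record of already-processed paths.
listed = ["my-bucket/a.csv", "my-bucket/b.csv", "my-bucket/c.csv"]
already_done = {"my-bucket/a.csv"}

print(select_new_files(listed, already_done))
```

In a real flow, the set of processed paths would have to be stored somewhere durable between daily runs; how to do that depends on your setup.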
When to use Prefect#
Prefect’s intuitive API makes writing workflows straightforward, especially for Python developers.
Sometimes you want to run your workflow on a VM in the cloud. Common reasons include:
- You want an always-on machine
- You want to run close to your data
- You want specific hardware (e.g. GPU, lots of memory)
- You want many machines
Deploy Prefect Workflows#
Deploying Prefect workflows is easy with the coiled prefect serve CLI command.
Instead of running python workflow.py to serve your flow locally, run coiled prefect serve workflow.py to serve your flow on the cloud with the infrastructure of your choosing.
# python workflow.py                                  # Runs locally
coiled prefect serve workflow.py                      # Runs on the cloud with default settings
coiled prefect serve --vm-type t3.medium workflow.py  # Runs on the cloud on a specific VM type
coiled prefect serve --region us-west-2 workflow.py   # Runs on the cloud in a specific region
For long-running or regularly scheduled jobs, we recommend deploying your Prefect workflow on a small, inexpensive instance type (for example, an always-on t3.medium on AWS costs <$1/day) and dispatching specific tasks that need significant resources to separate, ephemeral VMs using Task Resources.
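The daily cost figure can be checked with a quick calculation. The sketch below assumes an on-demand price of $0.0416 per hour for a t3.medium; that number is an assumption for illustration and varies by region and over time, so check current AWS pricing before relying on it.

```python
# Assumed on-demand hourly price for a t3.medium (illustrative; verify against AWS pricing).
HOURLY_PRICE_USD = 0.0416

daily_cost = HOURLY_PRICE_USD * 24   # always-on for a full day
monthly_cost = daily_cost * 30       # rough 30-day month

print(f"Daily cost:   ${daily_cost:.4f}")
print(f"Monthly cost: ${monthly_cost:.2f}")
```

Under that assumed price, the always-on VM stays just under one dollar per day, which is why it suits the lightweight job of serving the flow while heavier tasks run elsewhere.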
Features#
coiled prefect serve benefits from the standard Coiled features:
- Easy to use API
- Creates and manages VMs for you in the cloud
- Automatically synchronizes your local software
- Gives access to any cloud hardware in any region
Task Resources#
coiled prefect serve manages running your Prefect flow on the cloud.
Sometimes specific tasks within a flow need to either scale up (a large, expensive VM) or scale out (many VMs, so you can run many tasks concurrently). For these cases, we recommend using the @coiled.function decorator to mark what resources your task needs. Coiled will then run your task on the cloud infrastructure of your choosing.
See the Serverless Functions documentation for more details about using @coiled.function.
Scaling Up#
To farm off a task to a separate large cloud VM, simply decorate it with @coiled.function, like so:
# workflow.py
import coiled
from prefect import flow, task
@task
@coiled.function(memory="64 GB", region="us-west-2")
def expensive_function(num):
    import time
    time.sleep(30)  # Simulates large-scale work
    return num**2
@flow
def workflow(num):
    squared_num = expensive_function(num)
    print(f"{squared_num = }")
if __name__ == "__main__":
    workflow.serve(
        name="workflow",
        parameters={"num": 2},
    )
This will run expensive_function on a VM with 64 GB of memory in us-west-2.
Parallelism#
For running the same task across many inputs in parallel, use the .map method on @coiled.function-decorated tasks, like so:
# workflow.py
import coiled
from prefect import flow, task
@task
@coiled.function(memory="64 GB", region="us-west-2")
def expensive_function(num):
    import time
    time.sleep(30)  # Simulates large-scale work
    return num**2
@flow
def map_flow(nums):
    squared_nums = expensive_function.map(nums)  # Runs in parallel
    print(f"{list(squared_nums) = }")
if __name__ == "__main__":
    map_flow.serve(
        name="map",
        parameters={"nums": list(range(100))},
    )
This will run expensive_function across all the input values, adaptively scaling out across multiple VMs as needed.
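To get a feel for what scaling out buys you, here is a rough back-of-the-envelope estimate for the example above (100 inputs, about 30 seconds each). It assumes each worker handles one task at a time and ignores VM startup and scheduling overhead. The function and its `num_workers` argument are hypothetical; in practice Coiled's adaptive scaling decides how many VMs are used.

```python
import math


def estimated_runtime_seconds(num_inputs: int, seconds_per_task: float, num_workers: int) -> float:
    """Estimate wall-clock time when tasks run in equal-sized waves across workers."""
    waves = math.ceil(num_inputs / num_workers)  # rounds of tasks needed to cover every input
    return waves * seconds_per_task


for workers in (1, 10, 100):
    seconds = estimated_runtime_seconds(100, 30, workers)
    print(f"{workers:>3} worker(s): about {seconds:.0f} s")
```

Real runs will take longer than this estimate because new VMs need time to boot, but the comparison shows why mapping over many inputs is where scaling out pays off.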
Authentication#
@coiled.function-decorated tasks require the Prefect flow running your tasks to be connected to a Coiled account. If your flow is deployed using coiled prefect serve, or is running on a machine where you’ve already run coiled login, then no further setup is needed.
For other deployment configurations we recommend using the Coiled Prefect Block to handle connecting your flow to your Coiled account with the following steps:
Store your Coiled API token in Prefect using the coiled.prefect.Credentials block (only needs to be done once).
from coiled.prefect import Credentials
Credentials(token="<api-token-here>").save("<block-name>")
Authenticate any Prefect flow with your Coiled account.
from coiled.prefect import Credentials
from prefect import flow
@flow
def my_flow():
    Credentials.load("<block-name>").login()
    ...
Now any @coiled.function-decorated Prefect tasks your flow uses will be able to connect to your Coiled account.
API#
coiled prefect serve#
Start server for Prefect deployment (using my_flow.serve() API)
coiled prefect serve [OPTIONS] [COMMAND]...
Options
- --cluster-name <cluster_name>#
Name of Coiled cluster
- --account, --workspace <account>#
Coiled workspace (uses default workspace if not specified). Note: --account is deprecated; please use --workspace instead.
- --software <software>#
Software environment name to use. If neither software nor container is specified, all the currently-installed Python packages are replicated on the VM using package sync.
- --container <container>#
Container image to use. If neither software nor container is specified, all the currently-installed Python packages are replicated on the VM using package sync.
- --vm-type <vm_type>#
VM type to use. Specify multiple times to provide multiple options.
- --gpu#
Have a GPU available.
- --region <region>#
The cloud provider region in which to run the Prefect flow.
- --disk-size <disk_size>#
Use larger-than-default disk on VM, specified in GiB.
- -f, --file <file>#
Local files required to run command.
- -e, --env <env>#
Environment variables securely transmitted to the Prefect flow environment. Format is KEY=val; multiple variables can be set with a separate --env for each.
- -t, --tag <tag>#
Tags. Format is KEY=val; multiple tags can be set with a separate --tag for each.
Arguments
- COMMAND#
Optional argument(s)
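Several of the options above can be repeated (--vm-type, --env, --tag), which makes longer command lines easy to get wrong by hand. The following is a minimal sketch of assembling such a command in Python. The helper `build_serve_command` is hypothetical and not part of Coiled; it uses only flags listed above, and the environment variable and tag values are made up for illustration.

```python
import shlex
from typing import Dict, List, Optional


def build_serve_command(
    flow_file: str,
    region: Optional[str] = None,
    vm_types: Optional[List[str]] = None,
    env: Optional[Dict[str, str]] = None,
    tags: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Assemble an argument list for `coiled prefect serve` (hypothetical helper)."""
    cmd = ["coiled", "prefect", "serve"]
    if region:
        cmd += ["--region", region]
    for vm_type in vm_types or []:
        cmd += ["--vm-type", vm_type]  # repeated once per candidate VM type
    for key, value in (env or {}).items():
        cmd += ["--env", f"{key}={value}"]  # KEY=val, one --env per variable
    for key, value in (tags or {}).items():
        cmd += ["--tag", f"{key}={value}"]  # KEY=val, one --tag per tag
    cmd.append(flow_file)
    return cmd


command = build_serve_command(
    "workflow.py",
    region="us-west-2",
    vm_types=["t3.medium", "t3.large"],
    env={"LOG_LEVEL": "debug"},  # made-up variable
    tags={"team": "data"},       # made-up tag
)
print(shlex.join(command))
```

The resulting list could be passed to subprocess.run, or the printed string pasted into a shell. Building the arguments as a list avoids quoting mistakes when values contain spaces.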