Serverless Functions#
Run your Python functions in the cloud
Sometimes you just need extra computing resources for key parts of your workloads. This is common for different reasons:
You want to run close to your data
You want a GPU
You want a bigger machine
You want many machines
This is easy with the @coiled.function
decorator. Coiled runs your decorated function on
the infrastructure of your choosing.
import coiled
import pandas as pd
df = pd.read_csv("data.csv") # Read local data
@coiled.function() # This function runs remotely
def process(df):
df = df[df.name == "Alice"]
return df
result = process(df) # Process on remote VM
print(result)
import coiled
import pandas as pd
@coiled.function(
region="us-west-2", # Run close to data
)
def process(filename):
df = pd.read_parquet(filename) # Read S3 data from EC2
df = df[df.name == "Alice"] # Filter data on cloud
return df # Return subset
result = process("s3://my-bucket/data.parquet") # Runs remotely
print(result) # Runs locally
import coiled
@coiled.function(
vm_type="p3.2xlarge", # Run on a GPU instance
)
def train():
import torch
device = torch.device("cuda")
...
return model
model = train() # Runs remotely
import coiled
@coiled.function(
memory="512 GB",
cpu=128
)
def my_func(...):
...
import coiled
import pandas as pd
@coiled.function(region="us-west-2") # Run close to the data
def process(filename):
output_filename = filename[:-4] + ".parquet"
df = pd.read_csv(filename)
df.to_parquet(output_filename)
return output_filename
# result = process("s3://my-bucket/data.parquet") # one file
results = process.map(filenames) # many files in parallel
for filename in results:
print("Finished", filename)
Use Cases#
Features#
coiled.function()
benefits from the standard Coiled features:
Easy to use API
Creates and manages VMs for you in the cloud
Automatically synchronizes your local software and credentials
Gives access to any cloud hardware (like GPUs) in any region
Auto-scales out if needed
Resources#
It’s easy to customize the hardware and software resources your function needs. You can select any VM type available on your cloud (see VM Size and Type). For example:
@coiled.function(
memory="256 GiB", # Bigger VM
region="us-east-2", # Specific region
arm=True, # Change architecture
)
def my_func(...):
...
Software for @coiled.function
works the same as with Coiled clusters.
By default, Coiled will automatically synchronize your local software
environment. This works well in most cases, but you
can also specify an manual software environment
or Docker image:
@coiled.function(container="nvcr.io/nvidia/rapidsai/base:23.08-cuda11.8-py3.10")
def my_gpu_func(...):
...
@coiled.function(software="my-software-env")
def my_func(...):
...
VM Lifecycle#
The first time a @coiled.function
decorated function is called, a VM with the specified resources will be created.
This initial startup typically takes ~1–2 minutes and then your VM will be available for the rest of the Python session.
By default, if a VM has been idle for 6 hours, it will automatically be shut down.
Note that this timeout period is different than other Coiled APIs like coiled.Cluster
.
You can use the idle_timeout=
argument to control the timeout period:
import coiled
@coiled.function(idle_timeout="2 hours")
def train(filepath):
result = ...
return result
All cloud resources are automatically deprovisioned and cleaned up on Python
interpreter shutdown (controlled by keepalive
parameter, see Warm Start).
Parallelism#
By default Coiled Functions will run sequentially, just like normal Python functions. However, they can also easily run in parallel.
Map#
If you want to run your function many times across different inputs in parallel,
you can use the .map()
method.
@coiled.function()
def simulate(trial: int=0):
return ...
result = simulate(1) # run the function once on cloud hardware
results = simulate.map(range(1000)) # Run 1000 times. Returns iterator immediately.
list(results) # Wait until done. Retrieve results.
You can also include keyword arguments for inputs that you aren’t mapping over.
@coiled.function()
def simulate(trial: int=0, group: string=""):
return ...
results = []
results.extend(simulate.map(range(1000), group="A")) # Run 1000 times for group A,
results.extend(simulate.map(range(1000), group="B")) # and 1000 times for group B.
list(results)
Submit#
The .submit()
method for Coiled Functions returns a Dask Future immediately, while your computation
runs in parallel with other futures in the background. You can then call the .result()
method on
the future to block until the function is done running and retrieve the result.
@coiled.function()
def process(data):
return ...
futures = []
for filename in filenames:
future = process.submit(filename)
futures.append(future)
results = [f.result() for f in futures]
Dask futures are powerful and flexible, for more information on their full API see the Dask Futures Documentation.
Scaling#
When using .map()
and .submit()
methods to run Coiled Functions in parallel,
there are two ways to specify the number of parallel workers to run on.
Adaptive scaling (default) automatically scales the number of VMs up and down depending on your workload (see Adaptive Deployments in the Dask docs for more information). This provide computing resources when needed and avoids idle resources.
Manual scaling uses a static number of VMs to run your function.
You can set the minimum and maximum number of worker VMs you want
to adaptively scale between by specifying n_workers
as a list. By default
Coiled Functions will adaptively scale between 0-500 workers
(n_workers=[0, 500]
).
@coiled.function(n_workers=[10, 300]) # Adaptively scale between 10-300 VMs
def process(data):
return ...
Set n_workers
to an integer for a static number of VMs. Note that
with manual scaling, VMs will remain up even during times when no computations
are running. You can use the .cluster.scale()
method to manually
scale the number of VMs as needed.
@coiled.function(n_workers=200) # Run function across 200 VMs
def process(data):
return ...
Warm Start#
You can reuse cloud VMs between scripts and interactive runs with the keepalive
parameter:
import coiled
@coiled.function(
...
keepalive="5 minutes",
)
def my_function(...):
return ...
In this example, the VM will stay running for 5 minutes after your Python session closes.
This means repeated runs of this function start in about a second rather than a minute or two.
Like with Coiled clusters, you can still set idle_timeout
, which defaults to 6 hours.
This way, you benefit from an already running VM in the short term, while still avoiding
costs from an idle VM in the long term.
Retries#
Sometimes transient errors occur when running functions, especially at scale.
In these cases simply rerunning the function often helps.
You can use the retries=
parameter to automatically rerun functions
a specified number of times. By default, there are no retries.
import coiled
@coiled.function(memory="128 GB")
def process(data):
result = ...
return result
results = process.map(inputs, retries=10)
result = process.submit(x, retries=10)
API#
- coiled.function(*, software=None, container=None, vm_type=None, cpu=None, memory=None, gpu=None, account=None, workspace=None, region=None, arm=None, disk_size=None, allow_ingress_from=None, shutdown_on_close=True, spot_policy=None, idle_timeout='6 hours', keepalive='30 seconds', package_sync_ignore=None, environ=None, threads_per_worker=1, local=False, name=None, tags=None, n_workers=None, extra_kwargs=None)[source]
Decorate a function to run on cloud infrastructure
This creates a
Function
object that executes its code on a remote cluster with the hardware and software specified in the arguments to the decorator. It can run either as a normal function, or it can return Dask Futures for parallel computing.- Parameters:
software (
Optional
[str
]) – Name of the software environment to use; this allows you to use and re-use existing Coiled software environments, and should not be used with package sync or when specifying a container to use for this specific cluster.container (
Optional
[str
]) – Name or URI of container image to use; when using a pre-made container image with Coiled, this allows you to skip the step of explicitly creating a Coiled software environment from that image. Note that this should not be used with package sync or when specifying an existing Coiled software environment.vm_type (
Union
[str
,list
[str
],None
]) – Instance type, or list of instance types, that you would like to use. You can usecoiled.list_instance_types()
to see a list of allowed types.cpu (
Union
[int
,list
[int
],None
]) – Number, or range, of CPUs requested. Specify a range by using a list of two elements, for example:cpu=[2, 8]
.memory (
Union
[str
,list
[str
],None
]) – Amount of memory to request for each VM, Coiled will use a +/- 10% buffer from the memory that you specify. You may specify a range of memory by using a list of two elements, for example:memory=["2GiB", "4GiB"]
.disk_size (
Union
[int
,str
,None
]) – Size of persistent disk attached to each VM instance, specified as string with units or integer for GiB.gpu (
Optional
[bool
]) – Whether to attach a GPU; this would be a single NVIDIA T4.account (
Optional
[str
]) – DEPRECATED. Useworkspace
instead.workspace (
Optional
[str
]) – The Coiled workspace (previously “account”) to use. If not specified, will check thecoiled.workspace
orcoiled.account
configuration values, or will use your default workspace if those aren’t set.region (
Optional
[str
]) – The cloud provider region in which to run the cluster.arm (
Optional
[bool
]) – Whether to use ARM instances for cluster; default is x86 (Intel) instances.keepalive – Keep your cluster running for the specified time, even if your Python session closes. Default is “30 seconds”.
spot_policy (
Optional
[str
]) –Purchase option to use for workers in your cluster, options are “on-demand”, “spot”, and “spot_with_fallback”; by default this is “spot_with_fallback” for Coiled Functions. (Google Cloud refers to this as “provisioning model” for your instances.) Note that even with this option, the first VM is always on-demand. This only applies to any additional VMs when running Coiled Functions in parallel across multiple VMs with the
.map()
and.submit()
methods. When running on a single VM, an on-demand instance will be used.Spot instances are much cheaper, but can have more limited availability and may be terminated while you’re still using them if the cloud provider needs more capacity for other customers. On-demand instances have the best availability and are almost never terminated while still in use, but they’re significantly more expensive than spot instances. For most workloads, “spot_with_fallback” is likely to be a good choice: Coiled will try to get as many spot instances as we can, and if we get less than you requested, we’ll try to get the remaining instances as on-demand. For AWS, when we’re notified that an active spot instance is going to be terminated, we’ll attempt to get a replacement instance (spot if available, but could be on-demand if you’ve enabled “fallback”). Dask on the active instance will attempt a graceful shutdown before the instance is terminated so that computed results won’t be lost.
idle_timeout (
str
) – Shut down the cluster after this duration if no activity has occurred. Default is “6 hours”.package_sync_ignore (
Optional
[list
[str
]]) – A list of package names to exclude from the cloud VM environment. This is useful when you have large libraries installed locally that aren’t needed for the function being run. Note the packages listed here may still be installed by another package that depends on them.environ (
Optional
[Dict
[str
,str
]]) – Dictionary of environment variables to securely pass to the cloud VM environment.threads_per_worker (
Optional
[int
]) – Number of threads to run concurrent tasks in for each VM. -1 can be used to run as many concurrent tasks as there are CPU cores. Default is 1.allow_ingress_from (
Optional
[str
]) – Control the CIDR from which cluster firewall allows ingress to scheduler; by default this is open to any source address (0.0.0.0/0). You can specify CIDR, or “me” for just your IP address.local (
bool
) – Whether or not to run this function locally or on cloud VMs. IfTrue
, this function will be run on your local machine, which can be useful for debugging or during development. Default isFalse
.name (
Optional
[str
]) – Name for the Coiled cluster on which this function will run. If not specified, VM specification parameters likevm_type
,disk_size
, etc. will be used to produce a unique, deterministic name. Note thatname
is used for sharing cloud VMs among Coiled Functions with the same hardware and software specification, so please use this parameter with care. Default toNone
.tags (
Optional
[Dict
[str
,str
]]) – Dictionary of tags.n_workers (
Union
[int
,List
[int
],None
]) – Number of VMs to provision for parallel function execution. Can either be an integer for a static number of machines, or a list specifying the lower and upper bounds for adaptively scaling up/down machines depending on the amount of work submitted. Defaults ton_workers=[0, 500]
which adaptively scales between 0 and 500 machines.extra_kwargs (
Optional
[dict
]) – Dictionary of any additional keyword arguments to pass tocoiled.Cluster()
. Note that any cluster arguments controlled by other@coiled.function
keyword arguments will take precendence over the kwargs in this dictionary.
See the
coiled.Cluster
docstring for additional parameter descriptions.Examples
>>> import coiled >>> @coiled.function() ... def f(x): ... return x + 1
>>> f(10) # calling the function blocks until finished 11 >>> f.submit(10) # immediately returns a future <Future: pending, key=f-1234> >>> f.submit(10).result() # Call .result to get result 11
>>> futures = [f(i) for i in range(1000)] # parallelize with a for loop >>> [future.result() for future in futures] ...