Serverless Functions#

Run your Python functions in the cloud

Sometimes you just need extra computing resources for key parts of your workloads. This is common for several reasons:

  1. You want to run close to your data

  2. You want a GPU

  3. You want a bigger machine

  4. You want many machines

This is easy with the @coiled.function decorator. Coiled runs your decorated function on the infrastructure of your choosing.

import coiled
import pandas as pd

df = pd.read_csv("data.csv")  # Read local data

@coiled.function()            # This function runs remotely
def process(df):
    df = df[df.name == "Alice"]
    return df

result = process(df)          # Process on remote VM
print(result)

Run close to your data:

import coiled
import pandas as pd

@coiled.function(
    region="us-west-2",             # Run close to data
)
def process(filename):
    df = pd.read_parquet(filename)  # Read S3 data from EC2
    df = df[df.name == "Alice"]     # Filter data on cloud
    return df                       # Return subset

result = process("s3://my-bucket/data.parquet")  # Runs remotely
print(result)                                    # Runs locally

Run on a GPU:

import coiled

@coiled.function(
    vm_type="p3.2xlarge",           # Run on a GPU instance
)
def train():
    import torch
    device = torch.device("cuda")
    ...
    return model

model = train()                    # Runs remotely

Run on a bigger machine:

import coiled

@coiled.function(
    memory="512 GB",
    cpu=128
)
def my_func(...):
    ...

Run on many machines:

import coiled
import pandas as pd

@coiled.function(region="us-west-2")  # Run close to the data
def process(filename):
    output_filename = filename[:-4] + ".parquet"
    df = pd.read_csv(filename)
    df.to_parquet(output_filename)
    return output_filename

# result = process("s3://my-bucket/data.csv")  # one file

results = process.map(filenames)   # many files in parallel
for filename in results:
    print("Finished", filename)


Features#

coiled.function() benefits from the standard Coiled features:

  • Easy to use API

  • Creates and manages VMs for you in the cloud

  • Automatically synchronizes your local software and credentials

  • Gives access to any cloud hardware (like GPUs) in any region

  • Auto-scales out if needed

Resources#

It’s easy to customize the hardware and software resources your function needs. You can select any VM type available on your cloud (see VM Size and Type). For example:

@coiled.function(
    memory="256 GiB",            # Bigger VM
    region="us-east-2",          # Specific region
    arm=True,                    # Change architecture
)
def my_func(...):
    ...

Software for @coiled.function works the same as with Coiled clusters. By default, Coiled will automatically synchronize your local software environment. This works well in most cases, but you can also specify a manual software environment or a Docker image:

@coiled.function(container="nvcr.io/nvidia/rapidsai/base:23.08-cuda11.8-py3.10")
def my_gpu_func(...):
    ...

@coiled.function(software="my-software-env")
def my_func(...):
    ...

VM Lifecycle#

The first time a @coiled.function decorated function is called, a VM with the specified resources will be created. This initial startup typically takes ~1–2 minutes and then your VM will be available for the rest of the Python session.

By default, if a VM has been idle for 24 hours, it will automatically be shut down. Note that this timeout period differs from that of other Coiled APIs like coiled.Cluster. You can use the idle_timeout= argument to control the timeout period:

import coiled

@coiled.function(idle_timeout="2 hours")
def train(filepath):
    result = ...
    return result

All cloud resources are automatically deprovisioned and cleaned up on Python interpreter shutdown (controlled by keepalive parameter, see Warm Start).

Parallelism#

By default Coiled Functions will run sequentially, just like normal Python functions. However, they can also easily run in parallel.

Map#

If you want to run your function many times across different inputs in parallel, you can use the .map() method.

@coiled.function()
def simulate(trial: int = 0):
    return ...

result = simulate(1)  # run the function once on cloud hardware

results = simulate.map(range(1000))  # Run 1000 times. Returns iterator immediately.
list(results)                        # Wait until done. Retrieve results.

You can also include keyword arguments for inputs that you aren’t mapping over.

@coiled.function()
def simulate(trial: int = 0, group: str = ""):
    return ...

results = []
results.extend(simulate.map(range(1000), group="A"))  # Run 1000 times for group A,
results.extend(simulate.map(range(1000), group="B"))  # and 1000 times for group B.

Submit#

The .submit() method for Coiled Functions returns a Dask Future immediately, while your computation runs in parallel with other futures in the background. You can then call the .result() method on the future to block until the function is done running and retrieve the result.

@coiled.function()
def process(filename):
    return ...

futures = []
for filename in filenames:
    future = process.submit(filename)
    futures.append(future)

results = [f.result() for f in futures]

Dask futures are powerful and flexible, for more information on their full API see the Dask Futures Documentation.
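For instance, Dask's as_completed iterator lets you handle each result as soon as its task finishes, rather than waiting for the whole batch. A minimal sketch, assuming a filenames list like the one in the examples above:

```python
import coiled
from dask.distributed import as_completed

@coiled.function()
def process(filename):
    return ...

# Submit all tasks up front; each .submit() returns a Dask Future immediately
futures = [process.submit(filename) for filename in filenames]

# Iterate over futures in the order they complete, not the order submitted
for future in as_completed(futures):
    print("Finished", future.result())
```

This pattern is useful when later steps (logging, writing output) shouldn't wait on the slowest task in the batch.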

Scaling#

When using .map() and .submit() methods to run Coiled Functions in parallel, there are two ways to specify the number of parallel workers to run on.

  • Adaptive scaling (default) automatically scales the number of VMs up and down depending on your workload (see Adaptive Deployments in the Dask docs for more information). This provides computing resources when needed and avoids idle resources.

  • Manual scaling uses a static number of VMs to run your function.

You can set the minimum and maximum number of worker VMs you want to adaptively scale between by specifying n_workers as a list. By default Coiled Functions will adaptively scale between 0-500 workers (n_workers=[0, 500]).

@coiled.function(n_workers=[10, 300])  # Adaptively scale between 10-300 VMs
def process(data):
    return ...

Set n_workers to an integer for a static number of VMs. Note that with manual scaling, VMs will remain up even during times when no computations are running. You can use the .cluster.scale() method to manually scale the number of VMs as needed.

@coiled.function(n_workers=200)  # Run function across 200 VMs
def process(data):
    return ...
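With manual scaling, you can also resize the cluster at runtime through the function's underlying cluster object, as mentioned above. A sketch, assuming an inputs iterable of work items:

```python
import coiled

@coiled.function(n_workers=10)   # Start with a static pool of 10 VMs
def process(data):
    return ...

# Scale the underlying cluster up before a large batch...
process.cluster.scale(100)
results = list(process.map(inputs))

# ...and back down afterwards to avoid paying for idle VMs
process.cluster.scale(10)
```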

Warm Start#

You can reuse cloud VMs between scripts and interactive runs with the keepalive parameter:

import coiled

@coiled.function(
    ...
    keepalive="5 minutes",
)
def my_function(...):
    return ...

In this example, the VM will stay running for 5 minutes after your Python session closes. This means repeated runs of this function start in about a second rather than a minute or two. Like with Coiled clusters, you can still set idle_timeout, which defaults to 24 hours. This way, you benefit from an already running VM in the short term, while still avoiding costs from an idle VM in the long term.

Retries#

Sometimes transient errors occur when running functions, especially at scale. In these cases simply rerunning the function often helps. You can use the retries= parameter to automatically rerun functions a specified number of times. By default, there are no retries.

import coiled

@coiled.function(memory="128 GB")
def process(data):
    result = ...
    return result

results = process.map(inputs, retries=10)
result = process.submit(x, retries=10)

API#

coiled.function(*, software=None, container=None, vm_type=None, cpu=None, memory=None, gpu=None, account=None, workspace=None, region=None, arm=None, disk_size=None, allow_ingress_from=None, shutdown_on_close=True, spot_policy=None, idle_timeout='24 hours', keepalive='30 seconds', package_sync_ignore=None, environ=None, threads_per_worker=1, local=False, name=None, tags=None, n_workers=None, extra_kwargs=None)[source]

Decorate a function to run on cloud infrastructure

This creates a Function object that executes its code on a remote cluster with the hardware and software specified in the arguments to the decorator. It can run either as a normal function, or it can return Dask Futures for parallel computing.

Parameters:
  • software (Optional[str]) – Name of the software environment to use; this allows you to use and re-use existing Coiled software environments, and should not be used with package sync or when specifying a container to use for this specific cluster.

  • container (Optional[str]) – Name or URI of container image to use; when using a pre-made container image with Coiled, this allows you to skip the step of explicitly creating a Coiled software environment from that image. Note that this should not be used with package sync or when specifying an existing Coiled software environment.

  • vm_type (Union[str, list[str], None]) – Instance type, or list of instance types, that you would like to use. You can use coiled.list_instance_types() to see a list of allowed types.

  • cpu (Union[int, list[int], None]) – Number, or range, of CPUs requested. Specify a range by using a list of two elements, for example: cpu=[2, 8].

  • memory (Union[str, list[str], None]) – Amount of memory to request for each VM, Coiled will use a +/- 10% buffer from the memory that you specify. You may specify a range of memory by using a list of two elements, for example: memory=["2GiB", "4GiB"].

  • disk_size (Union[int, str, None]) – Size of persistent disk attached to each VM instance, specified as string with units or integer for GiB.

  • gpu (Optional[bool]) – Whether to attach a GPU; this would be a single NVIDIA T4.

  • account (Optional[str]) – DEPRECATED. Use workspace instead.

  • workspace (Optional[str]) – The Coiled workspace (previously “account”) to use. If not specified, will check the coiled.workspace or coiled.account configuration values, or will use your default workspace if those aren’t set.

  • region (Optional[str]) – The cloud provider region in which to run the cluster.

  • arm (Optional[bool]) – Whether to use ARM instances for cluster; default is x86 (Intel) instances.

  • keepalive – Keep your cluster running for the specified time, even if your Python session closes. Default is “30 seconds”.

  • spot_policy (Optional[str]) –

    Purchase option to use for workers in your cluster, options are “on-demand”, “spot”, and “spot_with_fallback”; by default this is “spot_with_fallback” for Coiled Functions. (Google Cloud refers to this as “provisioning model” for your instances.) Note that even with this option, the first VM is always on-demand. This only applies to any additional VMs when running Coiled Functions in parallel across multiple VMs with the .map() and .submit() methods. When running on a single VM, an on-demand instance will be used.

    Spot instances are much cheaper, but can have more limited availability and may be terminated while you’re still using them if the cloud provider needs more capacity for other customers. On-demand instances have the best availability and are almost never terminated while still in use, but they’re significantly more expensive than spot instances. For most workloads, “spot_with_fallback” is likely to be a good choice: Coiled will try to get as many spot instances as we can, and if we get fewer than you requested, we’ll try to get the remaining instances as on-demand. For AWS, when we’re notified that an active spot instance is going to be terminated, we’ll attempt to get a replacement instance (spot if available, but could be on-demand if you’ve enabled “fallback”). Dask on the active instance will attempt a graceful shutdown before the instance is terminated so that computed results won’t be lost.

  • idle_timeout (str) – Shut down the cluster after this duration if no activity has occurred. Default is “24 hours”.

  • package_sync_ignore (Optional[list[str]]) – A list of package names to exclude from the cloud VM environment. This is useful when you have large libraries installed locally that aren’t needed for the function being run. Note the packages listed here may still be installed by another package that depends on them.

  • environ (Optional[Dict[str, str]]) – Dictionary of environment variables to securely pass to the cloud VM environment.

  • threads_per_worker (Optional[int]) – Number of threads to run concurrent tasks in for each VM. -1 can be used to run as many concurrent tasks as there are CPU cores. Default is 1.

  • allow_ingress_from (Optional[str]) – Control the CIDR from which cluster firewall allows ingress to scheduler; by default this is open to any source address (0.0.0.0/0). You can specify CIDR, or “me” for just your IP address.

  • local (bool) – Whether or not to run this function locally or on cloud VMs. If True, this function will be run on your local machine, which can be useful for debugging or during development. Default is False.

  • name (Optional[str]) – Name for the Coiled cluster on which this function will run. If not specified, VM specification parameters like vm_type, disk_size, etc. will be used to produce a unique, deterministic name. Note that name is used for sharing cloud VMs among Coiled Functions with the same hardware and software specification, so please use this parameter with care. Defaults to None.

  • tags (Optional[Dict[str, str]]) – Dictionary of tags.

  • n_workers (Union[int, List[int], None]) – Number of VMs to provision for parallel function execution. Can either be an integer for a static number of machines, or a list specifying the lower and upper bounds for adaptively scaling up/down machines depending on the amount of work submitted. Defaults to n_workers=[0, 500] which adaptively scales between 0 and 500 machines.

  • extra_kwargs (Optional[dict]) – Dictionary of any additional keyword arguments to pass to coiled.Cluster(). Note that any cluster arguments controlled by other @coiled.function keyword arguments will take precedence over the kwargs in this dictionary.

See the coiled.Cluster docstring for additional parameter descriptions.

Examples

>>> import coiled
>>> @coiled.function()
... def f(x):
...    return x + 1
>>> f(10)  # calling the function blocks until finished
11
>>> f.submit(10)  # immediately returns a future
<Future: pending, key=f-1234>
>>> f.submit(10).result()  # Call .result to get result
11
>>> futures = [f.submit(i) for i in range(1000)]  # parallelize with a for loop
>>> [future.result() for future in futures]
...