AWS Lambda#
In this guide, you’ll learn how to use AWS Lambda as a Dask Client to offload compute work onto an existing Dask cluster.
This is a simplified version of a more complete example AWS stack written in CDK, available here.
Why though?#
It’s not uncommon, nor absurd, to use Lambda to perform compute operations: processing new files landing in S3, records arriving in SQS, or any number of other tasks; the sky is the limit.
In this guide, we’ll take the example of a modernizing data warehouse that wants more up-to-date data by reacting to events generated when a vendor deposits files onto its S3 bucket / data lake.
We have at least two options:
1. Perform the computation in Lambda#
Pros:
Straightforward development of ETL (read and process directly in Pandas/Polars/etc).
No cluster or added external complexity to maintain.
Cons:
Code size restriction (or resort to using heavier Docker images).
Runtime duration limit (15 minutes as of this writing).
Memory maximum (10GB as of this writing).
GB/sec pricing is more expensive than equivalent EC2, so there is an incentive to minimize runtime and memory.
2. Perform the computation on a Dask cluster#
Pros:
Can often use minimal Lambda memory (128MB at time of this writing).
Can optionally use ‘fire-and-forget’ job submission to cluster, allowing minimal Lambda execution duration.
Size of file to be processed is decoupled from Lambda resource requirements.
Cons:
Some added complexity of interfacing with and managing an external Dask cluster.
No surprise, we’ll be taking door number two.
In my experience, and perhaps yours, things will be running great on Lambda and then something happens: the file/record/data being processed grows, a processing dependency’s new release ends up exceeding Lambda’s code size limit, and so on. In turn, Lambda resources grow, costs go up, and at worst, complexity grows as the ETL may need to be moved to larger resources such as Batch/Fargate/etc.
As noted, using Lambda as a client to coordinate and offload the compute to a cluster can decouple the size of the compute resources needed from the convenience of using a Lambda function.
If you’re on board so far, let’s continue.
Alright then, so we have a data vendor which deposits a file on our S3 bucket at 1-minute intervals during business hours. Our ETL will take the file and do some processing given the file’s input. A Lambda function will be triggered on these new files; it will then establish a connection to a Dask cluster and offload the computation to said cluster.
Here’s what that looks like as a diagram:
Okay, let’s look at some code. First things first, we need to start the cluster at the start of the day. Your trigger might differ, whether you start a cluster based on SQS queue size or even with every Lambda invocation, but the code will look the same.
# file: lambda_function.py
import coiled
from distributed import wait

import cluster_processing  # We'll talk about this next...


def start_stop_cluster(event, context):
    """
    Lambda Function

    Scheduled CRON Lambda function to start/stop Dask cluster using Coiled
    """
    client = _get_or_create_cluster_client()

    # Shutdown cluster if indicated
    # In our case, an EventBridge scheduled event: {"action": "stop"}
    if event.get("action") == "stop":
        client.cluster.shutdown()


def _get_or_create_cluster_client():
    # Will reconnect to the cluster if the name already exists, otherwise create it.
    cluster = coiled.Cluster(
        name="my-processing-cluster",
        software=_software_environment(),
        shutdown_on_close=False,  # Note: Don't automatically shutdown cluster after Lambda exits
        scheduler_options={"idle_timeout": "2 hours"},  # Defaults to shutting down after 20mins of inactivity
        n_workers=4,
        worker_cpu=2,
    )
    client = cluster.get_client()
    client.upload_file(cluster_processing.__file__)  # We'll talk about this later in the bridge function
    return client


def _software_environment():
    """
    Create a Coiled software environment, which is a superset of
    dependencies used in the Lambda environment. For example, this
    Lambda environment will have `distributed`, as will the Dask cluster,
    but the cluster will also have the heavier dependencies like
    `dask[dataframe]`.
    """
    name = "my-software-environment"
    coiled.create_software_environment(name=name, pip=["dask[dataframe]", "s3fs", "bokeh==2.4.2"])
    return name
We have one Lambda function, start_stop_cluster, to manage the start/stop of a cluster which will be available during business hours, when our vendor is depositing files on S3 for processing.
This is followed by two helper functions:
_get_or_create_cluster_client: This defines the cluster we will use for processing files. Important to note: when a cluster with the same name already exists, we’ll simply be reconnected to it.
_software_environment: Part of the benefit of offloading compute to our Dask cluster is that the Lambda function can be lighter weight in terms of source code and dependencies. Therefore, we can install additional dependencies on the cluster for the processing of files. In this case, we can use Dask DataFrames and Bokeh (for use in the scheduler dashboard).
Note
You will need to set the DASK_COILED__TOKEN environment variable for these functions, and all other functions using the Coiled API, to work. See how to create your API token.
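There are a few ways to set that environment variable; below is a rough sketch using boto3 to set it on an already-deployed function. The function name is a placeholder, and in practice you’d more likely set this in your IaC or the Lambda console:

import os
import boto3

# Push the Coiled token from the local environment into the Lambda
# function's configuration (placeholder function name)
boto3.client("lambda").update_function_configuration(
    FunctionName="start-stop-cluster",
    Environment={"Variables": {"DASK_COILED__TOKEN": os.environ["DASK_COILED__TOKEN"]}},
)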
After this, invoking start_stop_cluster will start a Dask cluster for you via Coiled.
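As a quick smoke test, here’s a rough sketch of invoking the handler locally with the same payloads a scheduled rule would send. This assumes DASK_COILED__TOKEN is set in your shell and, of course, it will start real cloud resources:

# file: local_smoke_test.py (illustrative)
from lambda_function import start_stop_cluster

# Start (or reconnect to) the cluster; any event without "action": "stop" does this
start_stop_cluster({"action": "start"}, context=None)

# ...and later, tear it down with the same payload the scheduled stop rule sends
start_stop_cluster({"action": "stop"}, context=None)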
Remember:#
In this example we’re using an existing, long-lived Dask cluster, but the code above can and will start a cluster on every new Lambda execution; you only need to switch to shutdown_on_close=True to stop the cluster once the Lambda/client has exited.
You’ll likely want to adjust this to your situation. For example, if you have an ETL which only runs occasionally, then starting/stopping a cluster with each Lambda execution makes more sense. Here we have a frequently running ETL, so we keep the cluster alive between those Lambda executions.
Adjusting scheduler_options={"idle_timeout": ...} might also be a great option for your situation, for example if you experience less predictable processing or bursts of events followed by lengthy idle times.
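For reference, here’s a rough sketch of how a business-hours schedule could be wired up with EventBridge and boto3. The rule names, cron expressions, and Lambda ARN are placeholders, and the CDK example stack linked above handles this (plus the required invoke permissions) for you:

import json
import boto3

events = boto3.client("events")
lambda_arn = "arn:aws:lambda:us-east-1:123456789012:function:start-stop-cluster"  # placeholder

# Start the cluster on weekday mornings
events.put_rule(Name="dask-cluster-start", ScheduleExpression="cron(0 9 ? * MON-FRI *)")
events.put_targets(
    Rule="dask-cluster-start",
    Targets=[{"Id": "start", "Arn": lambda_arn, "Input": json.dumps({"action": "start"})}],
)

# Stop it in the evening; this is the {"action": "stop"} event the handler checks for
events.put_rule(Name="dask-cluster-stop", ScheduleExpression="cron(0 18 ? * MON-FRI *)")
events.put_targets(
    Rule="dask-cluster-stop",
    Targets=[{"Id": "stop", "Arn": lambda_arn, "Input": json.dumps({"action": "stop"})}],
)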
Super duper, now let’s write the bridge_function in the cluster_processing module we imported above, in order to process the data on the cluster:
# file: cluster_processing.py


def bridge_function(bucket: str, key: str):
    """
    Process our vendor's s3 file via bridge function
    """
    # Some example processing code.
    # The important bit here is importing the extra
    # dependencies inside of this function, since this
    # `cluster_processing.py` file will be imported
    # inside of the Lambda function environment which
    # doesn't have the extra dependencies. (But the cluster will)
    import dask.dataframe as dd  # dask[dataframe] only exists on the cluster!

    df = dd.read_parquet(f"s3://{bucket}/{key}")
    # ... some ETL logic here ...
    df.to_parquet("s3://our-data-lake/processed/vendor/data.parquet")

    ### Or alternatively import some internal library for processing ###
    # from my_organizational_code import processing_logic
    # processing_logic(bucket=bucket, key=key)  # all other imports are inside this function
We’ll keep this function simple, as the processing code isn’t so important for this guide; it’ll be the normal Dask code you’d write anyway. The caveat is that we need to link the Lambda client with code to be executed on the cluster, while ensuring we don’t accidentally import any modules not available in the Lambda function’s runtime environment.
Above is how we can do this. We define a ‘bridge’ function which can be imported by both the Lambda function and the cluster workers, while keeping cluster-only modules hidden from the Lambda function.
To complete this, after starting the cluster we call client.upload_file(cluster_processing.__file__) to create the file on the cluster; it serves only as a common interface between the client and the cluster workers. If you choose to have your function in the same file, then uploading wouldn’t be needed; it’s here to demonstrate should one prefer to place this ‘bridge function’ in a separate module.
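If you want to sanity-check this split, a small sketch like the one below (assuming client came from _get_or_create_cluster_client()) asks every worker whether it can see the heavy dependencies that the Lambda environment deliberately lacks:

import importlib.util

def _has_dask_dataframe():
    # Runs on each worker; True if dask.dataframe is importable there
    return importlib.util.find_spec("dask.dataframe") is not None

# Returns a dict of {worker_address: bool}
print(client.run(_has_dask_dataframe))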
From there, the cluster can have all the added dependencies it desires, while the Lambda function itself can make do with only coiled and distributed in this case, allowing it to run with the minimum Lambda memory (128MB).
Finally, our Lambda function which is invoked on new files only needs to reuse _get_or_create_cluster_client and then submit new S3 files to the cluster for processing:
# file: lambda_function.py

... all code as above from lambda_function.py ...


def process_s3_file(event, context):
    """
    Lambda function invoked on new S3 files from our vendor
    """
    # Get bucket and key of the file triggering this function
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"].replace("%3D", "=")

    # Connect to (or create) the cluster and offload the processing to it
    client = _get_or_create_cluster_client()
    job = client.submit(cluster_processing.bridge_function, bucket, key)
    wait(job)

    # Any remaining notifications, processing, etc.
    # For example, we can have the function propagate any error:
    if job.exception() is not None:
        raise job.exception()
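If you don’t need the Lambda invocation to block on, or surface, the result, the ‘fire-and-forget’ submission mentioned in the pros above keeps the Lambda execution duration to a minimum. Here’s a rough sketch of that variant (the handler name is illustrative); note that errors would then need to be handled on the cluster side, for example via logging or notifications inside bridge_function:

from distributed import fire_and_forget

def process_s3_file_fire_and_forget(event, context):
    """
    Variant: submit the work to the cluster and return immediately
    """
    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"].replace("%3D", "=")

    client = _get_or_create_cluster_client()
    job = client.submit(cluster_processing.bridge_function, bucket, key)

    # Ask the scheduler to keep running this task even after this
    # client's reference to the future goes away (i.e. when the Lambda exits)
    fire_and_forget(job)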
And that’s it, we’ll now see tasks being run on the cluster, coordinated by the client on Lambda!
Summary#
We’ve seen that we can pretty easily offload heavy compute work from Lambda onto a Dask cluster, allowing our Lambda functions to be more agile, lower in cost to operate, and more stable when data input sizes change.
For a full deployable example, check out our AWS CDK example stack.