DataFrame Workloads with Coiled#

Dask and Coiled make it easy to access, manipulate, and transform large tabular datasets in the cloud.  Dask DataFrame scales out pandas and is a natural, modern replacement for Spark.

There are many ways to use Coiled for data analysis and transformation.  This page collects a few common patterns and points you to useful documentation and examples.

This page may be helpful for you if …

You want to …

  • Transform terabytes of Parquet data

  • Query massive tables with Python

  • Run Python jobs on a schedule

You’re pained by …

  • Running out of memory

  • Spark’s UX making you cry

  • Slow startup, poor observability

On this page we’ll go through some common workflows and make a general case for Coiled for tabular data analysis and transformation workloads.

Dask DataFrame does what pandas does#

Just bigger and better.

Dask DataFrame is composed of many small pandas DataFrames spread across a cluster of machines.  This makes Dask easy to adopt if your team already uses Python and pandas.

import pandas as pd

orders = pd.read_parquet("orders.parquet")
accounts = pd.read_parquet("accounts.parquet")

df = orders.merge(accounts, on="id")
df.to_parquet("merged.parquet")
  • Easy and familiar API ✅

  • Combines well with other Python code ✅

  • Kinda slow ❌

  • Breaks when data gets too big ❌

import dask.dataframe as dd

orders = dd.read_parquet("s3://my-bucket/orders.*.parquet")
accounts = dd.read_parquet("s3://my-bucket/accounts.*.parquet")

df = orders.merge(accounts, on="id")
df.to_parquet("s3://my-bucket/merged.parquet")
  • Easy and familiar API ✅

  • Combines well with other Python code ✅

  • Pretty fast ✅

  • Scales comfortably from 10 GiB to 10 TiB ✅

To learn more about Dask DataFrame, see the Dask DataFrame Documentation or the Uber-Lyft exploratory analysis example.
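
Running the same pipeline on the cloud with Coiled takes only a couple of extra lines.  Here’s a minimal sketch, assuming a recent version of the coiled package; the cluster size, region, and bucket paths are illustrative:

import coiled
import dask.dataframe as dd

# Spin up a cluster of cloud VMs (size and region are illustrative)
cluster = coiled.Cluster(n_workers=20, region="us-east-2")
client = cluster.get_client()  # becomes the default client for Dask

# The same pandas-style code as above, now spread across the cluster
orders = dd.read_parquet("s3://my-bucket/orders.*.parquet")
accounts = dd.read_parquet("s3://my-bucket/accounts.*.parquet")

orders.merge(accounts, on="id").to_parquet("s3://my-bucket/merged.parquet")

cluster.shutdown()  # release the VMs when finished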

Coiled supports other tools too, including …

Performance at Scale#

There are many DataFrame libraries available today, including Spark, Dask, DuckDB, and Polars.  Dask is robustly fast at scale.  On the popular TPC-H benchmark suite, Dask wasn’t always the fastest (especially at small scale, where Polars and DuckDB are really impressive), but it was the fastest at large scale (easily beating Spark), and it was the most robustly performant across scales.

For more information, read about TPC-H Benchmarks.

Regular Production Jobs#

  • Q: How do I use Coiled in my CI/CD pipeline for regular production jobs?

  • A: Because you can use Coiled from anywhere you run Python, Coiled composes naturally with all job schedulers.  Just keep using whatever you’re using today.

Other frameworks tie you into a particular job scheduling framework (ahem, Databricks) or depend on explicit third-party integrations.  That may be what you want, or it may be terrible for your use case.  Fortunately, because of Coiled’s Environment Synchronization, you can use Coiled from anywhere you can run Python code.

Here are some examples, but the point is that Coiled works from anywhere:

With Prefect:

from prefect import task, flow
import coiled

@task
def my_job_task():
    # Run this task's work on a temporary Coiled cluster
    with coiled.Cluster(...) as cluster:
        ...

@flow
def my_flow():
    my_job_task()

See the Prefect documentation for more information and different ways to use the two projects together.

With Dagster:

from dagster import op
import coiled

@op
def my_regular_job():
    # Run this op's work on a temporary Coiled cluster
    with coiled.Cluster(...) as cluster:
        ...

See Example: Dagster and Coiled for Production for a worked example.

With cron:

0 * * * * python process.py
# process.py

import coiled
import dask

with coiled.Cluster(...) as cluster:
    ...

Because Coiled operates as a library, you can use it from anywhere.  The examples above are not exhaustive; they simply demonstrate that explicit integrations are not necessary.

For a more extensive, fully featured example that uses Coiled + Prefect + Delta tables, see Example: Easy Scalable Production ETL.

Metrics and Observability#

Things break.  It’s your job to fix them.  We can help.

Jobs fail for all sorts of reasons.  Fortunately, Coiled ships with an excellent suite of metrics and observability dashboards that can help you track down failures quickly.  Coiled gives you an integrated timeline of your Code, Metrics, and Logs.  This combination helps you quickly track down annoying failures and attribute them to hardware or to code.

Prefer your own logging stack?  No worries, we dump logs directly into your normal log store (like AWS CloudWatch) and tag all resources, so your existing systems should continue working just fine.

Data Access#

Because of the rich history of Python and pandas, Dask and Coiled can read data from pretty much anywhere.

import dask.dataframe as dd

df = dd.read_parquet(
    path="s3://my-bucket/*.parquet"
)


from dask_snowflake import read_snowflake

df = read_snowflake(
    query="SELECT * FROM MyTable",
    connection_kwargs={...},
)

import dask_deltatable

df = dask_deltatable.read_deltalake(
    path="mytable",
    datetime="2020-01-01"
)

import dask_bigquery

df = dask_bigquery.read_gbq(
    table_id="...",
    project_id="...",
    dataset_id="...",
)

For more information, read the Dask DataFrame documentation on loading data.

Additionally, because of Coiled’s Environment Synchronization, data access credentials are automatically transmitted from wherever you’re calling Coiled (a user’s machine, or a CI job) to the cloud VMs.  This means that you don’t need to do anything special for data credentials, beyond what you’re already doing for the small-data case.

Cloud Architecture#

Coiled’s Raw Cloud Architecture offers several advantages over other data transformation / engineering platforms that we’ve seen historically.

  1. Minute-long cluster spin-up times mean that users quickly spin clusters up and tear them down, making things both responsive and cost-efficient.

    In contrast, EMR and Databricks users experience 5-15 minute spin-up times, which encourages leaving clusters running and idling all day, leading to high costs.

  2. Zero architecture to maintain means that the top engineer who manages your cluster can go do more interesting work.  Coiled only spins up VMs, and only when you’re actively using them.  There are no Kubernetes clusters, node pools, or other standing infrastructure to keep healthy.

  3. Any instance type means that you can use whatever hardware best suits the job at hand.  Need fast disks, GPUs, fast networking, or big-memory machines for your job?  No problem.  For more information, read Manage Hardware.
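
For example, you can request specific hardware directly when creating a cluster.  Here’s a minimal sketch, assuming a recent version of the coiled package; the instance type and worker count are illustrative:

import coiled

# Big-memory workers for a shuffle-heavy join
# (instance type and worker count are illustrative)
cluster = coiled.Cluster(
    n_workers=10,
    worker_vm_types=["r6i.8xlarge"],  # 32 vCPUs, 256 GiB memory each
)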