Distributed SQL queries with Dask-SQL#

In this guide, you’ll use Dask-SQL, an open-source library that adds a SQL query layer on top of Dask. This allows you to query and transform Dask DataFrames using common SQL operations. You can download this Jupyter notebook to follow along in your own JupyterLab session.

Before you start#

You’ll first need to create consistent local and remote software environments with dask, coiled, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the tutorial on setting up a custom software environment.

First, you will install dask-sql and coiled-runtime, a Dask meta-package. Save the following file as environment.yml. You can get the most up-to-date version of coiled-runtime from the latest tag in the public coiled-runtime repository.

channels:
  - conda-forge
dependencies:
  - python=3.9
  - coiled-runtime=0.0.3
  - dask-sql=2022.1.0

Next, create a local software environment using the environment.yml file:

conda env create -f environment.yml -n dask-sql-example
conda activate dask-sql-example
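
Before moving on, you can optionally verify the local environment from Python. The exact version strings will depend on what conda resolved:

import dask
import dask_sql

# Confirm the pinned packages are importable in the new environment
print(dask.__version__)      # Dask version pulled in by coiled-runtime
print(dask_sql.__version__)  # should match the pinned 2022.1.0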

Lastly, create a remote software environment using the same environment.yml file:

coiled env create -n dask-sql-example --conda environment.yml

Launch your cluster#

You’ll start by creating a Coiled cluster:

import coiled

cluster = coiled.Cluster(
    name="dask-sql-example",
    n_workers=5,
    software="dask-sql-example",
)

and then connecting Dask to your remote Coiled cluster:

from dask.distributed import Client

# connect to the remote cluster
client = Client(cluster)
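
You can also grab the Dask dashboard link to monitor the cluster while your queries run:

# Open this URL in a browser to watch tasks execute on the cluster
print(client.dashboard_link)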

Getting started with Dask-SQL#

The main interface for interacting with Dask-SQL is the dask_sql.Context object. It allows you to register Dask DataFrames as data sources and can convert SQL queries to Dask DataFrame operations.

from dask_sql import Context

c = Context()

You’ll use the dask.datasets.timeseries dataset of random timeseries data:

import dask

# Load dataset
df = dask.datasets.timeseries(
    "2000", "2005", partition_freq="2w"
)
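
This returns a Dask DataFrame with a datetime index and four columns: id, name, x, and y. You can peek at the first few rows to check its structure:

# Inspect the first few rows (this computes only the first partition)
df.head()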

You can then use the dask_sql.Context to assign a table name to this Dask DataFrame, and use that table name for SQL queries:

# Register the Dask DataFrame df as a table
# Use persist=True to reduce overhead when running multiple queries
c.register_dask_table(df, "timeseries", persist=True)

# Perform a SQL operation on the "timeseries" table
result = c.sql("SELECT count(1) FROM timeseries")

Note that this returns another Dask DataFrame; no computation has run yet. Like other Dask operations, Dask-SQL queries are evaluated lazily. You can use .compute() to run the computation on your cluster.

result.compute()
    COUNT(1)
0  157248000
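
If you’re curious how Dask-SQL translated the query, the Context can also print the relational plan without computing anything. A minimal sketch, assuming the explain method is available in this dask-sql version:

# Show the relational algebra plan for the query (no computation happens)
print(c.explain("SELECT count(1) FROM timeseries"))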

You’ve run your first SQL query with Dask-SQL! Let’s try out some more complex queries.

More complex SQL examples#

With Dask-SQL, you can run more complex SQL statements, such as a groupby aggregation:

c.sql('SELECT max(y) as "max", name FROM timeseries GROUP BY name').compute()
         max      name
0   1.000000     Alice
1   1.000000       Bob
2   1.000000   Charlie
3   0.999999       Dan
4   1.000000     Edith
5   1.000000     Frank
6   1.000000    George
7   1.000000    Hannah
8   1.000000    Ingrid
9   0.999999     Jerry
10  1.000000     Kevin
11  1.000000     Laura
12  0.999999   Michael
13  1.000000   Norbert
14  1.000000    Oliver
15  1.000000  Patricia
16  0.999999     Quinn
17  1.000000       Ray
18  1.000000     Sarah
19  1.000000       Tim
20  1.000000    Ursula
21  1.000000    Victor
22  1.000000     Wendy
23  0.999999    Xavier
24  0.999999    Yvonne
25  0.999999     Zelda

The equivalent operation using the Dask DataFrame API would be df.groupby("name").y.max().compute(). You can build up complexity by adding a WHERE clause to filter for certain values of x:

c.sql("""
    SELECT name, AVG(y) as "average"
    FROM timeseries 
    WHERE x > 0.2
    GROUP BY name
""").compute()
        name        average
0      Alice  -4.057128e-04
1        Bob   3.807399e-04
2    Charlie   1.664932e-04
3        Dan   2.467381e-04
4      Edith   9.742376e-05
5      Frank  -9.143104e-05
6     George  -2.146390e-04
7     Hannah  -6.085306e-05
8     Ingrid   2.383526e-05
9      Jerry  -2.286118e-04
10     Kevin  -3.735864e-04
11     Laura   4.983597e-05
12   Michael  -6.790377e-04
13   Norbert   4.276074e-04
14    Oliver   2.105069e-04
15  Patricia   8.815882e-05
16     Quinn   3.242869e-04
17       Ray   6.378322e-05
18     Sarah   1.756173e-07
19       Tim   3.764230e-05
20    Ursula  -1.870895e-04
21    Victor   2.181580e-04
22     Wendy   2.784554e-06
23    Xavier  -4.598764e-04
24    Yvonne   2.373139e-04
25     Zelda   2.807484e-04
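
For comparison, the same filter-then-aggregate can be written directly with the Dask DataFrame API and produces an equivalent result:

# Filter rows where x > 0.2, then group by name and average y
df[df.x > 0.2].groupby("name").y.mean().compute()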

Once you’re done, you can shut down the cluster (it will shut down automatically after 20 minutes of inactivity):

cluster.close()
client.close()

Next Steps#

For a more in-depth look at what you can do with Dask-SQL, see the Operating on Dask DataFrames with SQL how-to guide. You can also reference the Dask-SQL docs or GitHub repo.