{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed SQL queries with Dask-SQL\n", "\n", "In this guide, you'll use [Dask-SQL](https://dask-sql.readthedocs.io/en/latest/), an open-source library that adds a SQL query layer on top of Dask. This allows you to query and transform Dask DataFrames using common SQL operations. You can download this Jupyter notebook to follow along in your own JupyterLab session.\n", "\n", "## Before you start\n", "\n", "You'll first need to create consistent local and remote software environments\n", "with `dask`, `coiled`, and the necessary dependencies installed.\n", "If you are unfamiliar with creating software environments, you can first\n", "follow the [tutorial on setting up a custom software environment](https://docs.coiled.io/user_guide/tutorials/matching_coiled_senvs.html).\n", "\n", "First, you will install `dask-sql` and [coiled-runtime](https://docs.coiled.io/user_guide/software_environment.html#coiled-runtime), a Dask meta-package.\n", "Save the following file as `environment.yml`. You can get the most up-to-date version of coiled-runtime from the latest\n", "[tag](https://github.com/coiled/coiled-runtime/tags) in the public coiled-runtime repository."
] }, { "cell_type": "markdown", "metadata": { "vscode": { "languageId": "shellscript" } }, "source": [ "```\n", "channels:\n", " - conda-forge\n", "dependencies:\n", " - python=3.9\n", " - coiled-runtime=0.0.3\n", " - dask-sql=2022.1.0\n", " ```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, create a local software environment using the `environment.yml` file:" ] }, { "cell_type": "markdown", "metadata": { "vscode": { "languageId": "shellscript" } }, "source": [ "```\n", "conda env create -f environment.yml -n dask-sql-example\n", "conda activate dask-sql-example\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lastly, create a remote software environment using the same `environment.yml` file:" ] }, { "cell_type": "markdown", "metadata": { "vscode": { "languageId": "shellscript" } }, "source": [ "```\n", "coiled env create -n dask-sql-example --conda environment.yml\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Launch your cluster\n", "\n", "You'll start by creating a Coiled cluster:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import coiled\n", " \n", "cluster = coiled.Cluster(\n", " name=\"dask-sql-example\",\n", " n_workers=5,\n", " software=\"dask-sql-example\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "and then connecting Dask to your remote Coiled cluster:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "# connect to the remote cluster\n", "client = Client(cluster)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Getting started with Dask-SQL\n", "\n", "The main interface for interacting with Dask-SQL is the `dask_sql.Context` object. It allows you to register Dask DataFrames as data sources and can convert SQL queries to Dask DataFrame operations." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "from dask_sql import Context\n", "\n", "c = Context()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You'll use the `dask.datasets.timeseries` dataset of random timeseries data:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import dask\n", "\n", "# Load dataset\n", "df = dask.datasets.timeseries(\n", " \"2000\", \"2005\", partition_freq=\"2w\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can then use the `dask_sql.Context` to assign a table name to this Dask DataFrame, and use that table name for SQL queries:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Register the Dask DataFrame df as a table named \"timeseries\"\n", "# (create_table replaces the deprecated register_dask_table);\n", "# persist=True keeps the data in cluster memory, reducing overhead for multiple queries\n", "c.create_table(\"timeseries\", df, persist=True)\n", "\n", "# Perform a SQL operation on the \"timeseries\" table\n", "result = c.sql(\"SELECT count(1) FROM timeseries\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that this returns another Dask DataFrame and that no computation has run yet. Like other Dask operations, the query is lazily evaluated. You can use `compute` to run the computation on your cluster." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"<thead><tr><th></th><th>COUNT(1)</th></tr></thead>\n",
"<tbody>\n",
"<tr><th>0</th><td>157248000</td></tr>\n",
"</tbody>\n",
"</table>\n",
"</div>
" ], "text/plain": [ " COUNT(1)\n", "0 157248000" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "result.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You've run your first SQL query with Dask-SQL! Let’s try out some more complex queries.\n", "\n", "## More complex SQL examples\n", "\n", "With Dask-SQL we can run more complex SQL statements like, for example, a groupby-aggregation:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"<thead><tr><th></th><th>max</th><th>name</th></tr></thead>\n",
"<tbody>\n",
"<tr><th>0</th><td>1.000000</td><td>Alice</td></tr>\n",
"<tr><th>1</th><td>1.000000</td><td>Bob</td></tr>\n",
"<tr><th>2</th><td>1.000000</td><td>Charlie</td></tr>\n",
"<tr><th>3</th><td>0.999999</td><td>Dan</td></tr>\n",
"<tr><th>4</th><td>1.000000</td><td>Edith</td></tr>\n",
"<tr><th>5</th><td>1.000000</td><td>Frank</td></tr>\n",
"<tr><th>6</th><td>1.000000</td><td>George</td></tr>\n",
"<tr><th>7</th><td>1.000000</td><td>Hannah</td></tr>\n",
"<tr><th>8</th><td>1.000000</td><td>Ingrid</td></tr>\n",
"<tr><th>9</th><td>0.999999</td><td>Jerry</td></tr>\n",
"<tr><th>10</th><td>1.000000</td><td>Kevin</td></tr>\n",
"<tr><th>11</th><td>1.000000</td><td>Laura</td></tr>\n",
"<tr><th>12</th><td>0.999999</td><td>Michael</td></tr>\n",
"<tr><th>13</th><td>1.000000</td><td>Norbert</td></tr>\n",
"<tr><th>14</th><td>1.000000</td><td>Oliver</td></tr>\n",
"<tr><th>15</th><td>1.000000</td><td>Patricia</td></tr>\n",
"<tr><th>16</th><td>0.999999</td><td>Quinn</td></tr>\n",
"<tr><th>17</th><td>1.000000</td><td>Ray</td></tr>\n",
"<tr><th>18</th><td>1.000000</td><td>Sarah</td></tr>\n",
"<tr><th>19</th><td>1.000000</td><td>Tim</td></tr>\n",
"<tr><th>20</th><td>1.000000</td><td>Ursula</td></tr>\n",
"<tr><th>21</th><td>1.000000</td><td>Victor</td></tr>\n",
"<tr><th>22</th><td>1.000000</td><td>Wendy</td></tr>\n",
"<tr><th>23</th><td>0.999999</td><td>Xavier</td></tr>\n",
"<tr><th>24</th><td>0.999999</td><td>Yvonne</td></tr>\n",
"<tr><th>25</th><td>0.999999</td><td>Zelda</td></tr>\n",
"</tbody>\n",
"</table>\n",
"</div>" ], "text/plain": [ " max name\n", "0 1.000000 Alice\n", "1 1.000000 Bob\n", "2 1.000000 Charlie\n", "3 0.999999 Dan\n", "4 1.000000 Edith\n", "5 1.000000 Frank\n", "6 1.000000 George\n", "7 1.000000 Hannah\n", "8 1.000000 Ingrid\n", "9 0.999999 Jerry\n", "10 1.000000 Kevin\n", "11 1.000000 Laura\n", "12 0.999999 Michael\n", "13 1.000000 Norbert\n", "14 1.000000 Oliver\n", "15 1.000000 Patricia\n", "16 0.999999 Quinn\n", "17 1.000000 Ray\n", "18 1.000000 Sarah\n", "19 1.000000 Tim\n", "20 1.000000 Ursula\n", "21 1.000000 Victor\n", "22 1.000000 Wendy\n", "23 0.999999 Xavier\n", "24 0.999999 Yvonne\n", "25 0.999999 Zelda" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql('SELECT max(y) as \"max\", name FROM timeseries GROUP BY name').compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The equivalent operation using the Dask DataFrame API would be `df.groupby(\"name\").y.max().compute()`. We can build up complexity by adding a ``WHERE`` clause to filter for certain values of ``x``:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
"<thead><tr><th></th><th>name</th><th>average</th></tr></thead>\n",
"<tbody>\n",
"<tr><th>0</th><td>Alice</td><td>-4.057128e-04</td></tr>\n",
"<tr><th>1</th><td>Bob</td><td>3.807399e-04</td></tr>\n",
"<tr><th>2</th><td>Charlie</td><td>1.664932e-04</td></tr>\n",
"<tr><th>3</th><td>Dan</td><td>2.467381e-04</td></tr>\n",
"<tr><th>4</th><td>Edith</td><td>9.742376e-05</td></tr>\n",
"<tr><th>5</th><td>Frank</td><td>-9.143104e-05</td></tr>\n",
"<tr><th>6</th><td>George</td><td>-2.146390e-04</td></tr>\n",
"<tr><th>7</th><td>Hannah</td><td>-6.085306e-05</td></tr>\n",
"<tr><th>8</th><td>Ingrid</td><td>2.383526e-05</td></tr>\n",
"<tr><th>9</th><td>Jerry</td><td>-2.286118e-04</td></tr>\n",
"<tr><th>10</th><td>Kevin</td><td>-3.735864e-04</td></tr>\n",
"<tr><th>11</th><td>Laura</td><td>4.983597e-05</td></tr>\n",
"<tr><th>12</th><td>Michael</td><td>-6.790377e-04</td></tr>\n",
"<tr><th>13</th><td>Norbert</td><td>4.276074e-04</td></tr>\n",
"<tr><th>14</th><td>Oliver</td><td>2.105069e-04</td></tr>\n",
"<tr><th>15</th><td>Patricia</td><td>8.815882e-05</td></tr>\n",
"<tr><th>16</th><td>Quinn</td><td>3.242869e-04</td></tr>\n",
"<tr><th>17</th><td>Ray</td><td>6.378322e-05</td></tr>\n",
"<tr><th>18</th><td>Sarah</td><td>1.756173e-07</td></tr>\n",
"<tr><th>19</th><td>Tim</td><td>3.764230e-05</td></tr>\n",
"<tr><th>20</th><td>Ursula</td><td>-1.870895e-04</td></tr>\n",
"<tr><th>21</th><td>Victor</td><td>2.181580e-04</td></tr>\n",
"<tr><th>22</th><td>Wendy</td><td>2.784554e-06</td></tr>\n",
"<tr><th>23</th><td>Xavier</td><td>-4.598764e-04</td></tr>\n",
"<tr><th>24</th><td>Yvonne</td><td>2.373139e-04</td></tr>\n",
"<tr><th>25</th><td>Zelda</td><td>2.807484e-04</td></tr>\n",
"</tbody>\n",
"</table>\n",
"</div>" ], "text/plain": [ " name average\n", "0 Alice -4.057128e-04\n", "1 Bob 3.807399e-04\n", "2 Charlie 1.664932e-04\n", "3 Dan 2.467381e-04\n", "4 Edith 9.742376e-05\n", "5 Frank -9.143104e-05\n", "6 George -2.146390e-04\n", "7 Hannah -6.085306e-05\n", "8 Ingrid 2.383526e-05\n", "9 Jerry -2.286118e-04\n", "10 Kevin -3.735864e-04\n", "11 Laura 4.983597e-05\n", "12 Michael -6.790377e-04\n", "13 Norbert 4.276074e-04\n", "14 Oliver 2.105069e-04\n", "15 Patricia 8.815882e-05\n", "16 Quinn 3.242869e-04\n", "17 Ray 6.378322e-05\n", "18 Sarah 1.756173e-07\n", "19 Tim 3.764230e-05\n", "20 Ursula -1.870895e-04\n", "21 Victor 2.181580e-04\n", "22 Wendy 2.784554e-06\n", "23 Xavier -4.598764e-04\n", "24 Yvonne 2.373139e-04\n", "25 Zelda 2.807484e-04" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c.sql(\"\"\"\n", " SELECT name, AVG(y) as \"average\"\n", " FROM timeseries \n", " WHERE x > 0.2\n", " GROUP BY name\n", "\"\"\").compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once you're done, you can shut down the cluster (it will shut down automatically after 20 minutes of inactivity):" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "cluster.close()\n", "client.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Next Steps\n", "\n", "For a more in-depth look at what you can do with Dask-SQL, see the [Operating on Dask DataFrames with SQL](https://examples.dask.org/sql.html) how-to guide. You can also refer to the [Dask-SQL docs](https://dask-sql.readthedocs.io/) or [GitHub repo](https://github.com/dask-contrib/dask-sql)."
] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.13" }, "vscode": { "interpreter": { "hash": "02c852c24d9f048ccdc209be0dc4985b81e663aaf523cefac5b7672a31b52420" } } }, "nbformat": 4, "nbformat_minor": 4 }