Faster data transfer with Snowflake#

Note

The dask-snowflake connector is currently in beta.

Snowflake is a cloud-based data warehouse, SQL query engine, and analytics service. Coiled helps scale Python workloads by provisioning cloud-hosted Dask clusters on demand.

Coiled and Snowflake work well together: Snowflake handles data storage and SQL query processing, while Coiled handles the infrastructure for creating Dask clusters in your cloud.

Loading data from Snowflake into Python typically involves either using the Python Snowflake connector to send SQL queries to Snowflake or exporting data from Snowflake into Parquet format. This works well for small datasets, but can become a limiting factor with larger datasets and more complex queries. The dask-snowflake connector enables parallel I/O between Snowflake and Dask.

In this guide, you’ll learn how to use the dask-snowflake connector to read and write large datasets in parallel and perform distributed computations on cloud-hosted Dask clusters with Coiled.

Before you start#

You’ll first need to create consistent local and remote software environments with dask, coiled, and the necessary dependencies installed. If you are unfamiliar with creating software environments, you can first follow the tutorial on setting up a custom software environment.

First, you will install snowflake-connector-python, dask-snowflake, and coiled-runtime, a Dask meta-package. Save the following file as environment.yml, replacing <x.x.x> with the versions you would like to use. You can get the most up-to-date version of coiled-runtime from the latest tag in the public coiled-runtime repository.

channels:
  - conda-forge
dependencies:
  - dask-snowflake=<x.x.x>
  - snowflake-connector-python=<x.x.x>
  - coiled-runtime=<x.x.x>
  - python=3.9

Next, create a local software environment using the environment.yml file:

$ conda env create -f environment.yml -n snowflake-example
$ conda activate snowflake-example

Lastly, create a remote software environment using the environment.yml file:

$ coiled env create -n snowflake-example --conda environment.yml

1. Verify connectivity#

Define your Snowflake connection parameters as environment variables by running the following Python code and replacing the user, password, and account with your own values (see the Snowflake documentation on account identifiers):

# Set environment variables related to Snowflake
import os

os.environ["SNOWFLAKE_USER"] = "<YOUR-USERNAME>"
os.environ["SNOWFLAKE_PASSWORD"] = "<YOUR-PASSWORD>"
os.environ["SNOWFLAKE_ACCOUNT"] = "<YOUR-ACCOUNT>"
# leave this line as-is, you'll use it in a later step
os.environ["SNOWFLAKE_WAREHOUSE"] = "dask_snowflake_wh"

Note

Don’t have a Snowflake account? You can sign up here for a free trial.

Verify your Snowflake connection with a query to a sample dataset:

# Verify Snowflake connectivity
import snowflake.connector

ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
)

cs = ctx.cursor()

schema = "TPCDS_SF100TCL"
table = "CALL_CENTER"

cs.execute("USE SNOWFLAKE_SAMPLE_DATA")
cs.execute("SELECT * FROM " + schema + "." + table)

one_row = str(cs.fetchone())

print(one_row)

If the connection and query were successful, then you should see output similar to the following:

(1, 'AAAAAAAABAAAAAAA', datetime.date(1998, 1, 1), None, None, 2450952, 'NY
Metro', 'large', 597159671, 481436415, '8AM-4PM', 'Bob Belcher', 6, 'More
than other authori', 'Shared others could not count fully dollars. New
members ca', 'Julius Tran', 3, 'pri', 6, 'cally', '730', 'Ash Hill',
'Boulevard', 'Suite 0', 'Georgetown', 'Harmon County', 'OK', '77057', 'United
States', Decimal('-6.00'), Decimal('0.11'))

Note

We defined the Snowflake username, password, account, and warehouse as environment variables and then passed them to the Snowflake connector. If you use a different authentication method, you can modify the example accordingly. See the Snowflake documentation on using the Snowflake connector for Python.
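
For example, browser-based SSO is one common alternative. The following sketch assumes the externalbrowser authenticator supported by snowflake-connector-python; it is not part of the original example, so adjust it to whatever method your account uses:

# Hypothetical sketch: connect with browser-based SSO instead of a password
# ("externalbrowser" is a standard snowflake-connector-python authenticator)
ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    authenticator="externalbrowser",
)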

2. Launch your cluster#

Use Coiled to create a Dask cluster in your cloud with the software environment you created:

# Create a Dask cluster with Coiled
import coiled
from dask.distributed import Client

cluster = coiled.Cluster(
    name="snowflake-example",
    software="snowflake-example",
)
# connect Dask to your Coiled cluster
client = Client(cluster)
print("Dashboard:", client.dashboard_link)

The above example also connects Dask to your Coiled cluster and prints a link to the Dask dashboard, which you can use later to view the progress of parallel reads and writes to Snowflake.
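
If you want to be sure workers are available before reading or writing data, you can optionally wait for a few of them to join. This is a small optional sketch, and the worker count below is an arbitrary choice rather than part of the original example:

# Optionally block until some workers have joined the cluster
# (the number 4 here is only illustrative)
client.wait_for_workers(4)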

3. Generate data#

You’ll first generate a random timeseries of data from dask.datasets:

# Generate synthetic data
import dask

ddf = dask.datasets.timeseries(
    start="2021-01-01",
    end="2021-03-31",
)

You’ll use the dask-snowflake connector to load this sample data into Snowflake in a later step.
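
If you’d like to see what this synthetic data looks like before writing it, you can inspect the Dask DataFrame locally. This optional check is not part of the original example:

# Optional: peek at the synthetic timeseries and its partitioning
print(ddf.npartitions)  # number of partitions that will be written in parallel
print(ddf.head())       # small sample of the generated data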

4. Create Snowflake resources#

Create a test warehouse and database in Snowflake for writing and reading data:

# Create a warehouse and database in Snowflake
ctx = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
)

cs = ctx.cursor()

cs.execute("CREATE WAREHOUSE IF NOT EXISTS dask_snowflake_wh")
cs.execute("CREATE DATABASE IF NOT EXISTS dask_snowflake_db")
cs.execute("USE DATABASE dask_snowflake_db")

5. Write data in parallel#

Now you can use dask-snowflake to write the random timeseries dataset to Snowflake in parallel:

# Write data to Snowflake in parallel
from dask_snowflake import to_snowflake

connection_kwargs = {
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
    "database": "dask_snowflake_db",
    "schema": "PUBLIC",
}

to_snowflake(
    ddf,
    name="dask_snowflake_table",
    connection_kwargs=connection_kwargs,
)

You can monitor the progress of the parallel write operation with the Dask dashboard. After about a minute, the sample data should appear in your Snowflake database. Congrats, you just loaded about 7.7 million records into Snowflake in parallel!
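
As a quick sanity check, you can count the rows that landed in Snowflake using the cursor from the previous step. This optional verification is not part of the original example, and the fully qualified table name assumes the database and schema used in the write step above:

# Optional: verify the number of rows written to Snowflake
cs.execute("SELECT COUNT(*) FROM dask_snowflake_db.PUBLIC.dask_snowflake_table")
print(cs.fetchone())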

6. Read data in parallel#

Now that you have a timeseries dataset stored in Snowflake, you can read the data back into your Coiled cluster in parallel via a distributed fetch:

# Read data from Snowflake in parallel
from dask_snowflake import read_snowflake

ddf = read_snowflake(
    query="""
        SELECT *
        FROM dask_snowflake_table;
    """,
    connection_kwargs=connection_kwargs,
)

print(ddf.head())

After a few seconds, you should see the results. As usual, Dask only loads the data that it needs, since operations in Dask are lazy until computed. You can now work with Dask as usual to perform computations in parallel.

7. Work with Dask#

After you’ve loaded data onto your Coiled cluster, you can perform typical Dask operations:

# Work with Dask as usual
result = ddf.X.mean().compute()
print(result)

After the computation completes, you should see output similar to the following:

0.00020641088610962797

Lastly, you can stop the running cluster using the following commands. By default, clusters will shut down after 20 minutes of inactivity.

# Close the cluster
cluster.close()

# Close the client
client.close()

Next Steps#

You can run through the example again and increase the size of the sample dataset or increase the size of your Coiled cluster with the n_workers argument, as sketched below. You can also step through this guide using any other datasets you have stored in Snowflake. Check out Coiled’s blog post for more details on how to use Snowflake and Dask.
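
For example, you could request more workers when creating the cluster; the n_workers value below is only an illustration:

# Create a larger Coiled cluster; adjust n_workers to your workload
cluster = coiled.Cluster(
    name="snowflake-example",
    software="snowflake-example",
    n_workers=20,
)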