Reading CSV files into Dask DataFrames with read_csv#

Matthew Powers

2022-02-09

6 min read

Lots of datasets are stored in the CSV file format, so it’s important to understand the Dask read_csv API in detail.

Dask read_csv: single small file#

Dask makes it easy to read a small file into a Dask DataFrame. Suppose you have a dogs.csv file with the following contents:

first_name,age
fido,3
lucky,4
gus,8

Here’s how to read the CSV file into a Dask DataFrame.

>>> import dask.dataframe as dd
>>> ddf = dd.read_csv("dogs.csv")

You can inspect the content of the Dask DataFrame with the compute() method.

>>> ddf.compute()
  first_name  age
0       fido    3
1      lucky    4
2        gus    8

This is quite similar to the syntax for reading CSV files into pandas DataFrames.

>>> import pandas as pd
>>> df = pd.read_csv("dogs.csv")

The Dask DataFrame API was intentionally designed to look and feel just like the pandas API.

For a single small file, Dask may be overkill and you can probably just use pandas. Dask starts to gain a competitive advantage when dealing with large CSV files. A rule of thumb for working with pandas is to have at least 5x the size of your dataset in available RAM; switch over to Dask whenever you exceed this limit. For example, when working on a machine with 16 GB of RAM, consider switching to Dask once your dataset exceeds roughly 3 GB.

Dask read_csv: single large file#

Dask DataFrames are composed of multiple partitions, each of which is a pandas DataFrame. Dask intentionally splits up the data into multiple pandas DataFrames so operations can be performed on multiple slices of the data in parallel.

Let’s read in a 5.19 GB file (5,190 MB) into a Dask DataFrame. This file is hosted in a public S3 bucket at s3://coiled-datasets/h2o/G1_1e8_1e2_0_0/csv/G1_1e8_1e2_0_0.csv if you’d like to download it yourself.

>>> ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv")

We can run ddf.npartitions to see how many partitions the data is divided into.

>>> ddf.npartitions
81
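
Each of these partitions is a pandas DataFrame under the hood. Here’s a minimal check (a sketch using the partitions accessor) that materializes a single partition:

>>> import pandas as pd
>>> part = ddf.partitions[0].compute()
>>> isinstance(part, pd.DataFrame)
True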

You can customize the number of partitions that the DataFrame will contain by setting the blocksize parameter when invoking read_csv.

Dask read_csv: blocksize#

The number of partitions depends on the value of the blocksize argument. If you don’t supply a value to the blocksize keyword, it is set to “default” and the blocksize is computed based on the available memory and number of cores on the machine, up to a maximum of 64 MB. In the example above, Dask automatically splits up the 5,190 MB data file into ~64 MB chunks when run on a MacBook Air with 8 GB of RAM and 4 cores.

We can also manually set the blocksize parameter when reading CSV files to make the partitions larger or smaller. Let’s read this data file with a blocksize of 16 MB.

>>> ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", blocksize="16MB")
>>> ddf.npartitions
324

The Dask DataFrame consists of 324 partitions when the blocksize is 16 MB. The number of partitions goes up as the blocksize decreases. Let’s read in this data file with a blocksize of 128 MB.

>>> ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", blocksize="128MB")
>>> ddf.npartitions
40

The Dask DataFrame has 40 partitions when the blocksize is set to 128 MB. There are fewer partitions when the blocksize increases.
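
If you want to see how big each partition actually is in memory, one option is to compute the per-partition memory usage with map_partitions. This is a minimal sketch; the exact numbers depend on your data and pandas version:

# approximate in-memory size (in bytes) of each partition, one value per partition
partition_sizes = ddf.map_partitions(lambda df: df.memory_usage(deep=True).sum()).compute()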

The rule of thumb when working with Dask DataFrames is to keep your partitions under 100 MB in size. Next, let’s take a look at how Dask infers the data types of each column when a CSV file is read.

Dask read_csv: inferring dtypes#

CSV is a text-based file format and does not contain metadata information about the data types or columns. When reading a CSV file, Dask needs to infer the column data types if they’re not explicitly set by the user.

Let’s look at the data types that Dask has inferred for our DataFrame.

>>> ddf.dtypes
id1    string[pyarrow]
id2    string[pyarrow]
id3    string[pyarrow]
id4              int64
id5              int64
id6              int64
v1               int64
v2               int64
v3             float64
dtype: object

Dask automatically uses PyArrow strings, which consume up to 70% less memory than NumPy object dtype and can therefore provide a huge performance improvement (more details in Utilizing PyArrow to improve pandas and Dask workflows).
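
If you want to opt out of (or explicitly opt in to) the automatic PyArrow string conversion, recent Dask versions expose a configuration option for it. This is a sketch and assumes the dataframe.convert-string config key available in newer releases:

import dask

# Assumption: "dataframe.convert-string" controls automatic PyArrow string conversion.
dask.config.set({"dataframe.convert-string": False})  # fall back to NumPy object strings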

Dask infers the dtype based on a sample of the data. It doesn’t look at every row in the dataset to infer the data types because that would be prohibitively slow for large datasets.

You can control how much data is sampled by setting the sample parameter, which is the number of bytes Dask reads when inferring dtypes, though for this dataset, this does not change the inferred data types.

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", sample=5000)

Dask read_csv: manually specifying dtype#

Inferring data types based on a sample of the rows can be error-prone if the dtype is not the same for all column values. A common example of this is when a column contains NaN values. You can avoid dtype inference by explicitly specifying the dtype when reading CSV files.

Let’s manually set the id4, id5, and id6 columns to a Python float, which would avoid any errors on computing our results if these columns contained NaN values.

>>> ddf = dd.read_csv(
    "data/G1_1e8_1e2_0_0.csv",
    dtype={
        "id4": float,
        "id5": float,
        "id6": float,
    },
)
>>> ddf.dtypes
id1    string[pyarrow]
id2    string[pyarrow]
id3    string[pyarrow]
id4            float64
id5            float64
id6            float64
v1               int64
v2               int64
v3             float64
dtype: object

Dask will infer the data types for the columns that you don’t manually specify. If you specify the dtype for all the columns, then Dask won’t do any data type inference.
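
For example, here’s a sketch that specifies every column using the dtype names shown above, so Dask skips sampling entirely (the string[pyarrow] dtype assumes PyArrow is installed; plain "string" also works):

ddf = dd.read_csv(
    "data/G1_1e8_1e2_0_0.csv",
    dtype={
        "id1": "string[pyarrow]",
        "id2": "string[pyarrow]",
        "id3": "string[pyarrow]",
        "id4": "int64",
        "id5": "int64",
        "id6": "int64",
        "v1": "int64",
        "v2": "int64",
        "v3": "float64",
    },
)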

Dask read_csv: multiple files#

Dask can read data from a single file, but it’s even faster for Dask to read multiple files in parallel.

Let’s write out the large 5.19 GB CSV file from earlier examples as multiple CSV files so we can see how to read multiple CSV files into a Dask DataFrame. Start by writing out the single CSV file as multiple CSV files.

>>> ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv")
>>> ddf.npartitions
81
>>> ddf.to_csv("data/csvs")
['data/csvs/00.part',
 'data/csvs/01.part',
 'data/csvs/02.part',
 ...
 'data/csvs/79.part',
 'data/csvs/80.part']

This will write out 81 files, one for each partition. Let’s read these 81 CSV files into a Dask DataFrame.

ddf = dd.read_csv("data/csvs/*.part")

Reading multiple files into a pandas DataFrame must be done sequentially and requires more code. Here’s the pandas syntax:

import glob
import pandas as pd

all_files = glob.glob("./data/csvs/*.part")
df = pd.concat((pd.read_csv(f) for f in all_files))

Parallel I/O is a huge strength of Dask compared to pandas. pandas is designed for read / write operations on single files and does not scale well to many files, whereas Dask is designed to perform I/O in parallel and is more performant for operations across multiple files (learn more).

The same blocksize and dtype arguments we discussed earlier for reading a single file also apply when reading multiple files.
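
For instance, here’s a sketch that combines a glob pattern with both arguments (the dtype values are just illustrative):

ddf = dd.read_csv(
    "data/csvs/*.part",
    blocksize="64MB",
    dtype={"id4": float, "id5": float, "id6": float},
)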

Dask readers also make it easy to read data that’s stored in remote object stores, like AWS S3.

Dask read_csv: analyzing remote files with localhost compute#

You can easily read a CSV file that’s stored in S3 from your local machine. Here’s how to read a public S3 file.

ddf = dd.read_csv("s3://coiled-datasets/timeseries/20-years/csv/0000.part")
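
If the bucket allows public, unauthenticated access, you may also need to pass anonymous credentials via storage_options, which Dask forwards to s3fs (a sketch):

ddf = dd.read_csv(
    "s3://coiled-datasets/timeseries/20-years/csv/0000.part",
    storage_options={"anon": True},  # read the public bucket without AWS credentials
)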

You normally should not analyze remote data on your localhost machine because downloading the data locally is slow. It’s more natural to process cloud data with cloud compute: files stored in AWS S3, for example, should be processed with compute instances provisioned on AWS EC2.

Let’s look at an S3 folder with a lot of data. The s3://coiled-datasets/timeseries/20-years/csv/ folder contains 1,095 files and requires about 100 GB of memory when loaded into a DataFrame. Let’s try to run a computation on the entire dataset and see if it works.

ddf = dd.read_csv("s3://coiled-datasets/timeseries/20-years/csv/*.part")
ddf.describe().compute()

I let this computation run for 30 minutes before canceling the query. Running this locally is way too slow.

Let’s see how to read in this large dataset of CSVs to a Dask cluster that contains multiple compute nodes, so we can execute this query faster.

Dask read_csv: read remote files in cluster environment#

Let’s spin up a 5-node cluster with Coiled and try running the same computation with more computing power.

import coiled

cluster = coiled.Cluster(
    n_workers=5,
    region="us-east-2",  # same region as the data: faster and cheaper
)
client = cluster.get_client()

ddf = dd.read_csv("s3://coiled-datasets/timeseries/20-years/csv/*.part")
ddf.describe().compute()

This computation runs in just under 2 minutes and costs roughly $0.08 in AWS charges. Running this computation on a cluster is certainly faster than running it on localhost, where the query would have taken a very long time to finish.
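
When the analysis is done, it’s a good idea to shut the cluster down so you stop paying for it. Here’s a sketch using the standard client/cluster API:

client.close()
cluster.close()  # shut down the cluster's workers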

Storing the data in a different file format could make the query run even faster.

Limitations of CSV file format#

CSV files are commonly used because they’re human readable and support columns with messy data types (for example, a column containing both strings and integers). However, they’re usually not the best file format for data analysis.

Here are five reasons why Parquet files are usually a better choice:

  • Parquet files don’t require schema inference / manual schema specification

  • Parquet files are easier to compress

  • The columnar nature of Parquet files allows for column pruning, which often yields big query performance gains

  • Row group metadata in Parquet files allows for predicate pushdown filtering

  • Parquet files are immutable

See the Dask docs for more details on the advantages of Parquet files.
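
For comparison, here’s a sketch of what reading a Parquet copy of this dataset could look like. The path is hypothetical, and the columns argument illustrates the column pruning mentioned above:

# Assumption: a Parquet copy of the dataset exists at this (hypothetical) path.
ddf = dd.read_parquet(
    "data/G1_1e8_1e2_0_0.parquet",
    columns=["id1", "v1"],  # read only the columns the query needs
)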

Learn more#

In this post we went through how to work with CSV files using Dask. For more realistic use cases you might consider: