Converting a Dask DataFrame to a pandas DataFrame

Matthew Powers

2021-10-01


This post explains how to convert from a Dask DataFrame to a pandas DataFrame and when it’s a good idea to perform this operation.

Each partition in a Dask DataFrame is a pandas DataFrame. Converting from a Dask DataFrame to a pandas DataFrame combines multiple pandas DataFrames (partitions) into a single pandas DataFrame.

Dask DataFrames can store massive datasets, whereas a pandas DataFrame must fit in the memory of a single computer. This means only Dask DataFrames that are small enough to fit in one machine's memory can be converted into pandas DataFrames.

This post shows the syntax to perform the conversion, the error message if the Dask DataFrame is too big, and how to assess if conversion is possible.

Convert from Dask to pandas on localhost

Start by creating a Dask DataFrame.

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf = dd.from_pandas(df, npartitions=2)

If we print the Dask DataFrame, we see ... placeholders instead of the actual values. This is because Dask DataFrames are evaluated lazily: Dask won’t compute the values until explicitly requested.

>>> print(ddf)
Dask DataFrame Structure:
                nums letters
npartitions=2
0              int64  string
3                ...     ...
5                ...     ...
Dask Name: frompandas, 1 expression
Expr=df

To convert the Dask DataFrame into a pandas DataFrame, you can call compute. This turns the lazy Dask DataFrame into an in-memory pandas DataFrame.

>>> pandas_df = ddf.compute()
>>> type(pandas_df)
pandas.core.frame.DataFrame

The type(pandas_df) call returns pandas.core.frame.DataFrame, which confirms the result is a pandas DataFrame.

You can also print pandas_df to visually inspect its contents.

>>> print(pandas_df)
   nums letters
0     1       a
1     2       b
2     3       c
3     4       d
4     5       e
5     6       f

We’re easily able to convert a small Dask DataFrame to a pandas DataFrame on localhost (your local computer). Let’s try to convert a big Dask DataFrame to a pandas DataFrame on a cloud-based cluster.

Converting big datasets on the cloud

The Parquet dataset used in this section contains 662 million records and is 16.7 GiB on disk. It’s stored on S3 in the public bucket s3://coiled-datasets/timeseries/20-years/parquet. The dataset is too big to fit in memory on my laptop, and even on a local machine with enough memory it would be prohibitively slow to download from cloud storage.

Dask and Coiled make it easy to work with large DataFrames in the cloud. Let’s create a five-node Dask cluster on AWS with Coiled and read the dataset into a Dask DataFrame.

import coiled
import dask.dataframe as dd

cluster = coiled.Cluster(
    n_workers=5,
    region="us-east-2",  # same region as the data, faster + cheaper
)
client = cluster.get_client()

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
)

The dataset contains 662 million records and uses 58 GB of memory. We won’t be able to convert this whole Dask DataFrame to a pandas DataFrame because it’s too big.

Let’s run a filtering operation and see if a subset of the Dask DataFrame can be converted to a pandas DataFrame.

>>> filtered_ddf = ddf.loc[ddf["id"] > 1150]
>>> filtered_ddf.compute()
                       id    name         x         y
timestamp
2000-01-09 01:52:30  1152   Edith  0.273674  0.997075
2000-01-29 17:22:59  1175  Oliver -0.909065  0.017086
2000-01-29 20:34:37  1158     Bob -0.910895  0.652333
2000-02-06 00:13:44  1152   Sarah  0.080475  0.855420
2000-02-08 05:23:59  1153   Kevin  0.258087 -0.144844
...                   ...     ...       ...       ...
2020-10-25 18:15:35  1175  George  0.060843  0.229963
2020-11-01 14:25:07  1153   Alice -0.537439 -0.544084
2020-11-20 02:30:17  1158  Oliver  0.733396  0.227974
2020-11-30 03:01:06  1155   Kevin -0.963094 -0.638443
2020-11-30 07:11:46  1163  Yvonne -0.671973 -0.700749

[1103 rows x 4 columns]

The query takes 22 seconds. filtered_ddf contains 1,103 rows and uses only 50 KB of memory. It’s a tiny subset of the original 662 million rows, so it can easily be converted to a pandas DataFrame with filtered_ddf.compute().

Just for fun, let’s try to convert the entire Dask DataFrame to pandas and see what happens.

Error when converting a big dataset

The entire Dask DataFrame takes 58 GB of memory, and each node in our cluster has only 16 GB. Collecting the data into a single pandas DataFrame requires the whole result to fit in the memory of one node, so ddf.compute() will error out.

>>> ddf.compute()
MemoryError: Task ('repartitiontofewer-2a0f640e5769ea9c1c31514c1d6e3358', 0) has 27.97 GiB worth of input dependencies, but worker tls://10.0.46.55:45469 has memory_limit set to 14.85 GiB.

Even if the Dask DataFrame is sufficiently small to be converted to a pandas DataFrame, you don’t necessarily want to perform the operation.

When to convert to pandas

Dask uses multiple cores to operate on datasets in parallel. Dask is often quicker than pandas, even for localhost workflows, because it uses all the cores of your machine.

For one-off tasks, it’s fine to revert to pandas, but you should generally avoid doing so in production data pipelines, since pandas cannot scale as data sizes grow. Data pipelines that can scale up are more robust, and rebuilding large components of a system when datasets grow isn’t ideal.

Only convert to pandas for one-off tasks or when you’re sure datasets will stay small and you won’t have to deal with pandas scaling issues in the future.

Learn more

In this post we went through how to convert a Dask DataFrame to a pandas DataFrame. This works well when the dataset is small to begin with or it’s been reduced to a manageable size. For more realistic use cases you might consider: