Pandas parallel apply and map with Dask DataFrame

Pavithra Eswaramoorthy

2021-11-22

3 min read

Dask DataFrame can speed up pandas apply() and map() operations by running in parallel across all cores on a single machine or a cluster of machines.

Dask DataFrame helps you quickly scale your single-core pandas code, while keeping the API familiar. The map() and apply() functions are at the core of data manipulation with pandas. In this post, we’ll take a closer look at how you can perform these operations efficiently with Dask DataFrame.

In particular:

  • Dask’s map_partitions() function

  • Dask’s implementation of pandas parallel apply() and map()

Quick overview of pandas apply() and map()

You can use pandas’ apply() function to apply any built-in or custom Python function along one axis of a DataFrame. The function receives a one-dimensional pandas object (a Series) each time it is called: with axis=0 it is called once per column, and with axis=1 it is called once per row.

Consider a pandas DataFrame with 10 rows and 2 columns: a and b, where each element in the DataFrame is a random integer:

>>> import pandas as pd
>>> import numpy as np

>>> df = pd.DataFrame(np.random.randint(0, 20, size=(10, 2)), columns=['a', 'b'])
>>> print(df)
    a   b
0  15   3
1   9  18
2   5  12
3  14   9
4  12  15
5   9   9
6  15   5
7   9   3
8  10  10
9  14  11

Also, consider a function minmax() that sleeps for 1 second and returns the difference between the largest and smallest value:

from time import sleep

def minmax(x):
    sleep(1)
    return x.max() - x.min()

We’ll use apply() to do the minmax() operation on each row in the DataFrame with df.apply(minmax, axis=1):

>>> %time df.apply(minmax, axis=1)
CPU times: user 10.3 ms, sys: 1.37 ms, total: 11.7 ms
Wall time: 10 s
0    12
1     9
2     7
3     5
4     3
5     0
6    10
7     6
8     0
9     3

This computation takes ~10s because we have 10 rows.
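To make the axis argument concrete, here is a minimal sketch. It assumes a small hypothetical DataFrame with fixed values (`small`) instead of the random one above, and a helper named `value_range` that mirrors minmax() without the sleep; neither name comes from the original post.

```python
import pandas as pd

# Hypothetical fixed data so the results are reproducible.
small = pd.DataFrame({"a": [15, 9, 5], "b": [3, 18, 12]})

def value_range(x):
    # Same logic as minmax(), minus the sleep. x is a pandas Series.
    return x.max() - x.min()

per_column = small.apply(value_range, axis=0)  # called once per column
per_row = small.apply(value_range, axis=1)     # called once per row

print(per_column.tolist())  # [10, 15]
print(per_row.tolist())     # [12, 9, 7]
```

With axis=1 the function is called once per row, which is why the timed example above scales with the number of rows.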

You can use map() for element-wise operations across a pandas DataFrame. Consider a function inc that sleeps for 1s and returns the input incremented by 1:

def inc(i):
    sleep(1)
    return i+1

We’ll use map() to apply inc() to all values in the pandas DataFrame:

>>> %time df.map(inc)
CPU times: user 11.2 ms, sys: 1.66 ms, total: 12.9 ms
Wall time: 20.1 s
    a   b
0  16   4
1  10  19
2   6  13
3  15  10
4  13  16
5  10  10
6  16   6
7  10   4
8  11  11
9  15  12

This computation takes ~20s because we have 20 values.
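Both Series and DataFrame have an element-wise map(). The sketch below contrasts the two on a small hypothetical DataFrame (`small`) with a sleep-free helper (`add_one`); both names are illustrative. DataFrame.map was added in pandas 2.1, so the sketch falls back to the older applymap name when map is not available.

```python
import pandas as pd

# Hypothetical fixed data so the results are reproducible.
small = pd.DataFrame({"a": [15, 9, 5], "b": [3, 18, 12]})

def add_one(i):
    # Same logic as inc(), minus the sleep.
    return i + 1

# Series.map: element-wise over a single column (3 calls here).
col = small["a"].map(add_one)

# DataFrame.map: element-wise over every value (6 calls here).
# On pandas versions before 2.1 the same method is called applymap.
frame_map = small.map if hasattr(small, "map") else small.applymap
whole = frame_map(add_one)

print(col.tolist())           # [16, 10, 6]
print(whole.values.tolist())  # [[16, 4], [10, 19], [6, 13]]
```

The number of function calls equals the number of values touched, which is why mapping over the whole 10x2 DataFrame above costs about 20 seconds.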

Parallel pandas apply with Dask

A Dask DataFrame consists of multiple pandas DataFrames, and each pandas DataFrame is called a partition. This mechanism allows you to work with larger-than-memory data because your computations are distributed across these pandas DataFrames and can be executed in parallel. This includes the apply() and map() computations.

Figure: a Dask DataFrame is made up of multiple pandas DataFrame partitions (https://docs.dask.org/en/stable/_images/dask-dataframe.svg)

import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=5)  # Dask DataFrame with 5 partitions

We can work on all of these partitions at once using Dask’s DataFrame.apply function, which is like running pandas.DataFrame.apply on every partition in parallel. Note that Dask’s DataFrame.apply only supports row-wise application (axis=1).

Using the same minmax() function we defined earlier, we can apply this to our Dask DataFrame in parallel.

>>> result = ddf.apply(minmax, axis=1, meta=(None, 'int64'))
>>> %time result.compute()
CPU times: user 9.23 ms, sys: 1.66 ms, total: 10.9 ms
Wall time: 2.01 s
0    12
1     9
2     7
3     5
4     3
5     0
6    10
7     6
8     0
9     3

Note

A number of Dask DataFrame methods have an important keyword argument called meta, which describes the structure of the expected output of your computation. In this example we passed meta=(None, 'int64') since applying minmax to each row produces an unnamed Series of int64 values: the first element of the tuple is the Series name and the second is its dtype. See Understanding Dask’s meta keyword argument to learn more.

Here Dask is nearly 5x faster than pandas (about 2 s versus 10 s). This is because Dask applies the minmax function to all five partitions in parallel, and each partition holds only two rows. There’s a small amount of overhead from Dask, which is why it’s not exactly 5x faster.

Similar to pandas’ DataFrame.map, we can also use Dask’s DataFrame.map to apply an element-wise function across a DataFrame.

>>> result = ddf.map(inc, meta=df)
>>> %time result.compute()
CPU times: user 9.56 ms, sys: 2.21 ms, total: 11.8 ms
Wall time: 4.03 s
    a   b
0  16   4
1  10  19
2   6  13
3  15  10
4  13  16
5  10  10
6  16   6
7  10   4
8  11  11
9  15  12

Dask’s DataFrame.map is ~5x faster than pandas here (about 4 s versus 20 s), since it applies the inc function in parallel across the DataFrame partitions.
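The timings for both examples follow from simple arithmetic. The sketch below is a back-of-the-envelope model, not part of Dask: `estimated_wall_time` is a hypothetical helper that assumes function calls are split evenly across partitions, that every call takes the same time, and that scheduling overhead is negligible.

```python
import math

def estimated_wall_time(n_calls, n_partitions, n_workers, seconds_per_call=1.0):
    """Rough wall time when calls are split evenly across partitions."""
    calls_per_partition = math.ceil(n_calls / n_partitions)
    # Partitions run in "waves" when there are fewer workers than partitions.
    waves = math.ceil(n_partitions / n_workers)
    return waves * calls_per_partition * seconds_per_call

# apply(): 10 rows, one call per row. map(): 20 values, one call per value.
apply_time = estimated_wall_time(n_calls=10, n_partitions=5, n_workers=5)
map_time = estimated_wall_time(n_calls=20, n_partitions=5, n_workers=5)

print(apply_time, map_time)            # 2.0 4.0
print(10 / apply_time, 20 / map_time)  # 5.0 5.0
```

Under this model the speedup is capped by the smaller of the partition count and the number of available workers, so adding partitions beyond the number of cores does not help for this kind of workload.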

Dask map_partitions

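In addition to Dask’s apply and map functions, there is also Dask’s DataFrame.map_partitions, which you can use to apply any Python function to each DataFrame partition in parallel. It is similar to Dask’s DataFrame.apply, but the function receives an entire partition (a pandas DataFrame) rather than a single row. This offers more flexibility: the function can use any pandas operation on the partition, and extra positional and keyword arguments are passed through to it.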

For example, we can define a function that does a row-wise sum operation and returns a Series:

def myadd(df, x, y=1):
    return df.a + df.b + x + y

We can use map_partitions to apply this function, using the Dask DataFrame defined above:

>>> result = ddf.map_partitions(myadd, 2, y=3, meta=(None, 'int64'))
>>> result.compute()
0   23
1   32
2   22
3   28
4   32
5   23
6   25
7   17
8   25
9   30

Learn more

In this post we went through how to perform a parallel pandas apply operation with Dask DataFrame. For more realistic use cases you might consider: