Reducing Memory Pressure for Xarray + Dask Workloads#

Patrick Hoefler

2025-03-17

3 min read

Historically, Dask scheduled thousands of data-loading tasks at once, overloading the system. The new queuing mechanism prevents this, making Xarray workloads smoother and more reliable.

Screencast of the Dask dashboard running a satellite data computation.

As of Dask 2025.1.0, Dask-backed Xarray workloads are more memory-efficient. Processing satellite images from Microsoft Planetary Computer, for example, is much smoother.#

The Challenge: Overeager Scheduling of Data-Loading Tasks#

Dask’s scheduler aims to distribute workloads efficiently, but data-loading tasks—such as reading from Zarr, NetCDF, or remote storage—have historically been scheduled too aggressively.

Previously, Dask would immediately schedule all available data-loading tasks, causing workers to fetch large amounts of data before they were ready to process it. This led to excessive memory use, frequent spilling to disk, and even crashes.

Dask introduced root task queuing in 2022 to mitigate this issue by identifying and scheduling these tasks more cautiously. However, the detection mechanism was fragile and often failed in cases like:

  • Opening multiple Zarr stores

  • Loading many NetCDF files

  • Accessing data from Microsoft Planetary Computer

This meant that for many Xarray workloads, the memory footprint was unpredictable, leading to inefficient resource usage.

Example of the Problem#

In the following example, we open multiple Zarr stores and calculate moving averages:

import dask.array as da

zarr_paths = ["/tmp/1.zarr", "/tmp/2.zarr", "/tmp/3.zarr", "/tmp/4.zarr", "/tmp/5.zarr"]

arrs = [da.from_zarr(p) for p in zarr_paths]
arr = da.concatenate(arrs, axis=0)
weights = da.ones_like(arr, chunks=arr.chunks)
avg = da.sum(arr * weights, axis=(1, 2)) / da.sum(weights, axis=(1, 2))
avg.compute()

Tip

Run the following snippet to re-create the test dataset used in the example above:

import dask.array as da

zarr_paths = ["/tmp/1.zarr", "/tmp/2.zarr", "/tmp/3.zarr", "/tmp/4.zarr", "/tmp/5.zarr"]
for path in zarr_paths:
    arr = da.random.random((365, 1280, 2560), chunks=(10, -1, -1))
    da.to_zarr(arr, path)

We can see that Dask is quickly spilling to disk, because it’s overeagerly loading the source data.

Before the recent improvements, Dask would load all chunks into memory at once, slowing down workloads and even crashing due to memory issues. This was especially troublesome for workloads creating multiple datasets from different Zarr stores, like in the example above, or when loading large NetCDF files.

The Solution: More Robust Root Task Detection#

Dask 2025.01.0 introduces a more reliable mechanism for detecting and scheduling data-loading tasks. Dask explicitly marks data-loading tasks so the scheduler can manage them efficiently. Now, data is only loaded when it can be processed, keeping memory usage stable and preventing unnecessary disk writes.

Running the same example as above, memory on the cluster remains fairly constant and reliably low enough that we don’t run into spilling issues. This particular example also runs twice as fast since Dask is no longer constantly writing to and reading from disk.

Real World Implications#

This change has a number of real-world implications for Xarray users and users of related Python libraries. Most workloads opening NetCDF files will run into this issue one way or another at some point when scaling to larger workloads. Additionally, we’ve also seen this regularly occurring with users of satellite data through odc-stac. odc-stac implements a great interface to load satellite images, but the graph construction is prone to run into this problem as well.

This improved queuing mechanism removes these issues and provides a smoother and more reliable user experience when working with large-scale datasets.

There is no user-side change required. The new behavior is picked up automatically after upgrading Dask.

conda update dask xarray
pip install --upgrade "dask[complete]" xarray

For other recent improvements to using Xarray with Dask, you might consider: