Xarray at Large Scale: A Beginner’s Guide

Parallel Xarray Can Be Easy

You don’t need to be a distributed systems expert to process terabytes of geospatial data. Today’s tools are good enough to make this accessible to everyone, yet most people still stay on a single computer, even though that is often the slower and more expensive option.

In fact, the video below condenses 250 TiB of data; it cost only $25 and about 20 minutes to produce, whereas the same job would have taken thousands of dollars and weeks of download time on a single machine. Let’s walk through the common toolset Python researchers use today to process large volumes of geospatial raster data.

What is Xarray?

Xarray is an open-source project and Python library that makes working with labeled multi-dimensional arrays simple, efficient, and fun. It is well suited to raster geospatial data, which is common in climate and meteorological research as well as other environmental, oceanographic, and atmospheric applications. Xarray brings the power of pandas to n-dimensional arrays, which makes it a great tool for handling complex datasets.

Key features of Xarray include:

  • N-Dimensional Data and Labels: Xarray manipulates multi-dimensional arrays like NumPy, but with dimension labels, making data manipulation and analysis more intuitive.

  • Pandas-like Operations: Xarray supports many of the same data manipulation operations as pandas, such as group-by, merge, and reshaping.

  • Data Readers for Common Formats: support for reading formats like NetCDF, GeoTIFF, Zarr, HDF, and others.

  • Integration with Other Libraries: Xarray integrates well with other data analysis libraries like pandas, NumPy, and Matplotlib, making it a versatile tool for data science projects.

  • Parallel Computing: Xarray can leverage Dask to support parallel computing, allowing efficient analysis of very large datasets.

For a practical example, let’s go through reading a netCDF file and performing some simple analysis using Xarray:

import xarray as xr
ds = xr.open_dataset('example_data.nc')                # open a netCDF file as a Dataset
mean_temperature = ds['temperature'].mean(dim='time')  # average over the time dimension
mean_temperature.plot()                                # quick plot via the Matplotlib integration
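The pandas-style operations mentioned above work the same way on labeled dimensions. As a minimal sketch, here is a group-by on the same hypothetical dataset (it assumes the 'time' coordinate holds datetime values):

# Group by calendar month and average, producing a 12-step monthly climatology
monthly_climatology = ds['temperature'].groupby('time.month').mean()
print(monthly_climatology)

As with pandas, the result is a new labeled object, here indexed by month rather than by time.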

See the Xarray documentation for more introductory-level examples.

What is Dask?

Dask works with Xarray by providing a backend for parallel and out-of-core computation. This integration allows Xarray to handle datasets that are larger than the available memory, which is essential for processing the large, multi-dimensional arrays commonly used in scientific fields like climate science, oceanography, and geospatial analysis.

Integration Mechanism:

  • Chunking: When an Xarray dataset is loaded, it can be chunked into smaller parts using Dask. These chunks are essentially small, manageable blocks of the larger dataset.

  • Lazy Evaluation: Xarray operations on Dask-backed arrays become lazy. This means that computations are not executed immediately but are instead queued up as tasks in a Dask graph (see the sketch after this list).

  • Task Graph: Each operation on the dataset creates a task in the Dask graph. This graph represents the sequence of operations needed to obtain the final result.

  • Parallel Computation: When a result is requested (e.g., for plotting, saving, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently.

  • Memory Efficiency: By working on small chunks, Dask ensures that the memory usage is kept in check, avoiding out-of-memory errors that would occur if the entire dataset were loaded into memory.
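Here is a minimal sketch of that mechanism, reusing the hypothetical file and variable names from the earlier example:

import xarray as xr

# Passing `chunks` makes the variables Dask-backed instead of in-memory NumPy arrays
ds = xr.open_dataset('example_data.nc', chunks={'time': 100})

# This only builds a task graph; no data is read or computed yet
lazy_mean = ds['temperature'].mean(dim='time')

# Requesting the result executes the graph, processing chunks in parallel
result = lazy_mean.compute()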

Practical Usage:

In practical terms, this means that you can work with Xarray in a way that feels familiar (similar to using in-memory NumPy arrays) but on much larger datasets. The Dask integration is transparent, so you don’t need to manage the parallelism or the chunking directly; Xarray and Dask handle these aspects behind the scenes. This makes it possible to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, without changing your code.

Overall, the synergy between Xarray and Dask empowers users to efficiently process and analyze large datasets that would otherwise be impractical or impossible to handle on standard hardware, making it a powerful combination for data-intensive scientific applications.
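To make the "without changing your code" point concrete: the analysis code stays the same whether Dask runs locally or on a cluster; only the client setup changes. A sketch (the scheduler address below is hypothetical):

from dask.distributed import Client

# On a laptop: start a local cluster using this machine's cores
client = Client()

# On a distributed cluster: connect to an existing Dask scheduler instead
# client = Client('tcp://scheduler-address:8786')

# Everything below this point (the Xarray analysis itself) is identical in both cases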

Example: How to load many NetCDF files with Xarray and Dask

To use Dask with Xarray, especially in a scenario where you want to open multiple netCDF files as a single dataset, you would use the xarray.open_mfdataset function with a chunks argument. This chunks argument specifies how the dataset should be divided into Dask chunks.

Here’s an example of how you might use xarray.open_mfdataset with Dask:

import xarray as xr

# Specify the path to your netCDF files (can use a wildcard for multiple files)
file_path = 'path/to/your/netcdf/files/*.nc'

# Open the dataset with Dask chunking
# The 'chunks' argument specifies the chunk size for each dimension
# For example, {'time': 10} would chunk the data along the time dimension in blocks of 10
ds = xr.open_mfdataset(file_path, chunks={'time': 10})

# At this point, computations on 'ds' will use Dask for parallel, out-of-core computation

This approach enables handling larger-than-memory datasets on your machine or scaling up to a cluster without changing your code significantly.
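Chunk size has a big effect on performance, so it is often worth inspecting and, if necessary, adjusting it after opening the dataset. A short sketch, continuing from the example above:

# Inspect the chunk sizes Dask is using along each dimension
print(ds.chunks)

# Rechunk if chunks are too small (many tiny tasks) or too large (memory pressure)
ds = ds.chunk({'time': 100})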

Deploy Dask and Xarray onto many machines

You can use Xarray with Dask to process large volumes of data on a single machine. This lets you process as much data as you can download and fit onto your hard drive. However, this has two limitations:

  1. You can only fit so much data on your hard drive (typically a few hundred gigabytes to a few terabytes)

  2. Downloading data can be slow and expensive

It is often better to run your computations close to where your data lives. Large datasets are typically stored either on HPC network file systems (NFS) or in cloud object stores like AWS S3 or Google Cloud Storage (GCS). Running your computations where the data is stored wins on scale, cost, and speed.

Deploy Xarray and Dask on an HPC Cluster

It is easy to run Xarray and Dask on HPC machines managed by common job schedulers like SLURM, PBS, LSF, or SGE. We typically recommend the dask-jobqueue library for this.

Dask-jobqueue creates and manages Dask clusters in HPC environments by submitting workers as regular scheduler jobs. This allows dynamic scaling of Dask workers while adhering to the HPC system’s resource allocation and scheduling policies.

Steps to Deploy:

Install dask-jobqueue

pip install dask-jobqueue

Configure Dask Cluster

In your Python script, configure a Dask cluster that submits jobs via your HPC’s scheduler. For example, using SLURM:

from dask_jobqueue import SLURMCluster

# Each Dask worker runs inside a SLURM job with these resources
cluster = SLURMCluster(
    queue='regular', cores=24, memory='120GB', walltime='02:00:00'
)
client = cluster.get_client()

Scale the Cluster

Adjust the number of worker jobs based on your needs.

cluster.scale(jobs=10)
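If your workload is bursty, dask-jobqueue can also scale adaptively, launching and retiring worker jobs based on load. A sketch (recent dask-jobqueue versions accept job-based bounds; older releases use worker-based minimum/maximum):

# Let the cluster grow and shrink between 0 and 10 worker jobs as needed
cluster.adapt(minimum_jobs=0, maximum_jobs=10)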

Run Xarray Computations

Perform your data analysis with Xarray, leveraging the Dask cluster for parallel processing.

import xarray as xr

# chunks= makes the arrays Dask-backed, so the work is split across the cluster
ds = xr.open_dataset('data.nc', chunks={'time': 100})

# compute() executes the task graph on the workers and returns an in-memory result
result = ds.mean(dim='time').compute()
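When the analysis finishes, it is good practice to save anything you need and release the HPC allocation explicitly (the output filename below is just an example):

# Persist the now-small, in-memory result to disk
result.to_netcdf('mean_over_time.nc')

# Shut down workers and release the SLURM job allocations
client.close()
cluster.close()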

This setup allows you to efficiently utilize HPC resources for processing large datasets with Xarray and Dask, scaling from a single machine to multiple nodes in an HPC environment.

Deploy Xarray and Dask on AWS

Agencies like NASA, NOAA, USGS, and others are pumping massive datasets to Amazon and making them publicly available. Researchers are using this data today to ask questions like the following:

  • What does air pollution look like over the 9000 most populated cities on earth?

  • Where should we build factories to take advantage of cheap renewable energy while bypassing local utilities?

  • What is the capacity of China’s fleet of solar farms, and how has this been changing over time?

  • How much oil has been flowing out of Venezuela on tanker ships?

  • How is drought likely to affect grain production in Northern Africa?

This data is plentiful and freely available, provided you can run computations close to it.

Fortunately, there are several ways to run Xarray and Dask on AWS.

Dask-Cloudprovider

Summary: Automates the setup of Dask clusters on AWS, including options like AWS Fargate for serverless deployment. See the documentation.

Code Sample:

from dask_cloudprovider.aws import FargateCluster
from dask.distributed import Client

cluster = FargateCluster(n_workers=4)
client = Client(cluster)

Dask-Kubernetes

Summary: Ideal for deploying Dask within a Kubernetes environment on AWS (Amazon EKS), allowing integration with Kubernetes resources. See the documentation.

Code Sample:

from dask_kubernetes import KubeCluster
from dask.distributed import Client

# worker-spec.yml describes the worker pods (container image, CPU/memory requests)
cluster = KubeCluster.from_yaml('worker-spec.yml')
client = Client(cluster)

Coiled

Summary: Provides a managed service for Dask on AWS, handling cluster provisioning and management, suitable for users preferring a simplified approach. See the documentation.

Code Sample:

import coiled
from dask.distributed import Client

cluster = coiled.Cluster(n_workers=10)
client = Client(cluster)

Each method offers a different balance of control and convenience, allowing you to choose the one that best fits your project’s requirements and your level of expertise with AWS and Dask.

We of course like Coiled the best. It’s the only option that’s secure out of the box, avoids common costly mistakes like idle clusters or lingering network resources, and it’s also the easiest to set up. It’s nice to know, though, that you can always fall back on the OSS options if you prefer.

Should I use HPC or Cloud?

Often people want to know where they should run their computations. If you’re a user the answer is simple:

  1. Run wherever your IT admins are willing to give you resources 🙂

  2. If you have choices, then run wherever your data is. Data gravity is a big deal.

So if you’re processing data that just came out of a simulation on a big HPC machine, and the data is sitting right there on the NFS, then use HPC. If you’re processing data that is hosted on AWS by NASA or NOAA or ESA then please run on the cloud.

If you’re a data provider and you’re planning where to host data then the answer comes down to reuse:

  1. Is this dataset going to be generated once and then read once? If so, host the data wherever it is generated; often that is HPC.

  2. Is the dataset going to be generated once but then read many times, possibly by many different groups? If so, the cloud can lower barriers to sharing and collaboration. No data web portal you build will be nearly as interoperable or accessible as simply dumping the raw files into an S3 bucket with requester-pays public access (sketched below).
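For data consumers, reading from such a bucket is straightforward with fsspec/s3fs; "requester pays" means the reader’s AWS account is billed for the requests and egress. A sketch with a hypothetical bucket and store name:

import fsspec
import xarray as xr

# Your AWS credentials pay for requests and egress on a requester-pays bucket
fs = fsspec.filesystem('s3', requester_pays=True)

# Read a hypothetical Zarr store directly into Xarray
ds = xr.open_zarr(fs.get_mapper('s3://some-public-dataset/data.zarr'))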

How much does the cloud cost?

This varies by region and tier, but here are some rule-of-thumb numbers per 1 TB of data:

  • Storage: $10–20 per month

  • Egress: ~$100 each time the full terabyte is downloaded out of the cloud

  • Cloud compute: ~$0.10 to process it within the same region

Storing large volumes of data in the cloud can be moderately expensive, but generally these costs don’t compare to, say, the costs of an FTE. Egress costs can be very substantial if the primary way that people access your data is to download it outside of the cloud, or to different regions than where the data is stored.

Once data is stored in the cloud, though, it is incredibly cheap to process: around $0.10 per TB, if you do everything right. That is roughly the cost of renting EC2 instances in the same region as the data, where access is fast and cheap.
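As a back-of-the-envelope check on that figure (every number below is an illustrative assumption, not a quoted AWS price):

# Illustrative assumptions, not quoted prices
instance_cost_per_hour = 0.40    # a mid-sized EC2 instance, roughly
read_throughput_gb_s = 1.0       # sustained GB/s read from S3 within the same region

tb_per_hour = read_throughput_gb_s * 3600 / 1000     # ~3.6 TB scanned per hour
cost_per_tb = instance_cost_per_hour / tb_per_hour   # ~$0.11 per TB

print(f"~${cost_per_tb:.2f} per TB scanned")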

Example: Process 250 TB for $25

As an example, below is the code we ran to produce this video:

import coiled
import flox.xarray
import fsspec
import numpy as np
import rioxarray
import xarray as xr

# Start a Coiled cluster
cluster = coiled.Cluster(
    name="nwm-1979-2020",
    region="us-east-1",  # run in the same region as the data
    n_workers=200,
)
client = cluster.get_client()

# Access the 250 TB NOAA National Water Model dataset in the cloud with Xarray
ds = xr.open_zarr(
    fsspec.get_mapper('s3://noaa-nwm-retrospective-2-1-zarr-pds/rtout.zarr', anon=True),
    consolidated=True,
    chunks={'time': 896, 'x': 350, 'y': 350},
).zwattablrt.sel(time=slice('1979-02-01', '2020-12-31'))  # water table depth

# Load the US counties raster (stored in a requester-pays bucket)
fs = fsspec.filesystem('s3', requester_pays=True)

counties = rioxarray.open_rasterio(
    fs.open('s3://nwm-250m-us-counties/Counties_on_250m_grid.tif'), chunks='auto'
).squeeze()

# Align the county grid with the water table data
_, counties_aligned = xr.align(ds, counties, join='override')

# Calculate average US county water table depth
county_id = np.unique(counties_aligned.data).compute()

county_mean = flox.xarray.xarray_reduce(
    ds,
    counties_aligned.rename('county'),
    func='mean',
    expected_groups=(county_id,),
)

county_mean.load()

This example processes 250 TiB for $25 in 20 minutes on 200 cloud machines. If you have an AWS account you should be able to run it yourself (you’ll have to set up Coiled first, which takes around 5 minutes). You can read more on technical details in our blog post.

What is Pangeo?

When Xarray and Dask started to take off in the geoscience community, their maintainers got together and started Pangeo, a community organization for large-scale geoscience development.

Since then the Pangeo community has grown to include researchers and developers at dozens of organizations, working on:

  • OSS software: developers collaborating on libraries like Xarray, Dask, Zarr, and other projects.

  • Community support: frequent showcase meetings covering technical and scientific work, as well as online forums that help new researchers get started.

  • Distributed infrastructure: publicly hosted cloud infrastructure for the community.

One of the strongest aspects of Pangeo today is its community; the Pangeo website and Discourse forum are good starting points for further study.

Summary

Easy-to-use tools like Xarray and Dask represent a significant leap in data processing, making the analysis of extensive geospatial datasets feasible and affordable. By moving computations closer to where the data is stored, whether in HPC environments or cloud platforms like AWS, researchers can now handle vast datasets efficiently. This democratizes access to high-performance computing, allowing a broader range of researchers to engage in complex data analysis tasks without the burden of prohibitive costs or the need for specialized knowledge in distributed systems. As technology continues to evolve, it’s clear that tools like Xarray and Dask will play a crucial role in unlocking new possibilities in data-driven research.