Scaling XGBoost with Dask and Coiled#

XGBoost is a library for training gradient-boosted supervised machine learning models, and it has a Dask integration for distributed training. In this guide, you’ll learn how to train an XGBoost model in parallel using Dask and Coiled. Download this Jupyter notebook to follow along.

Before you start#

You’ll first need to install the necessary packages. For the purposes of this example, we’ll do this in a new virtual environment, but you could also install them in whatever environment you’re already using for your project.

$ conda create -n dask-xgboost-example -c conda-forge python=3.10 dask coiled s3fs pyarrow dask-ml
$ conda activate dask-xgboost-example
$ pip install xgboost

You could also use pip for everything, or any other package manager you prefer; conda isn’t required.

When you create a cluster, Coiled will automatically replicate your local dask-xgboost-example environment to your cluster.

About the Data#

In this example we will use a dataset that the Coiled team created by pre-processing the Uber/Lyft dataset from the High-Volume For-Hire Services and joining it with the NYC Taxi Zone Lookup Table. This results in a dataset with ~1.4 billion rows.

About the Model#

We will be training a single XGBoost model with Dask using the xgboost.dask module built into XGBoost. In this notebook we:

  • Load the data

  • Perform basic feature engineering (data type optimization, categorization)

  • Train a single model with XGBoost, using custom cross-validation

Get a Coiled Cluster#

To start we need to spin up a Dask cluster.

Note

The total amount of RAM needed on the cluster is directly proportional to the total size of the training dataset. Expect large amounts of unmanaged memory during the training phase, consumed by the heap of the training tasks. This heap memory requirement is directly proportional to total dataset size / number of workers. This is unlike most other Dask workflows, where heap size is proportional to partition size * threads per worker.
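As a rough illustration of the sizing rule in the note above, the sketch below divides an in-memory dataset size by the number of workers and compares the result with the RAM of a single worker. The helper name `heap_per_worker_gib`, the 400 GiB dataset size, and the 16 GiB of worker RAM are assumptions chosen for illustration, not measurements from this example. The real constant of proportionality depends on the data and the training parameters, so treat the result as a starting point only.

```python
def heap_per_worker_gib(dataset_gib: float, n_workers: int) -> float:
    """Approximate training heap per worker: dataset size / number of workers."""
    if n_workers <= 0:
        raise ValueError("n_workers must be positive")
    return dataset_gib / n_workers


# Hypothetical numbers, for illustration only.
dataset_gib = 400.0    # assumed in-memory size of the training data
n_workers = 50         # same worker count as the cluster in this guide
worker_ram_gib = 16.0  # assumed RAM available on each worker

needed = heap_per_worker_gib(dataset_gib, n_workers)
print(f"~{needed:.1f} GiB of heap per worker out of {worker_ram_gib:.0f} GiB")
print("fits" if needed < worker_ram_gib else "add workers or use larger VMs")
```

Doubling the number of workers halves the estimate, which is why adding workers is a common fix when training runs out of memory.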

import coiled

cluster = coiled.Cluster(
    n_workers=50,
    name="dask-xgboost",
    worker_vm_types=["r6i.large"],
    scheduler_vm_types=["m6i.large"],
    backend_options={"region": "us-east-2"},  # match data zone
)

Then connect Dask to your remote Coiled cluster.

from dask.distributed import Client

client = Client(cluster)
client

Load the Data#

import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://coiled-datasets/dask-xgboost-example/feature_table.parquet/"
)

Basic Feature Engineering#

This dataset is pretty polished, but there are a few details to take care of that are specific to the data types.

import numpy as np

# Under the hood, XGBoost converts floats to `float32`.
# Let's do it only once here.
float_cols = ddf.select_dtypes(include="float").columns.tolist()
ddf = ddf.astype({c: np.float32 for c in float_cols})
# We need the categories to be known
categorical_vars = ddf.select_dtypes(include="category").columns.tolist()

# categorize() reads the whole input and then discards it.
# Let's read from disk only once.
ddf = ddf.persist()
ddf = ddf.categorize(columns=categorical_vars)
# We will need to access this multiple times. Let's persist it.
ddf = ddf.persist()

Custom cross-validation#

In this example we show you how you can use a custom cross-validation function such as:

# Number of folds determines the train/test split
# (e.g. N_FOLDS=5 -> train=4/5 of the total data, test=1/5)
N_FOLDS = 5


def make_cv_splits(n_folds=N_FOLDS):
    frac = [1 / n_folds] * n_folds
    splits = ddf.random_split(frac, shuffle=True)
    for i in range(n_folds):
        train = [splits[j] for j in range(n_folds) if j != i]
        test = splits[i]
        yield dd.concat(train), test
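To make the fold logic easier to follow without a cluster, here is a minimal sketch of the same idea on a plain Python list. The function `make_folds` and its `seed` parameter are hypothetical names introduced for this illustration. Unlike `random_split`, which assigns rows to splits at random so that split sizes are only approximately equal, this sketch deals rows out evenly.

```python
import random


def make_folds(items, n_folds=5, seed=0):
    """Yield (train, test) lists; each fold serves as the test set exactly once."""
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)
    # Deal the shuffled items round-robin into n_folds groups.
    folds = [shuffled[i::n_folds] for i in range(n_folds)]
    for i in range(n_folds):
        # Training data is every fold except the i-th one.
        train = [x for j, fold in enumerate(folds) if j != i for x in fold]
        yield train, folds[i]


for i, (train, test) in enumerate(make_folds(range(10), n_folds=5)):
    print(f"fold {i}: {len(train)} train rows, {len(test)} test rows")
```

Every row lands in exactly one test fold, so each model is always evaluated on data it did not see during training.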

Train Model#

When using XGBoost with Dask, we need to call the XGBoost Dask interface from the client side. The main difference with XGBoost’s Dask interface is that we pass our Dask client as an additional argument for carrying out the computation. Note that if the client parameter below is set to None, XGBoost will use the default client returned by Dask.

from datetime import datetime

import dask.array as da
import xgboost
from dask_ml.metrics import mean_squared_error

start = datetime.now()
scores = []

for i, (train, test) in enumerate(make_cv_splits()):
    print(f"Training/Test split #{i}")
    y_train = train["trip_time"]
    X_train = train.drop(columns=["trip_time"])
    y_test = test["trip_time"]
    X_test = test.drop(columns=["trip_time"])

    print("Building DMatrix...")
    d_train = xgboost.dask.DaskDMatrix(None, X_train, y_train, enable_categorical=True)

    print("Training model...")
    model = xgboost.dask.train(
        None,
        {"tree_method": "hist"},
        d_train,
        num_boost_round=4,
        evals=[(d_train, "train")],
    )

    print("Running model on test data...")
    predictions = xgboost.dask.predict(None, model, X_test)

    print("Measuring accuracy of model vs. ground truth...")
    score = mean_squared_error(
        y_test.to_dask_array(),
        predictions.to_dask_array(),
        squared=False,
        compute=False,
    )
    # Compute predictions and mean squared error for this iteration
    # while we start the next one
    scores.append(score.reshape(1).persist())
    print("-" * 80)

scores = da.concatenate(scores).compute()
print(f"RMSE={scores.mean()} +/- {scores.std()}")
print(f"Total time:  {datetime.now() - start}")

Training/Test split #0
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #1
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #2
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #3
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #4
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
RMSE=452.76386874699745 +/- 0.31916762874476584
Total time:  0:07:43.182956

# Inspect our model
model

{'booster': <xgboost.core.Booster at 0x284455de0>,
 'history': {'train': OrderedDict([('rmse',
                [942.7679525904991,
                 704.6123860747031,
                 549.255274388161,
                 451.5426385498415])])}}
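
The score reported by the training loop is a root mean squared error (the `squared=False` argument), summarized across folds as a mean and a standard deviation. The sketch below reproduces that calculation in plain Python on tiny made-up values. The `rmse` helper and the sample numbers are assumptions for illustration and are unrelated to the results above.

```python
import math
import statistics


def rmse(y_true, y_pred):
    """Root mean squared error between two equal-length sequences."""
    if len(y_true) != len(y_pred):
        raise ValueError("inputs must have the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical (truth, prediction) pairs, one pair of lists per fold.
folds = [
    ([600, 900, 1200], [630, 870, 1230]),
    ([300, 1500, 800], [340, 1460, 840]),
]
scores = [rmse(y_true, y_pred) for y_true, y_pred in folds]
# pstdev is the population standard deviation, like NumPy's default std().
print(f"RMSE={statistics.mean(scores)} +/- {statistics.pstdev(scores)}")
```

A small spread between folds relative to the mean suggests the score does not depend much on which rows ended up in the test split.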

Clean up#

cluster.close()