XGBoost for Gradient Boosted Trees

XGBoost is a library for training gradient boosted machine learning models, and its Dask integration supports distributed training on larger-than-memory datasets. This example walks through training an XGBoost model in parallel using Dask and Coiled.

About the Data

In this example we will use a dataset that joins the Uber/Lyft trip data from the High-Volume For-Hire Services dataset with the NYC Taxi Zone Lookup Table.

This dataset has ~1.4 billion rows and uses ~75 GB of memory.

Create a Dask cluster

Let’s create a Dask cluster that’s large enough to handle this dataset.

import coiled

cluster = coiled.Cluster(
    n_workers=50,
    region="us-east-2",  # Start workers in same region as data to minimize costs
)
client = cluster.get_client()
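
As a rough check that 50 workers is a sensible size, the figures quoted above can be turned into a per-row and per-worker estimate. This is only back-of-the-envelope arithmetic: it assumes 1 GB = 10**9 bytes and an even spread of data across workers, and it uses the approximate row count and memory size from the text rather than measured values.

```python
# Back-of-the-envelope sizing from the approximate figures quoted in the text.
dataset_gb = 75      # ~75 GB in memory (approximate)
n_rows = 1.4e9       # ~1.4 billion rows (approximate)
n_workers = 50       # matches n_workers in the cluster definition

# Assumption: 1 GB = 10**9 bytes.
bytes_per_row = dataset_gb * 1e9 / n_rows
# Assumption: data is spread evenly across all workers.
gb_per_worker = dataset_gb / n_workers

print(f"~{bytes_per_row:.0f} bytes per row")
print(f"~{gb_per_worker:.1f} GB of data per worker")
```

Each worker also needs headroom beyond its share of the data for intermediate results during training, so this estimate is a lower bound on the memory each worker uses, not a recommendation for a specific instance type.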

Load Data

To start, let’s use Dask DataFrame’s read_parquet functionality to load our dataset from S3.

import dask.dataframe as dd

df = dd.read_parquet("s3://coiled-datasets/dask-xgboost-example/feature_table.parquet/")
df = df.categorize(columns=df.select_dtypes(include="category").columns.tolist())  # Make the categories of each categorical column known
df.head()
   hvfhs_license_num  PULocationID  DOLocationID  trip_miles  trip_time  tolls  congestion_surcharge  airport_fee  wav_request_flag  accessible_vehicle  pickup_month  pickup_dow  pickup_hour  PUBorough  DOBorough  PUSuperborough_DOSuperborough
0  HV0005             246           162           2.111       680        0.0    2.75                  0.0          N                 True                2             3           19           Manhattan  Manhattan  Superborough 1-Superborough 1
1  HV0003             246           162           3.190       806        0.0    2.75                  0.0          N                 False               2             3           20           Manhattan  Manhattan  Superborough 1-Superborough 1
2  HV0003             246           162           2.370       744        0.0    2.75                  0.0          N                 False               2             3           21           Manhattan  Manhattan  Superborough 1-Superborough 1
3  HV0005             246           162           2.867       696        0.0    2.75                  0.0          N                 True                2             4           5            Manhattan  Manhattan  Superborough 1-Superborough 1
4  HV0003             246           162           2.640       585        0.0    2.75                  0.0          N                 False               2             4           6            Manhattan  Manhattan  Superborough 1-Superborough 1

In preparation for multiple rounds of model training, let’s load and cache our dataset into our cluster’s distributed memory using df.persist(). This allows us to avoid repeated expensive data loading steps when doing multiple computations on the same data.

df = df.persist()

Train Model

Now let’s train an XGBoost model over several cross-validation folds to determine how well it performs. To do this we need to define a function that splits our dataset into multiple training and testing sets.

def make_cv_splits(n_folds):
    frac = [1 / n_folds] * n_folds
    splits = df.random_split(frac, shuffle=True)
    for i in range(n_folds):
        train = [splits[j] for j in range(n_folds) if j != i]
        test = splits[i]
        yield dd.concat(train), test
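
To make the fold logic concrete, here is a minimal pure-Python sketch of the same hold-one-fold-out pattern applied to row indices. The helper `kfold_indices` is hypothetical and exists only for illustration. Unlike `random_split`, it assigns rows to folds deterministically by striding, so fold sizes here are exact rather than approximate.

```python
def kfold_indices(n_items, n_folds):
    """Yield (train, test) index lists; each fold is the test set exactly once."""
    # Deterministic striding stands in for the random assignment used above.
    folds = [list(range(i, n_items, n_folds)) for i in range(n_folds)]
    for i in range(n_folds):
        # Training set: every fold except fold i, flattened into one list.
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        test = folds[i]
        yield train, test


splits = list(kfold_indices(n_items=10, n_folds=5))
for train, test in splits:
    print(f"train={train} test={test}")
```

Every row lands in exactly one test set and in the training set of all other folds, which is the property `make_cv_splits` relies on when it concatenates the remaining splits.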

Finally, we can hand each training split to XGBoost, score its predictions on the held-out split, and average the scores across folds.

%%time

import dask.array as da
import xgboost
from dask_ml.metrics import mean_squared_error

scores = []
for i, (train, test) in enumerate(make_cv_splits(5)):
    print(f"Split #{i + 1} / 5 ...")
    y_train = train["trip_time"]
    X_train = train.drop(columns=["trip_time"])
    y_test = test["trip_time"]
    X_test = test.drop(columns=["trip_time"])

    d_train = xgboost.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
    model = xgboost.dask.train(
        client,
        {"tree_method": "hist"},
        d_train,
        num_boost_round=4,
        evals=[(d_train, "train")],
    )
    predictions = xgboost.dask.predict(client, model, X_test)

    score = mean_squared_error(
        y_test.to_dask_array(),
        predictions.to_dask_array(),
        squared=False,
        compute=False,
    )
    scores.append(score.reshape(1).persist())

scores = da.concatenate(scores).compute()
print(f"RMSE score = {scores.mean()} +/- {scores.std()}")
Split #1 / 5 ...
Split #2 / 5 ...
Split #3 / 5 ...
Split #4 / 5 ...
Split #5 / 5 ...
RMSE score = 451.48317015111104 +/- 0.2541167494547274
CPU times: user 11.6 s, sys: 1.08 s, total: 12.6 s
Wall time: 5min 59s
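
Because the scoring call passes `squared=False`, the reported value is a root mean squared error, in the same units as `trip_time`. The sketch below reproduces that calculation and the across-fold summary in plain Python. All numbers in it are hypothetical and chosen for illustration; they are not taken from the dataset, and `trip_time` is assumed to be in seconds.

```python
import math
import statistics


def rmse(y_true, y_pred):
    """Root mean squared error: the square root of the mean squared residual."""
    mse = sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)
    return math.sqrt(mse)


# Hypothetical trip times (assumed to be seconds) and predictions.
y_true = [600.0, 900.0, 1200.0]
y_pred = [630.0, 860.0, 1250.0]
score = rmse(y_true, y_pred)

# Hypothetical per-fold scores, summarized the same way as the final print.
fold_scores = [451.2, 451.5, 451.9]
mean = statistics.fmean(fold_scores)
std = statistics.pstdev(fold_scores)  # population std, like NumPy's default ddof=0

print(f"RMSE = {score:.2f}")
print(f"score = {mean:.2f} +/- {std:.2f}")
```

A small standard deviation across folds, as in the run above, suggests the score does not depend much on which rows were held out.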

Conclusion

XGBoost integrates natively with Dask, making it easy to train models on larger-than-memory datasets using familiar APIs. Coiled makes it easy to scale up computing resources by creating large Dask clusters in the cloud with just a few lines of code.

If you have a cloud account, it’s easy to try this out yourself. The data is public, and this computation costs less than $1.00 in AWS charges. You can sign up for the Coiled free tier (setup takes a couple of minutes).