XGBoost#
XGBoost is a library for training gradient-boosted supervised machine learning models, and it has a Dask integration for distributed training. In this guide, you’ll learn how to train an XGBoost model in parallel using Dask and Coiled. Download this Jupyter notebook to follow along.
Before you start#
You’ll first need to install the necessary packages. For the purposes of this example, we’ll do this in a new virtual environment, but you could also install them in whatever environment you’re already using for your project.
$ conda create -n dask-xgboost-example -c conda-forge python=3.10 dask coiled s3fs pyarrow dask-ml
$ conda activate dask-xgboost-example
$ pip install xgboost
You could also use pip for everything, or any other package manager you prefer; conda isn’t required.
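For example, a pip-only setup along these lines should also work (the command below is just one possibility, mirroring the packages in the conda environment above):
$ python -m venv dask-xgboost-example
$ source dask-xgboost-example/bin/activate
$ pip install "dask[complete]" coiled s3fs pyarrow dask-ml xgboost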
When you create a cluster, Coiled will automatically replicate your local dask-xgboost-example environment to your cluster.
About the Data#
In this example we will use a dataset that the Coiled team created by pre-processing the Uber/Lyft dataset from the High-Volume For-Hire Services and joining it with the NYC Taxi Zone Lookup Table. This results in a dataset with ~1.4 billion rows.
About the Model#
We will be training a single XGBoost model with Dask using the xgboost.dask module built into XGBoost. In this notebook we:
Load the data
Perform basic feature engineering (date type optimization, categorization)
Train a single model with XGBoost, using custom cross-validation
Get a Coiled Cluster#
To start, we need to spin up a Dask cluster.
Note
The total amount of RAM needed on the cluster is directly proportional to the total size of the training dataset. Users should expect large amounts of unmanaged memory during the training phase, consumed by the heap of the training tasks. This heap memory requirement is roughly proportional to total dataset size / number of workers. This is unlike most other Dask workflows, where heap size is proportional to partition size * threads per worker.
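As a rough illustration of that sizing rule, a back-of-envelope calculation might look like the sketch below; the dataset size used here is an assumed placeholder, not a measured value.
# Back-of-envelope sizing sketch (dataset_gib is a hypothetical number)
dataset_gib = 150  # assumed in-memory size of the training data, in GiB
n_workers = 50  # matches the cluster created below
heap_per_worker_gib = dataset_gib / n_workers
print(f"Expect roughly {heap_per_worker_gib:.1f} GiB of unmanaged heap per worker")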
import coiled

cluster = coiled.Cluster(
    n_workers=50,
    name="dask-xgboost",
    region="us-east-2",  # use a region close to the data to avoid egress charges
    worker_vm_types=["r6i.large"],  # memory-optimized instance types for AWS
    scheduler_vm_types=["m6i.large"],
)
client = cluster.get_client()
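Optionally, you can open the Dask dashboard to watch memory usage and task progress while the model trains:
# Print the URL of the Dask dashboard for this cluster
print(client.dashboard_link)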
Basic Feature Engineering#
This dataset is pretty polished, but there are a few details to take care of that are specific to the data types.
import numpy as np
import dask.dataframe as dd
# Load the dataset
ddf = dd.read_parquet(
    "s3://coiled-datasets/dask-xgboost-example/feature_table.parquet/"
)
# Under the hood, XGBoost converts floats to `float32`.
# Let's do it only once here.
float_cols = ddf.select_dtypes(include="float").columns.tolist()
ddf = ddf.astype({c: np.float32 for c in float_cols})
# We need the categories to be known
categorical_vars = ddf.select_dtypes(include="category").columns.tolist()
# categorize() reads the whole input and then discards it.
# Let's read from disk only once.
ddf = ddf.persist()
ddf = ddf.categorize(columns=categorical_vars)
# We will need to access this multiple times. Let's persist it.
ddf = ddf.persist()
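If you want to verify the result, an optional sanity check can confirm that the float columns are now float32 and that every categorical column has known categories:
# Optional sanity check: float columns should be float32, categories should be known
print(ddf.dtypes)
assert all(ddf[col].cat.known for col in categorical_vars)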
Custom cross-validation#
In this example, we show how you can use a custom cross-validation function such as the following:
# Number of folds determines the train/test split
# (e.g. N_FOLDS=5 -> train=4/5 of the total data, test=1/5)
N_FOLDS = 5


def make_cv_splits(n_folds=N_FOLDS):
    frac = [1 / n_folds] * n_folds
    splits = ddf.random_split(frac, shuffle=True)
    for i in range(n_folds):
        train = [splits[j] for j in range(n_folds) if j != i]
        test = splits[i]
        yield dd.concat(train), test
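As a quick usage example, you could peek at a single split before running the full training loop; both elements are lazy Dask DataFrames, so this is cheap:
# Peek at the first train/test split (lazy; nothing is computed yet)
train, test = next(make_cv_splits())
print(train.npartitions, test.npartitions)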
Train Model#
When using XGBoost with Dask, we need to call the XGBoost Dask interface from the client side. The main difference with XGBoost’s Dask interface is that we pass our Dask client as an additional argument for carrying out the computation. Note that if the client parameter below is set to None, XGBoost will use the default client returned by Dask.
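Each xgboost.dask call in the loop below accepts the client as its first argument. As a sketch (using the X_train, y_train, and X_test variables defined inside the loop), passing the client returned by cluster.get_client() explicitly instead of None would look like:
# Sketch: the same calls with the client passed explicitly instead of None
d_train = xgboost.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
model = xgboost.dask.train(
    client,
    {"tree_method": "hist"},
    d_train,
    num_boost_round=4,
    evals=[(d_train, "train")],
)
predictions = xgboost.dask.predict(client, model, X_test)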
from datetime import datetime
import dask.array as da
import xgboost
from dask_ml.metrics import mean_squared_error
start = datetime.now()

scores = []
for i, (train, test) in enumerate(make_cv_splits()):
    print(f"Training/Test split #{i}")
    y_train = train["trip_time"]
    X_train = train.drop(columns=["trip_time"])
    y_test = test["trip_time"]
    X_test = test.drop(columns=["trip_time"])

    print("Building DMatrix...")
    d_train = xgboost.dask.DaskDMatrix(None, X_train, y_train, enable_categorical=True)

    print("Training model...")
    model = xgboost.dask.train(
        None,
        {"tree_method": "hist"},
        d_train,
        num_boost_round=4,
        evals=[(d_train, "train")],
    )

    print("Running model on test data...")
    predictions = xgboost.dask.predict(None, model, X_test)

    print("Measuring accuracy of model vs. ground truth...")
    score = mean_squared_error(
        y_test.to_dask_array(),
        predictions.to_dask_array(),
        squared=False,
        compute=False,
    )
    # Compute predictions and mean squared error for this iteration
    # while we start the next one
    scores.append(score.reshape(1).persist())
    print("-" * 80)

scores = da.concatenate(scores).compute()
print(f"RMSE={scores.mean()} +/- {scores.std()}")
print(f"Total time: {datetime.now() - start}")
Training/Test split #0
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #1
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #2
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #3
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
Training/Test split #4
Building DMatrix...
Training model...
Running model on test data...
Measuring accuracy of model vs. ground truth...
--------------------------------------------------------------------------------
RMSE=452.76386874699745 +/- 0.31916762874476584
Total time: 0:07:43.182956
# Inspect our model
model
{'booster': <xgboost.core.Booster at 0x284455de0>,
 'history': {'train': OrderedDict([('rmse',
               [942.7679525904991,
                704.6123860747031,
                549.255274388161,
                451.5426385498415])])}}
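The training output holds the trained Booster under the 'booster' key along with the per-round evaluation history. If you want to keep the model after the cluster shuts down, one option is to save the booster to a local file (the filename here is arbitrary):
# Save the trained booster locally (filename is arbitrary)
model["booster"].save_model("xgboost-trip-time.json")

# It can later be reloaded without a cluster for local use
booster = xgboost.Booster()
booster.load_model("xgboost-trip-time.json")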
Clean up#
cluster.shutdown()