XGBoost for Gradient Boosted Trees#
XGBoost is a library for training gradient boosted machine learning models, and it has a Dask integration for distributed training on larger-than-memory datasets. This example walks through how to train an XGBoost model in parallel using Dask and Coiled.
About the Data#
In this example we will use a dataset that joins the Uber/Lyft data from the High Volume For-Hire Services dataset with the NYC Taxi Zone Lookup Table.
This dataset has ~1.4 billion rows and uses ~75 GB of memory.
Create a Dask cluster#
Let’s create a Dask cluster that’s large enough to handle this dataset.
import coiled

cluster = coiled.Cluster(
    n_workers=50,
    region="us-east-2",  # Start workers in same region as data to minimize costs
)
client = cluster.get_client()
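Once the client is connected, it can be handy to confirm the cluster is up before submitting work. This optional check (a small sketch, not part of the original walkthrough) prints the dashboard URL and the number of workers that have joined so far.

# Optional: follow progress in the Dask dashboard while computations run
print(client.dashboard_link)

# Optional: see how many workers have connected so far
print(len(client.scheduler_info()["workers"]), "workers connected")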
Load Data#
To start, let’s use Dask DataFrame’s read_parquet functionality to load our dataset from S3.
import dask.dataframe as dd
df = dd.read_parquet("s3://coiled-datasets/dask-xgboost-example/feature_table.parquet/")
df = df.categorize(columns=df.select_dtypes(include="category").columns.tolist())  # Make categorical dtypes known so XGBoost can handle them
df.head()
| | hvfhs_license_num | PULocationID | DOLocationID | trip_miles | trip_time | tolls | congestion_surcharge | airport_fee | wav_request_flag | accessible_vehicle | pickup_month | pickup_dow | pickup_hour | PUBorough | DOBorough | PUSuperborough_DOSuperborough |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | HV0005 | 246 | 162 | 2.111 | 680 | 0.0 | 2.75 | 0.0 | N | True | 2 | 3 | 19 | Manhattan | Manhattan | Superborough 1-Superborough 1 |
| 1 | HV0003 | 246 | 162 | 3.190 | 806 | 0.0 | 2.75 | 0.0 | N | False | 2 | 3 | 20 | Manhattan | Manhattan | Superborough 1-Superborough 1 |
| 2 | HV0003 | 246 | 162 | 2.370 | 744 | 0.0 | 2.75 | 0.0 | N | False | 2 | 3 | 21 | Manhattan | Manhattan | Superborough 1-Superborough 1 |
| 3 | HV0005 | 246 | 162 | 2.867 | 696 | 0.0 | 2.75 | 0.0 | N | True | 2 | 4 | 5 | Manhattan | Manhattan | Superborough 1-Superborough 1 |
| 4 | HV0003 | 246 | 162 | 2.640 | 585 | 0.0 | 2.75 | 0.0 | N | False | 2 | 4 | 6 | Manhattan | Manhattan | Superborough 1-Superborough 1 |
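If you want to verify the size figures quoted above for yourself, a quick optional check looks like the sketch below. Both calls scan the full dataset, so they take a little while on a cluster of this size.

# Optional sanity check: count rows and estimate in-memory size
n_rows = len(df)
mem_gb = df.memory_usage(deep=True).sum().compute() / 1e9
print(f"{n_rows:,} rows, ~{mem_gb:.0f} GB")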
In preparation for multiple rounds of model training, let’s load and cache our dataset into our cluster’s distributed memory using df.persist(). This allows us to avoid repeated, expensive data loading steps when doing multiple computations on the same data.
df = df.persist()
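Note that persist() returns immediately while the data loads in the background. If you’d rather block until everything is in memory (for example, before timing the training step), one option is to wait on the persisted DataFrame; this is an optional sketch, not required for the rest of the example.

from dask.distributed import wait

# Block until all partitions of df are loaded into worker memory
wait(df)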
Train Model#
Now let’s train an XGBoost model over several cross-validation folds to determine how well it performs. To do this, we’ll define a function for splitting our dataset into multiple training and testing sets.
def make_cv_splits(n_folds):
    frac = [1 / n_folds] * n_folds
    splits = df.random_split(frac, shuffle=True)
    for i in range(n_folds):
        train = [splits[j] for j in range(n_folds) if j != i]
        test = splits[i]
        yield dd.concat(train), test
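Before training, you can optionally peek at the folds this generator produces. The sketch below only inspects partition counts, so it stays lazy and cheap; it’s an illustration, not part of the original example.

# Optional: inspect the cross-validation folds without training anything
for i, (train, test) in enumerate(make_cv_splits(5)):
    print(f"Fold {i + 1}: {train.npartitions} train partitions, {test.npartitions} test partitions")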
Finally, we can hand our dataset off to XGBoost to get an average prediction score across the folds.
%%time

import dask.array as da
import xgboost
from dask_ml.metrics import mean_squared_error

scores = []
for i, (train, test) in enumerate(make_cv_splits(5)):
    print(f"Split #{i + 1} / 5 ...")
    y_train = train["trip_time"]
    X_train = train.drop(columns=["trip_time"])
    y_test = test["trip_time"]
    X_test = test.drop(columns=["trip_time"])

    d_train = xgboost.dask.DaskDMatrix(client, X_train, y_train, enable_categorical=True)
    model = xgboost.dask.train(
        client,
        {"tree_method": "hist"},
        d_train,
        num_boost_round=4,
        evals=[(d_train, "train")],
    )
    predictions = xgboost.dask.predict(client, model, X_test)

    score = mean_squared_error(
        y_test.to_dask_array(),
        predictions.to_dask_array(),
        squared=False,  # squared=False returns the root mean squared error (RMSE)
        compute=False,
    )
    scores.append(score.reshape(1).persist())

scores = da.concatenate(scores).compute()
print(f"RMSE score = {scores.mean()} +/- {scores.std()}")
Split #1 / 5 ...
Split #2 / 5 ...
Split #3 / 5 ...
Split #4 / 5 ...
Split #5 / 5 ...
RMSE score = 451.48317015111104 +/- 0.2541167494547274
CPU times: user 11.6 s, sys: 1.08 s, total: 12.6 s
Wall time: 5min 59s
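The dictionary returned by xgboost.dask.train holds the trained Booster under the "booster" key. If you want to keep the model from the last fold, one option (sketched below; the filename is arbitrary) is to save it locally as JSON.

# `model` is the result of the last xgboost.dask.train call above
booster = model["booster"]
booster.save_model("xgboost-trip-time.json")  # example filename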
Conclusion#
XGBoost integrates natively with Dask, making it easy to train models on larger-than-memory datasets using familiar APIs. Coiled makes it easy to scale up computing resources by creating large Dask clusters in the cloud with just a few lines of code.
If you have a cloud account, it’s easy to try this out yourself. The data is public, and this computation costs less than $1.00 in AWS cloud costs. You can sign up for the Coiled free tier here (setup takes a couple of minutes).