MLOps with MLflow on Coiled

Run machine learning experiments on the cloud with Coiled.

We’ll show how to track GPU-accelerated PyTorch training experiments on the cloud. MLflow makes it straightforward to log parameters, metrics, and models during training, and Coiled makes it easy to run code on cloud hardware in parallel.


What is MLflow?

MLflow is a lightweight, open-source platform for managing machine learning workflows. It helps address common MLOps problems around model development, deployment, and management.

People tend to like MLflow for its simple interface and because it can run from anywhere, including a data scientist’s laptop. It’s still common to run on cloud machines, though, when you want to operate:

  • On hardware that’s not available locally (e.g. a large GPU)

  • Across multiple machines in parallel

  • With data stored in the cloud (e.g. S3 bucket on AWS)

Here we focus on using MLflow for experiment tracking of a PyTorch model and Coiled for running model training on cloud GPUs in parallel.

We don’t go into the details of deploying MLflow itself, which are covered in the MLflow docs. You can deploy MLflow yourself or use a managed service; here we just assume you have access to a running MLflow server.

Define Model

Let’s train a GPU-accelerated PyTorch model on the Fashion MNIST dataset. To start we’ll install the packages we need, using either pip or conda:

# With pip
$ pip install torch torchvision torchmetrics mlflow coiled

# Or with conda
$ conda create -n env python=3.11 pytorch torchvision torchmetrics mlflow coiled
$ conda activate env

Next, we’ll define our PyTorch model and how to load the dataset.

import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from torchmetrics import Accuracy
from torchvision import datasets
from torchvision.transforms import ToTensor

def get_data():
    # Create training data loader
    training_data = datasets.FashionMNIST(
        root="./data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

    return DataLoader(training_data, batch_size=256, shuffle=True)


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.conv1 = nn.Conv2d(1, 1024, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(1024, 16, 5)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
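
The `16 * 4 * 4` input size of `fc1` follows from how each layer shrinks the 28x28 Fashion MNIST images. As a quick sanity check, here is a small sketch of that arithmetic. The `conv_out` helper is our own (it is not part of PyTorch) and assumes square inputs with no dilation:

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Output width/height of a square convolution or pooling layer."""
    return (size + 2 * padding - kernel) // stride + 1


size = 28                                  # Fashion MNIST images are 28x28
size = conv_out(size, kernel=5)            # conv1: 28 -> 24
size = conv_out(size, kernel=2, stride=2)  # pool:  24 -> 12
size = conv_out(size, kernel=5)            # conv2: 12 -> 8
size = conv_out(size, kernel=2, stride=2)  # pool:  8 -> 4

flattened = 16 * size * size               # conv2 has 16 output channels
print(flattened)                           # 256, i.e. 16 * 4 * 4
```

If you change a kernel size or the number of `conv2` output channels, the `fc1` input size and the `x.view(...)` call in `forward` need to change with it.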

Train Model

Now that we have our dataset and model, let’s train the model over a few different configurations and log the results in MLflow.

Below is a train function that takes a learning rate hyperparameter as input, trains our model using that hyperparameter, and logs relevant information before, during, and after training to MLflow.

import mlflow
import mlflow.pytorch

def train(learning_rate):
    # Select what hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda:0")      # NVIDIA GPU
    else:
        device = torch.device("cpu")         # CPU
    print(f"Using {device} for training")

    mlflow.set_experiment("pytorch-train")   # Set the MLflow experiment
    with mlflow.start_run():                 # Start a run within the experiment
        epochs = 10
        loss_fn = nn.CrossEntropyLoss()
        metric_fn = Accuracy(task="multiclass", num_classes=10).to(device)
        model = NeuralNetwork().to(device)
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

        train_dataloader = get_data()

        # Log model training parameters
        params = {
            "epochs": epochs,
            "learning_rate": learning_rate,
            "batch_size": train_dataloader.batch_size,
            "loss_function": str(loss_fn),
            "metric_function": str(metric_fn),
            "optimizer": "SGD",
            "device": str(device),
        }
        mlflow.log_params(params)
        for epoch in range(epochs):
            print(f"Epoch {epoch + 1}")
            model.train()
            for batch, (X, y) in enumerate(train_dataloader):
                X, y = X.to(device), y.to(device)

                pred = model(X)
                loss = loss_fn(pred, y)
                accuracy = metric_fn(pred, y)

                # Backpropagation
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                if batch % 10 == 0:
                    # Log metrics during training (as plain floats)
                    mlflow.log_metric("loss", loss.item(), step=batch // 10)
                    mlflow.log_metric("accuracy", accuracy.item(), step=batch // 10)
                    print(f"loss: {loss.item():.3f} accuracy: {accuracy.item():.3f} "
                          f"[{batch}/{len(train_dataloader)}]")

        # Save trained model
        mlflow.pytorch.log_model(model, "model")

        return loss.item()

The model training code above is a relatively vanilla PyTorch training loop. Actual model training details will vary depending on your use case, but the MLflow logging code will look the same. Let’s zoom in on the MLflow-specific parts.

MLflow Tracking

Tracking in MLflow is built around two main concepts: runs and experiments.

  • A run is a single execution of ML model code (e.g. train a PyTorch model with a learning rate of 0.01).

  • An experiment is a collection of runs that are part of a specific task (e.g. train a PyTorch model with several learning rates to find the optimal one).

Creating runs and experiments in MLflow is relatively straightforward and looks like this:

import mlflow

mlflow.set_experiment("pytorch-train")  # Set the MLflow experiment
with mlflow.start_run():                # Start a run within the experiment
    # Code for this run goes here
    ...

From within a run, MLflow has several methods for logging various parameters, metrics, and other artifacts.

# Log model training parameters
mlflow.log_params(params)
...
# Log metrics during training
mlflow.log_metric("loss", loss.item(), step=batch // 10)
mlflow.log_metric("accuracy", accuracy.item(), step=batch // 10)
...
# Save trained model
mlflow.pytorch.log_model(model, "model")

There are many things one might want to log with MLflow depending on the application. Here we focus on model parameters, training metrics, and the trained model itself.
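
One detail worth knowing about `step`: it is the x-axis MLflow uses for a metric’s history, and `batch // 10` restarts at 0 every epoch, so points from different epochs end up sharing step values. Below is a minimal sketch of one alternative, a step that keeps increasing across epochs. The helper names `batches_per_epoch` and `global_step` are our own, and the numbers assume the 60,000-image Fashion MNIST training set with `batch_size=256`:

```python
import math


def batches_per_epoch(num_samples: int, batch_size: int) -> int:
    """Number of batches per epoch when the last partial batch is kept."""
    return math.ceil(num_samples / batch_size)


def global_step(epoch: int, batch: int, n_batches: int) -> int:
    """Step that keeps increasing across epochs instead of restarting at 0."""
    return epoch * n_batches + batch


n_batches = batches_per_epoch(60_000, 256)
logged = [b for b in range(n_batches) if b % 10 == 0]  # batches that get logged

print(n_batches)                       # 235
print(len(logged))                     # 24
print(global_step(0, 230, n_batches))  # 230 (last logged batch of the first epoch)
print(global_step(1, 0, n_batches))    # 235 (first batch of the second epoch)
```

In the training loop this would amount to passing `step=global_step(epoch, batch, len(train_dataloader))` to `mlflow.log_metric`.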

Run on Cloud GPUs in Parallel

MLflow can be used from anywhere we want to train our model. Here we’ll run our model training on cloud machines using Coiled Functions to:

  • Train on hardware that’s not available locally (e.g. NVIDIA GPU)

  • Train across multiple machines in parallel

To run our existing train function on the cloud, we add this @coiled.function decorator:

import coiled

@coiled.function(
    vm_type="g4dn.xlarge",   # NVIDIA Tesla T4 GPU on AWS
    environ={
        "MLFLOW_TRACKING_URI": "http://my-mlflow-server.com:8080",
        "MLFLOW_TRACKING_USERNAME": "my-username",
        "MLFLOW_TRACKING_PASSWORD": "my-password",
    },
)
def train(learning_rate):
    # Same training code as before
    ...

This will automatically handle provisioning a cloud VM (g4dn.xlarge instance on AWS in this case), installing the same software that’s installed locally on the remote VM, running our train function on the VM, and returning the result back locally.

Note that the cloud VMs will need to communicate with the MLflow server to log runs during training. MLflow supports setting MLFLOW_TRACKING_URI and (optional) MLFLOW_TRACKING_USERNAME / MLFLOW_TRACKING_PASSWORD environment variables for this, so we make sure those are securely set as secrets on the cloud VMs via the environ= parameter.
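
If you would rather not hard-code credentials in the script, one option is to read them from your local environment and pass the resulting dictionary to `environ=`. The sketch below assumes those variables are already set locally; `mlflow_environ` is a hypothetical helper of our own, not part of MLflow or Coiled:

```python
import os
from typing import Mapping, Optional

# Variable names MLflow reads for its tracking connection
MLFLOW_VARS = (
    "MLFLOW_TRACKING_URI",
    "MLFLOW_TRACKING_USERNAME",
    "MLFLOW_TRACKING_PASSWORD",
)


def mlflow_environ(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the MLflow settings found in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    if "MLFLOW_TRACKING_URI" not in env:
        raise KeyError("MLFLOW_TRACKING_URI is not set")
    # Username and password are optional, so only include them when present
    return {name: env[name] for name in MLFLOW_VARS if name in env}


# Example with an explicit mapping; unrelated variables are ignored
example = mlflow_environ(
    {"MLFLOW_TRACKING_URI": "http://my-mlflow-server.com:8080", "HOME": "/home/me"}
)
print(example)  # {'MLFLOW_TRACKING_URI': 'http://my-mlflow-server.com:8080'}
```

The decorator would then take `environ=mlflow_environ()` in place of the literal dictionary.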

Finally, we want to train our model across several learning rates, all in parallel. We do this by using the Coiled Function .map method for automatic parallelization across multiple cloud machines:

learning_rates = [0.1, 0.05, 0.01, 0.005, 0.001, 0.0005, 0.0001]
results = train.map(learning_rates)
print(f"{list(results) = }")
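
Since `train` returns a loss for each learning rate, a natural next step is to pick the best one. Here is a minimal sketch, assuming the results come back in the same order as the inputs. The `best_learning_rate` helper is our own and the loss values in the example are placeholders, not measured results:

```python
def best_learning_rate(learning_rates, losses):
    """Return the (learning_rate, loss) pair with the lowest loss."""
    pairs = list(zip(learning_rates, losses))
    if not pairs:
        raise ValueError("no results to compare")
    return min(pairs, key=lambda pair: pair[1])


# Placeholder values for illustration only
example_rates = [0.1, 0.01, 0.001]
example_losses = [0.9, 0.5, 0.7]
print(best_learning_rate(example_rates, example_losses))  # (0.01, 0.5)
```

Keep in mind that `train` returns the loss of the final batch only, which can be noisy, so the metrics logged in MLflow give a more complete picture when comparing runs.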

Putting it All Together

Putting all the pieces together, here is the full workflow:

import coiled
import mlflow
import mlflow.pytorch
import torch
import torch.nn.functional as F
from torch import nn
from torch.utils.data import DataLoader
from torchmetrics import Accuracy
from torchvision import datasets
from torchvision.transforms import ToTensor

def get_data():
    # Create training data loader
    training_data = datasets.FashionMNIST(
        root="./data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

    return DataLoader(training_data, batch_size=256, shuffle=True)


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.conv1 = nn.Conv2d(1, 1024, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(1024, 16, 5)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


@coiled.function(
    vm_type="g4dn.xlarge",  # NVIDIA Tesla T4 GPU on AWS
    environ={
        "MLFLOW_TRACKING_URI": "http://my-mlflow-server.com:8080",
        "MLFLOW_TRACKING_USERNAME": "my-username",
        "MLFLOW_TRACKING_PASSWORD": "my-password",
    },
)
def train(learning_rate):
    # Select what hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda:0")  # NVIDIA GPU
    else:
        device = torch.device("cpu")  # CPU
    print(f"Using {device} for training")

    # Connect to MLflow server and set the experiment we're running
    mlflow.set_experiment("pytorch-train")
    with mlflow.start_run():
        epochs = 10
        loss_fn = nn.CrossEntropyLoss()
        metric_fn = Accuracy(task="multiclass", num_classes=10).to(device)
        model = NeuralNetwork().to(device)
        optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

        train_dataloader = get_data()

        # Log training parameters
        params = {
            "epochs": epochs,
            "learning_rate": learning_rate,
            "batch_size": train_dataloader.batch_size,
            "loss_function": str(loss_fn),
            "metric_function": str(metric_fn),
            "optimizer": "SGD",
            "device": str(device),
        }
        mlflow.log_params(params)

        for epoch in range(epochs):
            print(f"Epoch {epoch + 1}")
            model.train()
            for batch, (X, y) in enumerate(train_dataloader):
                X, y = X.to(device), y.to(device)

                pred = model(X)
                loss = loss_fn(pred, y)
                accuracy = metric_fn(pred, y)

                # Backpropagation
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()

                if batch % 10 == 0:
                    # Log metrics during training (as plain floats)
                    mlflow.log_metric("loss", loss.item(), step=batch // 10)
                    mlflow.log_metric("accuracy", accuracy.item(), step=batch // 10)
                    print(f"loss: {loss.item():.3f} accuracy: {accuracy.item():.3f} "
                          f"[{batch}/{len(train_dataloader)}]")

        # Save trained model
        mlflow.pytorch.log_model(model, "model")

        return loss.item()

learning_rates = [0.1, 0.05, 0.01, 0.005, 0.001, 0.0005, 0.0001]
results = train.map(learning_rates)
print(f"{list(results) = }")

It takes ~8 minutes to run (including VM spin-up time) and costs ~$0.27 (running locally would have taken ~2.6 hours). We also see that GPU utilization is high, so we’re effectively using the available cloud hardware. Looking at the MLflow dashboard, we can see the parameters, metrics, and model logged during training.

GPU utilization throughout model training. Utilization is typically high throughout, meaning we’re utilizing the available hardware well.

Training metrics shown in the MLflow UI.

Parameters, metrics, and models logged during training can be viewed in the MLflow UI.

Summary

We showed how to track GPU-accelerated PyTorch training experiments on the cloud. MLflow made it straightforward to log parameters, metrics, and models during training, and Coiled made it easy to run our code on cloud hardware in parallel.

Here are additional machine learning examples you might also be interested in: