開發者指南 / 使用 PyTorch 進行多 GPU 分散式訓練

使用 PyTorch 進行多 GPU 分散式訓練

作者: fchollet
建立日期 2023/06/29
上次修改日期 2023/06/29
說明: 使用 PyTorch 進行 Keras 模型多 GPU 訓練指南。

在 Colab 中檢視 GitHub 來源


簡介

一般而言,有兩種方式可以在多個裝置上分散計算

資料平行處理,其中單一模型會在多個裝置或多台機器上複製。它們各自處理不同的資料批次,然後合併它們的結果。這種設定有許多變體,它們在不同的模型複本如何合併結果、是否在每個批次都保持同步,或者它們是否更鬆散地耦合等方面有所不同。

模型平行處理,其中單一模型的不同部分在不同的裝置上執行,共同處理單一批次的資料。這對於具有自然平行架構的模型最有效,例如具有多個分支的模型。

本指南著重於資料平行處理,特別是同步資料平行處理,其中模型的不同複本在處理每個批次後保持同步。同步性使模型收斂行為與您在單裝置訓練中看到的相同。

具體來說,本指南將教您如何使用 PyTorch 的 DistributedDataParallel 模組包裝器,在最少更改程式碼的情況下,在單一機器上安裝的多個 GPU(通常是 2 到 16 個)(單一主機、多裝置訓練)上訓練 Keras。這是研究人員和小規模產業工作流程中最常見的設定。


設定

讓我們首先定義建立我們要訓練的模型之函數,以及建立我們要訓練的資料集之函數(在本例中為 MNIST)。

import os

os.environ["KERAS_BACKEND"] = "torch"

import torch
import numpy as np
import keras


def get_model():
    # Make a simple convnet with batch normalization and dropout.
    inputs = keras.Input(shape=(28, 28, 1))
    x = keras.layers.Rescaling(1.0 / 255.0)(inputs)
    x = keras.layers.Conv2D(filters=12, kernel_size=3, padding="same", use_bias=False)(
        x
    )
    x = keras.layers.BatchNormalization(scale=False, center=True)(x)
    x = keras.layers.ReLU()(x)
    x = keras.layers.Conv2D(
        filters=24,
        kernel_size=6,
        use_bias=False,
        strides=2,
    )(x)
    x = keras.layers.BatchNormalization(scale=False, center=True)(x)
    x = keras.layers.ReLU()(x)
    x = keras.layers.Conv2D(
        filters=32,
        kernel_size=6,
        padding="same",
        strides=2,
        name="large_k",
    )(x)
    x = keras.layers.BatchNormalization(scale=False, center=True)(x)
    x = keras.layers.ReLU()(x)
    x = keras.layers.GlobalAveragePooling2D()(x)
    x = keras.layers.Dense(256, activation="relu")(x)
    x = keras.layers.Dropout(0.5)(x)
    outputs = keras.layers.Dense(10)(x)
    model = keras.Model(inputs, outputs)
    return model


def get_dataset():
    # Load the data and split it between train and test sets
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Scale images to the [0, 1] range
    x_train = x_train.astype("float32")
    x_test = x_test.astype("float32")
    # Make sure images have shape (28, 28, 1)
    x_train = np.expand_dims(x_train, -1)
    x_test = np.expand_dims(x_test, -1)
    print("x_train shape:", x_train.shape)

    # Create a TensorDataset
    dataset = torch.utils.data.TensorDataset(
        torch.from_numpy(x_train), torch.from_numpy(y_train)
    )
    return dataset

接下來,讓我們定義一個針對 GPU 的簡單 PyTorch 訓練迴圈(請注意對 .cuda() 的呼叫)。

def train_model(model, dataloader, num_epochs, optimizer, loss_fn):
    for epoch in range(num_epochs):
        running_loss = 0.0
        running_loss_count = 0
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            inputs = inputs.cuda(non_blocking=True)
            targets = targets.cuda(non_blocking=True)

            # Forward pass
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            running_loss_count += 1

        # Print loss statistics
        print(
            f"Epoch {epoch + 1}/{num_epochs}, "
            f"Loss: {running_loss / running_loss_count}"
        )

單一主機、多裝置同步訓練

在此設定中,您有一台機器,其中包含多個 GPU(通常是 2 到 16 個)。每個裝置都會執行您模型的副本(稱為複本)。為簡單起見,在下文中,我們假設我們正在處理 8 個 GPU,這並不失一般性。

運作方式

在訓練的每個步驟中

  • 目前的資料批次(稱為全域批次)會分割成 8 個不同的子批次(稱為局部批次)。例如,如果全域批次有 512 個樣本,則 8 個局部批次中的每一個都將有 64 個樣本。
  • 8 個複本中的每一個都會獨立處理局部批次:它們執行前向傳遞,然後執行後向傳遞,輸出權重相對於模型在局部批次上的損失的梯度。
  • 來自局部梯度的權重更新會在 8 個複本之間有效合併。因為這是每個步驟結束時完成的,所以複本始終保持同步。

實際上,同步更新模型複本權重的過程是在每個單獨的權重變數層級上處理的。這是透過鏡像變數物件完成的。

如何使用它

要使用 Keras 模型進行單一主機、多裝置同步訓練,您可以使用 torch.nn.parallel.DistributedDataParallel 模組包裝器。以下是其運作方式

  • 我們使用 torch.multiprocessing.start_processes 來啟動多個 Python 程序,每個裝置一個程序。每個程序都會執行 per_device_launch_fn 函數。
  • per_device_launch_fn 函數會執行以下操作: - 它使用 torch.distributed.init_process_grouptorch.cuda.set_device 來設定要用於該程序的裝置。 - 它使用 torch.utils.data.distributed.DistributedSamplertorch.utils.data.DataLoader 將我們的資料轉換為分散式資料載入器。 - 它還使用 torch.nn.parallel.DistributedDataParallel 將我們的模型轉換為分散式 PyTorch 模組。 - 然後,它會呼叫 train_model 函數。
  • 然後,train_model 函數將在每個程序中執行,模型在每個程序中使用不同的裝置。

以下是流程,其中每個步驟都分割成其自身的實用函數

# Config
num_gpu = torch.cuda.device_count()
num_epochs = 2
batch_size = 64
print(f"Running on {num_gpu} GPUs")


def setup_device(current_gpu_index, num_gpus):
    # Device setup
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "56492"
    device = torch.device("cuda:{}".format(current_gpu_index))
    torch.distributed.init_process_group(
        backend="nccl",
        init_method="env://",
        world_size=num_gpus,
        rank=current_gpu_index,
    )
    torch.cuda.set_device(device)


def cleanup():
    torch.distributed.destroy_process_group()


def prepare_dataloader(dataset, current_gpu_index, num_gpus, batch_size):
    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset,
        num_replicas=num_gpus,
        rank=current_gpu_index,
        shuffle=False,
    )
    dataloader = torch.utils.data.DataLoader(
        dataset,
        sampler=sampler,
        batch_size=batch_size,
        shuffle=False,
    )
    return dataloader


def per_device_launch_fn(current_gpu_index, num_gpu):
    # Setup the process groups
    setup_device(current_gpu_index, num_gpu)

    dataset = get_dataset()
    model = get_model()

    # prepare the dataloader
    dataloader = prepare_dataloader(dataset, current_gpu_index, num_gpu, batch_size)

    # Instantiate the torch optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    # Instantiate the torch loss function
    loss_fn = torch.nn.CrossEntropyLoss()

    # Put model on device
    model = model.to(current_gpu_index)
    ddp_model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[current_gpu_index], output_device=current_gpu_index
    )

    train_model(ddp_model, dataloader, num_epochs, optimizer, loss_fn)

    cleanup()
Running on 0 GPUs

/opt/conda/envs/keras-torch/lib/python3.10/site-packages/torch/cuda/__init__.py:611: UserWarning: Can't initialize NVML
  warnings.warn("Can't initialize NVML")

是時候啟動多個程序了

if __name__ == "__main__":
    # We use the "fork" method rather than "spawn" to support notebooks
    torch.multiprocessing.start_processes(
        per_device_launch_fn,
        args=(num_gpu,),
        nprocs=num_gpu,
        join=True,
        start_method="fork",
    )

就是這樣!