作者: Anshuman Mishra
建立日期 2023/07/07
上次修改日期 2023/07/07
說明: 使用 KerasHub 和 tf.distribute 進行資料平行訓練。
分散式訓練是一種用於同時在多個裝置或機器上訓練深度學習模型的技術。它有助於縮短訓練時間,並允許使用更多資料訓練更大的模型。KerasHub 是一個提供自然語言處理任務工具和公用程式的函式庫,包括分散式訓練。
在本教學中,我們將使用 KerasHub 在 wikitext-2 資料集(一個包含 200 萬個單字的維基百科文章資料集)上訓練基於 BERT 的遮罩語言模型 (MLM)。MLM 任務涉及預測句子中被遮罩的單字,這有助於模型學習單字的上下文表示。
本指南重點介紹資料平行處理,特別是同步資料平行處理,其中每個加速器(GPU 或 TPU)都持有模型的完整副本,並看到輸入資料的不同部分批次。部分梯度在每個裝置上計算,聚合,並用於計算全域梯度更新。
具體而言,本指南教您如何使用 tf.distribute
API 在多個 GPU 上訓練 Keras 模型,只需對您的程式碼進行最少的更改,在以下兩種設定中進行
!pip install -q --upgrade keras-hub
!pip install -q --upgrade keras # Upgrade to Keras 3.
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import tensorflow as tf
import keras
import keras_hub
在我們開始任何訓練之前,讓我們將單一 GPU 設定為顯示為兩個邏輯裝置。
當您使用兩個或更多實體 GPU 進行訓練時,這完全沒有必要。這只是一個技巧,可以在預設的 colab GPU 執行階段上顯示真正的分散式訓練,它只有一個可用的 GPU。
!nvidia-smi --query-gpu=memory.total --format=csv,noheader
physical_devices = tf.config.list_physical_devices("GPU")
tf.config.set_logical_device_configuration(
physical_devices[0],
[
tf.config.LogicalDeviceConfiguration(memory_limit=15360 // 2),
tf.config.LogicalDeviceConfiguration(memory_limit=15360 // 2),
],
)
logical_devices = tf.config.list_logical_devices("GPU")
logical_devices
EPOCHS = 3
24576 MiB
要使用 Keras 模型進行單一主機、多裝置同步訓練,您可以使用 tf.distribute.MirroredStrategy
API。以下是它的運作方式
MirroredStrategy
,選擇性地設定您要使用的特定裝置(預設情況下,該策略將使用所有可用的 GPU)。fit()
訓練模型。strategy = tf.distribute.MirroredStrategy()
print(f"Number of devices: {strategy.num_replicas_in_sync}")
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
Number of devices: 2
基本批次大小和學習率
base_batch_size = 32
base_learning_rate = 1e-4
計算縮放批次大小和學習率
scaled_batch_size = base_batch_size * strategy.num_replicas_in_sync
scaled_learning_rate = base_learning_rate * strategy.num_replicas_in_sync
現在,我們需要下載並預處理 wikitext-2 資料集。此資料集將用於預訓練 BERT 模型。我們將篩選掉短行,以確保資料具有足夠的訓練上下文。
keras.utils.get_file(
origin="https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip",
extract=True,
)
wiki_dir = os.path.expanduser("~/.keras/datasets/wikitext-2/")
# Load wikitext-103 and filter out short lines.
wiki_train_ds = (
tf.data.TextLineDataset(
wiki_dir + "wiki.train.tokens",
)
.filter(lambda x: tf.strings.length(x) > 100)
.shuffle(buffer_size=500)
.batch(scaled_batch_size)
.cache()
.prefetch(tf.data.AUTOTUNE)
)
wiki_val_ds = (
tf.data.TextLineDataset(wiki_dir + "wiki.valid.tokens")
.filter(lambda x: tf.strings.length(x) > 100)
.shuffle(buffer_size=500)
.batch(scaled_batch_size)
.cache()
.prefetch(tf.data.AUTOTUNE)
)
wiki_test_ds = (
tf.data.TextLineDataset(wiki_dir + "wiki.test.tokens")
.filter(lambda x: tf.strings.length(x) > 100)
.shuffle(buffer_size=500)
.batch(scaled_batch_size)
.cache()
.prefetch(tf.data.AUTOTUNE)
)
在上面的程式碼中,我們下載 wikitext-2 資料集並提取它。然後,我們定義三個資料集:wiki_train_ds、wiki_val_ds 和 wiki_test_ds。這些資料集會被篩選以移除短行,並分批處理以進行有效訓練。
在 NLP 訓練/調整中使用衰減學習率是一種常見的做法。我們將在這裡使用 PolynomialDecay
排程。
total_training_steps = sum(1 for _ in wiki_train_ds.as_numpy_iterator()) * EPOCHS
lr_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
initial_learning_rate=scaled_learning_rate,
decay_steps=total_training_steps,
end_learning_rate=0.0,
)
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
print(
f"\nLearning rate for epoch {epoch + 1} is {model_dist.optimizer.learning_rate.numpy()}"
)
我們也建立一個 TensorBoard 的回呼,這將讓我們在教學的稍後部分訓練模型時,能夠將不同的指標視覺化。我們將所有回呼放在一起,如下所示
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir="./logs"),
PrintLR(),
]
print(tf.config.list_physical_devices("GPU"))
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
準備好資料集後,我們現在在 strategy.scope()
內初始化並編譯我們的模型和最佳化器
with strategy.scope():
# Everything that creates variables should be under the strategy scope.
# In general this is only model construction & `compile()`.
model_dist = keras_hub.models.BertMaskedLM.from_preset("bert_tiny_en_uncased")
# This line just sets pooled_dense layer as non-trainiable, we do this to avoid
# warnings of this layer being unused
model_dist.get_layer("bert_backbone").get_layer("pooled_dense").trainable = False
model_dist.compile(
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.AdamW(learning_rate=scaled_learning_rate),
weighted_metrics=[keras.metrics.SparseCategoricalAccuracy()],
jit_compile=False,
)
model_dist.fit(
wiki_train_ds, validation_data=wiki_val_ds, epochs=EPOCHS, callbacks=callbacks
)
Epoch 1/3
Learning rate for epoch 1 is 0.00019999999494757503
239/239 ━━━━━━━━━━━━━━━━━━━━ 43s 136ms/step - loss: 3.7009 - sparse_categorical_accuracy: 0.1499 - val_loss: 1.1509 - val_sparse_categorical_accuracy: 0.3485
Epoch 2/3
239/239 ━━━━━━━━━━━━━━━━━━━━ 0s 122ms/step - loss: 2.6094 - sparse_categorical_accuracy: 0.5284
Learning rate for epoch 2 is 0.00019999999494757503
239/239 ━━━━━━━━━━━━━━━━━━━━ 32s 133ms/step - loss: 2.6038 - sparse_categorical_accuracy: 0.5274 - val_loss: 0.9812 - val_sparse_categorical_accuracy: 0.4006
Epoch 3/3
239/239 ━━━━━━━━━━━━━━━━━━━━ 0s 123ms/step - loss: 2.3564 - sparse_categorical_accuracy: 0.6053
Learning rate for epoch 3 is 0.00019999999494757503
239/239 ━━━━━━━━━━━━━━━━━━━━ 32s 134ms/step - loss: 2.3514 - sparse_categorical_accuracy: 0.6040 - val_loss: 0.9213 - val_sparse_categorical_accuracy: 0.4230
在範圍內擬合我們的模型後,我們正常評估它!
model_dist.evaluate(wiki_test_ds)
29/29 ━━━━━━━━━━━━━━━━━━━━ 3s 60ms/step - loss: 1.9197 - sparse_categorical_accuracy: 0.8527
[0.9470901489257812, 0.4373602867126465]
對於跨多個機器的分散式訓練(與僅利用單一機器上的多個裝置的訓練相反),您可以使用兩種分散式策略:MultiWorkerMirroredStrategy
和 ParameterServerStrategy
tf.distribute.MultiWorkerMirroredStrategy
實作同步 CPU/GPU 多工作者解決方案,以使用 Keras 樣式的模型建置和訓練迴圈,並在副本之間使用同步梯度縮減。tf.distribute.experimental.ParameterServerStrategy
實作非同步 CPU/GPU 多工作者解決方案,其中參數儲存在參數伺服器上,而工作者會非同步地將梯度更新到參數伺服器。