修改 TensorFlow 訓練指令集 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

修改 TensorFlow 訓練指令集

在本節中,您將學習如何修改 TensorFlow 訓練指令碼,以設定用於自動磁碟分割和手動磁碟分割的 SageMaker模型平行程式庫。這個範例選擇也包含與 Horovod 整合的範例,以用於混合模型和資料平行處理。

注意

若要尋找程式庫支援哪些 TensorFlow 版本,請參閱支援的架構與 AWS 區域

有關使用程式庫前必須對訓練指令碼進行的修改,已列入 自動拆分 TensorFlow

想了解如何修改訓練指令碼,以便將混合模型和資料平行處理與 Horovod 搭配使用,請參閱使用 TensorFlow 和 Horovod 進行自動化分割,適用於混合模型和資料平行處理

若選擇手動磁碟分割,請參考手動分割 TensorFlow

下列主題顯示訓練指令碼範例,您可以使用這些指令碼來設定 SageMaker自動磁碟分割和手動磁碟分割模型的模型平行程式庫。 TensorFlow

注意

自動分割預設為開啟。除非特別指定,否則範例指令碼都採用自動分割。

自動拆分 TensorFlow

若要執行具有模型平行程式庫 SageMaker的 TensorFlow 模型,需要進行下列訓練指令碼變更:

  1. 使用 smp.init() 匯入和初始化程式庫。

  2. 透過繼承自 smp.DistributedModel 而不是 Keras 模型類別,來定義一個 Keras 模型。從 smp.DistributedModel 物件的呼叫方法傳回模型輸出。請注意,從呼叫方法傳回的任何張量都將跨模型平行裝置廣播,造成通訊開銷增加,因此不需傳回在呼叫方法之外的非必要張量 (例如中繼啟動)。

  3. tf.Dataset.batch() 方法中設定 drop_remainder=True。這是為了確保批次大小必然可以被微批次數量整除。

  4. 使用 smp.dp_rank() (如 shuffle(ds, seed=smp.dp_rank())) 在 Data Pipeline 中植入隨機操作,確保存放不同模型分割的 GPU 上的資料範例保持一致。

  5. 將轉送和向後邏輯放在 Step Function 中,並使用 smp.step 進行裝飾。

  6. 使用 StepOutput 方法 (如 reduce_mean) 對微批次的輸出上執行後處理。smp.step 函式的傳回值必須取決於 smp.DistributedModel 的輸出。

  7. 如果有評估步驟,採取類似將轉送邏輯放在 smp.step-裝飾函式內的作法,並使用 StepOutput API 對輸出執行後處理。

要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔

下列 Python 指令碼是進行變更之後的訓練指令碼範例。

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

如果您已完成訓練指令碼的準備,請繼續執行 步驟 2:使用開發套件啟動訓練 Job SageMaker。如果想要執行混合模型和資料平行訓練任務,請繼續至下一節。

使用 TensorFlow 和 Horovod 進行自動化分割,適用於混合模型和資料平行處理

您可以將 SageMaker 模型平行程度程式庫與 Horovod 搭配使用,以進行混合模型和資料平行處理原則。若要閱讀更多有關程式庫針對混合式平行處理分割模型方式的資訊,請參閱管線平行度 (可用於 PyTorch 和 TensorFlow)

在此步驟中,我們會著重於如何修改訓練指令碼,以調整 SageMaker模型平行程度程式庫。

為正確設定訓練指令碼,以取得您在 步驟 2:使用開發套件啟動訓練 Job SageMaker 要設定的混合式平行處理組態,請使用程式庫的輔助函式 smp.dp_rank()smp.mp_rank(),即會分別自動偵測資料平行和模型平行的排名。

要查找庫支持的所有 MPI 原語,請參閱 SageMaker Python SDK 文檔中的 MPI 基礎知識

指令碼中所需的必要變更如下:

  • 新增 hvd.allreduce

  • 根據 Horovod 的要求,在第一批次後廣播變數。

  • smp.dp_rank() 的 Data Pipeline 中植入隨機顯示和/或碎片操作。

注意

當您使用 Horovod 時,不得在訓練指令碼中直接呼叫 hvd.init。相反地,您必須 SageMaker Python Truemodelparallel步驟 2:使用開發套件啟動訓練 Job SageMaker. "horovod" 這讓程式庫可以基於模型分割的裝置指派狀況,在內部初始化 Horovod。直接在訓練指令碼中呼叫 hvd.init(),可能會造成問題。

注意

直接在訓練指令碼中使用 hvd.DistributedOptimizer API,可能會導致訓練成效不佳與速度減緩,因為 API 背景作業下會將 AllReduce 操作置於 smp.step 中。我們建議您在取得 smp.step 傳回的漸層上呼叫 accumulate()reduce_mean() 之後直接呼叫 hvd.allreduce,以搭配 Horovod 使用模型平行處理程式庫,如下列範例所示。

要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

手動分割 TensorFlow

使用 smp.partition 內容管理器,將操作指派至特定分割中。未放置在任何 smp.partition 內容中的任何操作都會放置在 default_partition。要了解有關模型並行性庫 API SageMaker 的更多信息,請參閱 API 文檔

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

不支援架構功能

資料庫不支援下列 TensorFlow 功能:

  • 目前不支援 tf.GradientTape()。您可以使用 Optimizer.get_gradients()Optimizer.compute_gradients() 來運算漸層。

  • 目前不支援 tf.train.Checkpoint.restore() API。如為檢查點,請改為使用 smp.CheckpointManager 提供相同的 API 和功能。請注意,smp.CheckpointManager 的檢查點還原應該在第一步驟後進行。