本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
修改 TensorFlow 训练脚本
在本节中,您将学习如何修改 TensorFlow 训练脚本以配置用于自动分区和手动分区的 SageMaker模型并行度库。这些示例还包括与 Horovod 集成的示例,用于混合模型和数据并行性。
注意
要了解该库支持哪些 TensorFlow 版本,请参阅支持的框架和 AWS 区域。
使用自动拆分 TensorFlow中列出了为使用库而必须对训练脚本进行的修改。
要了解如何修改训练脚本以在 Horovod 中使用混合模型和数据并行性,请参阅 使用 TensorFlow 和 Horovod 自动拆分,实现混合模型和数据并行性。
如果您要使用手动分区,另请参阅 使用手动拆分 TensorFlow。
以下主题显示了训练脚本的示例,您可以使用这些脚本来配置自动分区和手动分区模型 SageMaker的模型并行度库。 TensorFlow
注意
默认情况下启用自动分区。除非另行指定,否则示例脚本使用自动分区。
使用自动拆分 TensorFlow
要使用模型并行度库运行 TensorFlow 模型,需要对训练脚本进行 SageMaker以下更改:
-
使用
smp.init()
导入和初始化库。 -
通过从
smp.DistributedModel
继承来定义 Keras 模型,而不是从 Keras 模型类继承。从 smp.DistributedModel
对象的调用方法返回模型输出。请注意,从调用方法返回的任何张量都将在模型并行设备之间广播,这会产生通信开销,因此在调用方法之外不需要的任何张量(例如中间激活)都不应返回。 -
在
tf.Dataset.batch()
方法中设置drop_remainder=True
。这是为了确保批次大小始终可以被微批次数量整除。 -
使用
smp.dp_rank()
在数据管道中植入随机操作(例如shuffle(ds, seed=smp.dp_rank())
),以确保存有不同模型分区的 GPU 之间数据样本的一致性。 -
将向前和向后逻辑放在步进函数中,然后用
smp.step
进行修饰。 -
使用
StepOutput
方法(例如 reduce_mean
)对微批次的输出进行后处理。smp.step
函数必须具有一个取决于 smp.DistributedModel
的输出的返回值。 -
如果有评估步骤,则同样将向前逻辑放在
smp.step
修饰的函数中,然后使用StepOutput
API对输出进行后处理。
要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I
以下 Python 脚本是进行更改后的训练脚本的示例。
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
如果您已准备好训练脚本,请继续到第 2 步:使用 SageMaker Python 软件开发工具包启动训练 Job。如果要运行混合模型和数据并行训练作业,请继续到下一个部分。
使用 TensorFlow 和 Horovod 自动拆分,实现混合模型和数据并行性
您可以将 SageMaker 模型并行度库与 Horovod 一起使用,以实现混合模型和数据并行性。要详细了解库如何拆分模型以用于混合并行性,请参阅管道并行度(适用于 PyTorch 和) TensorFlow。
在这一步中,我们将重点介绍如何修改训练脚本以适应 SageMaker模型并行度库。
要正确设置训练脚本,以便选取要在 第 2 步:使用 SageMaker Python 软件开发工具包启动训练 Job 中设置的混合并行度配置,请使用库的帮助程序函数 smp.dp_rank()
和 smp.mp_rank()
,它们分别自动检测数据并行秩和模型并行秩。
要查找该库支持的所有 MPI 原语,请参阅 Pyth SageMaker on SDK 文档中的 MPI 基础知识
脚本中需要进行以下更改:
-
添加
hvd.allreduce
-
按照 Horovod 的要求,在第一个批次之后广播变量
-
使用
smp.dp_rank()
在数据管道中植入随机排序和/或分片操作。
注意
使用 Horovod 时,您不可在训练脚本中直接调用 hvd.init
。相反,你必须在中的 SageMaker Python SDK modelparallel
参数True
中"horovod"
将其设置为第 2 步:使用 SageMaker Python 软件开发工具包启动训练 Job。这使得库可以根据模型分区的设备分配,在内部初始化 Horovod。直接在训练脚本中调用 hvd.init()
可能会导致问题。
注意
在训练脚本中直接使用 hvd.DistributedOptimizer
API 可能会导致训练性能和速度不佳,因为 API 会隐式地将 AllReduce
操作放入 smp.step
中。我们建议您在 smp.step
返回的梯度上调用 accumulate()
或 reduce_mean()
后,直接调用 hvd.allreduce
,以将模型并行性库与 Horovod 一起使用,如下例所示。
要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
使用手动拆分 TensorFlow
使用 smp.partition
上下文管理器将操作放在特定的分区中。未放在任何 smp.partition
上下文中的任何操作都放在 default_partition
中。要了解有关模型并行度库 API SageMaker 的更多信息,请参阅 AP I
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
不支持的框架功能
该库不支持以下 TensorFlow 功能:
-
当前不支持
tf.GradientTape()
。您可以改用Optimizer.get_gradients()
或Optimizer.compute_gradients()
来计算梯度。 -
目前不支持
tf.train.Checkpoint.restore()
API。对于检查点操作,请改用smp.CheckpointManager
,它提供相同的 API 和功能。请注意,smp.CheckpointManager
的检查点还原应在第一步之后进行。