在 TensorFlow 训练脚本中使用 SMDDP 库(已弃用) - Amazon SageMaker

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 TensorFlow 训练脚本中使用 SMDDP 库(已弃用)

重要

在 v2.11.0 之后,SMDDP 库已停止支持, TensorFlow 并且不再在 DLC 中可用。 TensorFlow 要查找以前安装了 SM TensorFlow DDP 库的 DLC,请参阅。支持的框架

以下步骤向您展示如何修改 TensorFlow 训练脚本以利用分布式数据 p SageMaker arallel 库。 

这些库 API 设计为类似于 Horovod API。有关该库提供的每个 API 的更多详细信息 TensorFlow,请参阅SageMaker 分布式数据 parallel TensorFlow API 文档

注意

SageMaker 分布式数据 parallel 适用于由除tf.keras模块之外的tf核心模块组成的 TensorFlow 训练脚本。 SageMaker 分布式数据 parallel 不支持 Ker TensorFlow as 实现。

注意

SageMaker 分布式数据并行度库支持开箱即用的自动混合精度 (AMP)。除了对训练脚本进行框架级别的修改之外,您无需执行额外操作即可启用 AMP。如果梯度在 FP16 中,则 SageMaker 数据并行度库在 FP16 中运行其操作。AllReduce有关对您的训练脚本实施 AMP API 的更多信息,请参阅以下资源:

  1. 导入库的 TensorFlow 客户端并对其进行初始化。

    import smdistributed.dataparallel.tensorflow as sdp  sdp.init()
  2. 使用 local_rank 将每个 GPU 固定到一个 smdistributed.dataparallel 进程,这表示进程在给定节点中的相对秩。sdp.tensorflow.local_rank()API 为您提供设备的本地等级。领导节点的秩为 0,Worker 节点的秩为 1、2、3,依此类推。在以下代码块中将其调用为sdp.local_rank()set_memory_growth与 SageMaker 分布式没有直接关系,但必须将其设置为使用进行分布式训练 TensorFlow。

    gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU')
  3. 根据工作线程数扩展学习率。sdp.tensorflow.size() API 为您提供集群中工作线程的数量。以下代码块中将其作为 sdp.size() 调用。

    learning_rate = learning_rate * sdp.size()
  4. 在训练期间,使用库的 DistributedGradientTape 来优化 AllReduce 操作。这会包装 tf.GradientTape。 

    with tf.GradientTape() as tape:       output = model(input)       loss_value = loss(label, output)      # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape tape = sdp.DistributedGradientTape(tape)
  5. 将初始模型变量从领导节点(秩 0)广播到所有 Worker 节点(秩 1 到 n)。这是确保对所有工作线程秩进行一致的初始化所必需的。在模型和优化器变量初始化后使用 sdp.tensorflow.broadcast_variables API。以下代码块中将其作为 sdp.broadcast_variables() 调用。

    sdp.broadcast_variables(model.variables, root_rank=0) sdp.broadcast_variables(opt.variables(), root_rank=0)
  6. 最后,修改脚本,仅在领导节点上保存检查点。领导节点具有同步模型。这还可以避免 Worker 节点覆盖检查点并可能损坏检查点。

    if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

以下是使用库进行分布式 TensorFlow 训练的训练脚本示例。

import tensorflow as tf # SageMaker data parallel: Import the library TF API import smdistributed.dataparallel.tensorflow as sdp # SageMaker data parallel: Initialize the library sdp.init() gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus:     tf.config.experimental.set_memory_growth(gpu, True) if gpus:     # SageMaker data parallel: Pin GPUs to a single library process     tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], 'GPU') # Prepare Dataset dataset = tf.data.Dataset.from_tensor_slices(...) # Define Model mnist_model = tf.keras.Sequential(...) loss = tf.losses.SparseCategoricalCrossentropy() # SageMaker data parallel: Scale Learning Rate # LR for 8 node run : 0.000125 # LR for single node run : 0.001 opt = tf.optimizers.Adam(0.000125 * sdp.size()) @tf.function def training_step(images, labels, first_batch):     with tf.GradientTape() as tape:         probs = mnist_model(images, training=True)         loss_value = loss(labels, probs)     # SageMaker data parallel: Wrap tf.GradientTape with the library's DistributedGradientTape     tape = sdp.DistributedGradientTape(tape)     grads = tape.gradient(loss_value, mnist_model.trainable_variables)     opt.apply_gradients(zip(grads, mnist_model.trainable_variables))     if first_batch:        # SageMaker data parallel: Broadcast model and optimizer variables        sdp.broadcast_variables(mnist_model.variables, root_rank=0)        sdp.broadcast_variables(opt.variables(), root_rank=0)     return loss_value ... # SageMaker data parallel: Save checkpoints only from master node. if sdp.rank() == 0:     checkpoint.save(checkpoint_dir)

调整完训练脚本后,请继续到第 2 步:使用 SageMaker Python 软件开发工具包启动分布式训练作业