Modifica un script de entrenamiento TensorFlow - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Modifica un script de entrenamiento TensorFlow

En esta sección, aprenderá a modificar los scripts de TensorFlow entrenamiento para configurar la biblioteca de paralelismo del SageMaker modelo para el particionamiento automático y el particionamiento manual. Esta selección de ejemplos incluye también un ejemplo integrado con Horovod para modelos híbridos y paralelismo de datos.

nota

Para saber qué TensorFlow versiones son compatibles con la biblioteca, consulte. Marcos admitidos y Regiones de AWS

Las modificaciones necesarias que debe realizar en el script de entrenamiento para utilizar la biblioteca se enumeran en División automática con TensorFlow.

Para obtener información sobre cómo modificar el script de entrenamiento para utilizar el modelo híbrido y el paralelismo de datos con Horovod, consulte División automática con Horovod para el paralelismo de datos TensorFlow y modelos híbridos.

Si quiere usar particiones manuales, revise también División manual con TensorFlow.

Los siguientes temas muestran ejemplos de scripts de entrenamiento que puede usar para configurar la biblioteca de paralelismo SageMaker de modelos para modelos de particionamiento automático y particionamiento manual. TensorFlow

nota

La partición automática está habilitada de forma predeterminada. A menos que se especifique lo contrario, los scripts de ejemplo utilizan la partición automática.

División automática con TensorFlow

Se requieren los siguientes cambios en el script de entrenamiento para ejecutar un TensorFlow modelo con SageMaker la biblioteca de paralelismo de modelos:

  1. Importe e inicialice la biblioteca con smp.init().

  2. Definir el modelo de Keras que se hereda de smp.DistributedModel en lugar de la clase de modelo Keras. Devolver las salidas del modelo desde el método de llamada del objeto smp.DistributedModel. Tenga en cuenta que los tensores devueltos del método de llamada se transmitirán a través de dispositivos de paralelismo de modelos, lo que supondrá una sobrecarga de comunicación, por lo que no se debe devolver los tensores que no sean necesarios fuera del método de llamada (como las activaciones intermedias).

  3. Establezca drop_remainder=True en el método tf.Dataset.batch(). Esto sirve para garantizar que el tamaño del lote sea siempre divisible por el número de microlotes.

  4. Inserte las operaciones aleatorias en la canalización de datossmp.dp_rank(), por ejemplo, shuffle(ds, seed=smp.dp_rank()) para garantizar la coherencia de las muestras de datos entre las GPUs que se encuentran distintas particiones de modelos.

  5. Coloque la lógica hacia adelante y hacia atrás en una función de paso y decórela con smp.step.

  6. Realice un procesamiento posterior en las salidas de los microlotes mediante los métodos StepOutput tales como reduce_mean. La función smp.step debe tener un valor devuelto que depende de la salida de smp.DistributedModel.

  7. Si hay algún paso de evaluación, coloque de manera similar la lógica de avance dentro de una función decorada smp.step y procese posteriormente las salidas utilizando la API StepOutput.

Para obtener más información sobre la API SageMaker de la biblioteca de paralelismo de modelos de la API, consulte la documentación de la API.

El siguiente script de Python es un ejemplo de un script de entrenamiento después de realizar los cambios.

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Si ha terminado de preparar el script de entrenamiento, continúe con Paso 2: Inicie un trabajo de formación con el SDK de SageMaker Python. Si desea ejecutar un trabajo de entrenamiento paralelo de datos y modelo híbrido, proceda con la siguiente sección.

División automática con Horovod para el paralelismo de datos TensorFlow y modelos híbridos

Puede utilizar la biblioteca de paralelismo de SageMaker modelos con Horovod para el paralelismo de datos y modelos híbridos. Para obtener más información sobre cómo la biblioteca divide un modelo para el paralelismo híbrido, consulte PyTorch TensorFlowParalelismo de canalización (disponible para y).

En este paso, nos centramos en cómo modificar el guion de entrenamiento para adaptar la biblioteca de paralelismo del modelo. SageMaker

Para configurar correctamente su script de entrenamiento para que recoja la configuración de paralelismo híbrido que utilizará en Paso 2: Inicie un trabajo de formación con el SDK de SageMaker Python, utilice las funciones auxiliares de la biblioteca, smp.dp_rank() y smp.mp_rank(), que detectan automáticamente el rango de paralelismo de datos y el rango de paralelismo de modelos, respectivamente.

Para encontrar todas las primitivas de MPI que admite la biblioteca, consulte Conceptos básicos de MPI en la documentación del SDK de SageMaker Python.

Los cambios principales necesarios en el script son:

  • Añadir hvd.allreduce

  • Variables de radiodifusión después del primer lote, según lo requerido por Horovod

  • Propagación de operaciones de partición y/o fragmentación en la canalización de datos con smp.dp_rank().

nota

Cuando utilice Horovod, no debe llamar directamente hvd.init en su script de entrenamiento. En su lugar, tendrás que "horovod" configurarlo True en los modelparallel parámetros del SDK de SageMaker Python enPaso 2: Inicie un trabajo de formación con el SDK de SageMaker Python. Esto permite que la biblioteca inicialice Horovod internamente en función de las asignaciones de dispositivos de las particiones del modelo. Llamar hvd.init() directamente a su script de entrenamiento puede provocar problemas.

nota

El uso de la API hvd.DistributedOptimizer directamente en el script de entrenamiento puede provocar un rendimiento y una velocidad de entrenamiento deficientes, ya que la API incluye implícitamente la operación AllReduce en smp.step. Le recomendamos utilizar la biblioteca de paralelismo de modelos con Horovod llamando hvd.allreduce directamente después de llamar a accumulate() o reduce_mean() o en los gradientes devueltos por smp.step, como se mostrará en el siguiente ejemplo.

Para obtener más información sobre la API SageMaker de la biblioteca de paralelismo de modelos de la API, consulta la documentación de la API.

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

División manual con TensorFlow

Uso de gestores de contexto de smp.partition para colocar las operaciones en una partición específica. Toda operación no colocada en ningún contexto smp.partition se colocará en default_partition. Para obtener más información sobre la API SageMaker de la biblioteca de paralelismo de modelos de esta, consulta la documentación de la API.

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Características del marco no compatibles

La biblioteca no admite las siguientes TensorFlow funciones:

  • Actualmente tf.GradientTape() no es compatible. Puede usar Optimizer.get_gradients() o Optimizer.compute_gradients() en lugar de gradiente informáticos.

  • Actualmente, la API tf.train.Checkpoint.restore() no es compatible. Para los puntos de control, utilice smp.CheckpointManager en su lugar, que proporciona la misma API y funcionalidad. Tenga en cuenta que las restauraciones del punto de control con smp.CheckpointManager deben realizarse después del primer paso.