Modifier un script d'entraînement TensorFlow - Amazon SageMaker

Modifier un script d'entraînement TensorFlow

Dans cette section, vous apprendrez à modifier les scripts d'entraînement TensorFlow pour configurer la bibliothèque parallèle de modèles distribués SageMaker, le partitionnement automatique et le partitionnement manuel. Cette sélection d'exemples inclut également un exemple intégré à Horovod pour le modèle hybride et le parallélisme des données.

Note

Pour savoir quelles versions de TensorFlow sont prises en charge par la bibliothèque, voir Cadres pris en et Régions AWS.

Les modifications que vous devez apporter à votre script d'entraînement pour utiliser la bibliothèque sont répertoriées dans TensorFlow.

Pour savoir comment modifier votre script d'entraînement pour utiliser un modèle hybride et le parallélisme de données avec Horovod, consultez TensorFlow avec Horovod pour un parallélisme hybride des modèles et des données.

Si vous optez pour le partitionnement manuel, consultez également Partitionnement manuel avec TensorFlow.

Astuce

Pour obtenir des exemples de blocs-notes de bout en bout illustrant l'utilisation d'un script d'entraînement TensorFlow avec la bibliothèque de modèles parallèles distribués SageMaker, veuillez consulter Exemple TensorFlow.

Les rubriques suivantes montrent des exemples de scripts d'entraînement que vous pouvez utiliser pour configurer la bibliothèque de parallélisme de modèles de SageMaker pour le partitionnement automatique et manuel de modèles TensorFlow.

Note

Le partitionnement automatique est activé par défaut. Sauf indication contraire, les exemples de scripts utilisent le partitionnement automatique.

TensorFlow

Les modifications de script d'entraînement suivantes sont requises pour exécuter un modèle TensorFlow avec la bibliothèque de modèles parallèles distribués de SageMaker :

  1. Importez et initialisez la bibliothèque avec smp.init().

  2. Définissez un modèle Keras en héritant de smp.DistributedModel au lieu de la classe de modèles Keras. Renvoyez les sorties du modèle à partir de la méthode d'appel de l'objet smp.DistributedModel. N'oubliez pas que tous les tenseurs renvoyés par la méthode d'appel seront diffusés sur des périphériques avec parallélisme des modèles. Comme cela induira un surdébit de communication, évitez de renvoyer les tenseurs qui ne sont pas nécessaires en dehors de la méthode d'appel (activations intermédiaires, par exemple).

  3. Définissez drop_remainder=True dans la méthode tf.Dataset.batch(). Cela vise à garantir que la taille du lot est toujours divisible par le nombre de micro-lots.

  4. Répartissez les opérations aléatoires dans le pipeline de données en utilisant un smp.dp_rank(), par exemple shuffle(ds, seed=smp.dp_rank()), pour assurer la cohérence des échantillons de données entre les GPU contenant différentes partitions de modèle.

  5. Mettez la logique en avant et en arrière dans une fonction étape et décorez-la avec smp.step.

  6. Effectuez un post-traitement sur les sorties des différents micro-lots à l'aide de méthodes StepOutput telles que reduce_mean. La fonction smp.step doit avoir une valeur de retour qui dépend de la sortie de smp.DistributedModel.

  7. De façon similaire, s'il y a une étape d'évaluation, placez la logique en avant dans une fonction décorée smp.step et post-traitez les sorties en utilisant l'API StepOutput.

Pour en savoir plus sur l'API de la bibliothèque de modèles parallèles distribués de SageMaker, consultez la documentation sur l'API.

Le script Python suivant est un exemple de script d'entraînement après application des modifications.

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Si vous avez fini de préparer votre scénario d'entraînement, passez à Étape 2 : lancer une tâche d'entraînement à l'aide du kit SDK Python SageMaker. Si vous souhaitez exécuter une tâche d'entraînement parallèle modèle et données hybride, passez à la section suivante.

TensorFlow avec Horovod pour un parallélisme hybride des modèles et des données

Vous pouvez utiliser la bibliothèque parallèle de modèles distribués SageMaker avec Horovod pour un parallélisme hybride des modèles et des données. Pour en savoir plus sur la façon dont la bibliothèque divise un modèle pour le parallélisme hybride, reportez-vous à Parallélisme de pipeline.

Dans cette étape, nous nous concentrons sur la manière de modifier votre script d'entraînement pour adapter la bibliothèque parallèle de modèles distribués de SageMaker.

Pour configurer correctement votre script d'entraînement afin qu'il prenne en compte la configuration du parallélisme hybride que vous définirez dans Étape 2 : lancer une tâche d'entraînement à l'aide du kit SDK Python SageMaker, utilisez les fonctions d'aide de la bibliothèque, smp.dp_rank() et smp.mp_rank(), qui détectent automatiquement le rang parallèle des données et le rang parallèle du modèle, respectivement.

Pour connaître toutes les primitives MPI prises en charge par la bibliothèque, consultez la rubrique MPI Basics (Principes de base de MPI) dans la documentation du kit SDK Python de SageMaker.

Les modifications à apporter au script sont les suivantes :

  • Ajouter hvd.allreduce

  • Diffuser des variables après le premier lot, comme l'exige Horovod

  • Répartir des opérations de remaniement et/ou de partitionnement dans le pipeline de données avec smp.dp_rank().

Note

Lorsque vous utilisez Horovod, vous ne devez pas faire appel directement à hvd.init dans votre script d'entraînement. Au lieu de cela, vous devrez définir "horovod" sur True dans les paramètres modelparallel du kit SDK SageMaker Python de Étape 2 : lancer une tâche d'entraînement à l'aide du kit SDK Python SageMaker. Cela permet à la bibliothèque d'initialiser Horovod en interne en se basant sur les affectations de périphériques des partitions du modèle. Le fait d'appeler directement hvd.init() dans votre script d'entraînement peut poser des problèmes.

Note

L'utilisation de l'API hvd.DistributedOptimizer directement dans votre script d'entraînement peut entraîner une baisse des performances et de la vitesse d'entraînement, car l'API place implicitement l'opération AllReduce à l'intérieur de smp.step. Nous vous recommandons d'utiliser la bibliothèque parallèle de modèles avec Horovod en appelant directement hvd.allreduce après l'appel à accumulate() ou à reduce_mean() sur les gradients retournés par smp.step, comme le montre l'exemple suivant.

Pour en savoir plus sur l'API de la bibliothèque de modèles parallèles distribués de SageMaker, consultez la documentation sur l'API.

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: Allreduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

Partitionnement manuel avec TensorFlow

Utilisez les gestionnaires de contexte smp.partition pour placer les opérations dans une partition spécifique. Toute opération non placée dans un contexte smp.partition est placée dans le default_partition. Pour en savoir plus sur l'API de la bibliothèque de modèles parallèles distribués de SageMaker, consultez la documentation sur l'API.

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Fonctions de cadre non prises en charge

Les fonctions TensorFlow suivantes ne sont pas prises en charge par la bibliothèque :

  • tf.GradientTape() n'est pas prise en charge pour le moment. À la place, vous pouvez utiliser Optimizer.get_gradients() ou Optimizer.compute_gradients() pour calculer les gradients.

  • L'API tf.train.Checkpoint.restore() n'est pas prise en charge pour le moment. Pour le pointage, utilisez smp.CheckpointManager, qui fournit la même API et la même fonctionnalité. Les restaurations de point de contrôle avec smp.CheckpointManager doivent intervenir après la première étape.