トレーニングスクリプトを変更します。 TensorFlow - Amazon SageMaker

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

トレーニングスクリプトを変更します。 TensorFlow

このセクションでは、 TensorFlow トレーニングスクリプトを変更して、 SageMaker自動パーティショニングと手動パーティショニング用のモデル並列処理ライブラリを設定する方法を学習します。例として選んだ中には、モデルとデータのハイブリッド並列処理のための Horovod との統合例も含まれています。

注記

TensorFlow ライブラリがどのバージョンをサポートしているかを確認するには、を参照してください。サポートされるフレームワークと AWS リージョン

ライブラリを使うためにトレーニングスクリプトに加える必要のある変更については、「による自動分割 TensorFlow」を参照してください。

Horovod でモデルとデータのハイブリッド並列処理を使うようにトレーニングスクリプトを変更する方法については、「 TensorFlow と Horovod による自動分割により、ハイブリッドモデルとデータ並列処理を実現」を参照してください。

手動パーティショニングを使用する場合は、「による手動分割 TensorFlow」も確認してください。

以下のトピックでは、 SageMaker自動パーティショニングモデルと手動パーティショニングモデルのモデル並列処理ライブラリを設定するために使用できるトレーニングスクリプトの例を示します。 TensorFlow

注記

自動パーティショニングはデフォルトで有効になっています。特に指定がない限り、サンプルスクリプトでは自動パーティショニングを使用します。

による自動分割 TensorFlow

TensorFlow SageMakerのモデル並列処理ライブラリを使用してモデルを実行するには、以下のトレーニングスクリプトの変更が必要です。

  1. ライブラリをインポートし、smp.init() で初期化します。

  2. Keras Model クラスの代わりに smp.DistributedModel から継承することで、Keras モデルを定義します。smp.DistributedModel オブジェクトの call メソッドからのモデル出力を返します。call メソッドから返されたテンソルはどれもモデル並列デバイス間でブロードキャストされるため、通信オーバーヘッドが発生します。したがって、call メソッド範囲外の不要なテンソル (中間アクティベーションなど) は返さないように注意してください。

  3. tf.Dataset.batch() メソッドで drop_remainder=True を設定します。これは、バッチサイズが常にマイクロバッチ数で割り切れるようにするためです。

  4. データパイプラインのランダムオペレーションを smp.dp_rank() を使ってシードします (例: shuffle(ds, seed=smp.dp_rank()))。これにより、異なるモデルパーティションを保持する GPU 間でデータサンプルの一貫性が確保されます。

  5. フォワードおよびバックワードロジックをステップ関数に入れ、smp.step で修飾します。

  6. reduce_mean などの StepOutput メソッドを使って、マイクロバッチ全体の出力に対して後処理を実行します。smp.step 関数には、smp.DistributedModel の出力に依存する戻り値が必要です。

  7. 評価ステップがある場合は、同様にフォワードロジックを smp.step で修飾された関数内に配置し、StepOutput API を使って出力を後処理します。

SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。

次の Python スクリプトは、変更後のトレーニングスクリプトの例です。

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

トレーニングスクリプトの準備が完了したら、ステップ 2: SageMaker Python SDK を使用してトレーニングJob を起動する に進んでください。ハイブリッドモデルとデータの並列トレーニングジョブを実行する場合は、次のセクションに進んでください。

TensorFlow と Horovod による自動分割により、ハイブリッドモデルとデータ並列処理を実現

Horovod でモデル並列処理ライブラリを使用すると、 SageMaker ハイブリッドモデルとデータ並列処理を行うことができます。ハイブリッド並列処理用にライブラリがモデルを分割する方法の詳細については、「パイプラインの並列処理 ( PyTorch および で使用可能 TensorFlow)」を参照してください。

このステップでは、トレーニングスクリプトを変更してモデル並列処理ライブラリを適合させる方法に焦点を当てます。 SageMaker

トレーニングスクリプトを適切に設定して、ステップ 2: SageMaker Python SDK を使用してトレーニングJob を起動する で設定するハイブリッド並列処理設定を適用するには、データ並列ランクとモデル並列ランクをそれぞれ自動的に検出するライブラリのヘルパー関数 smp.dp_rank() および smp.mp_rank() を使用します。

ライブラリがサポートするすべての MPI プリミティブを見つけるには、Python SDK ドキュメントの「MPI の基本」を参照してください。 SageMaker

スクリプトに必要な変更点は次のとおりです。

  • hvd.allreduce を追加する

  • Horovod の要求に応じて、最初のバッチの後に変数をブロードキャストする

  • データパイプラインのシャッフルやシャードのオペレーションを smp.dp_rank() を使ってシードする。

注記

Horovod を使用するときは、トレーニングスクリプトで hvd.init を直接呼び出すことはできません。代わりに、の SageMaker Python SDK "horovod" True modelparallel ステップ 2: SageMaker Python SDK を使用してトレーニングJob を起動する パラメータでを設定する必要があります。これにより、ライブラリはモデルパーティションのデバイス割り当てに基づいて Horovod を内部的に初期化できます。hvd.init() をトレーニングスクリプト内で直接呼び出すと、問題が発生する可能性があります。

注記

トレーニングスクリプト内で hvd.DistributedOptimizer API を直接使用すると、API が黙示的に AllReduce オペレーションを smp.step 内に配置するため、トレーニングのパフォーマンスと速度が低下する可能性があります。Horovod でモデル並列処理ライブラリを使用する場合は、次の例で示すように、smp.step から返された勾配に対して accumulate() または reduce_mean() を呼び出した後に hvd.allreduce を直接呼び出すことをお勧めします。

SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

による手動分割 TensorFlow

smp.partition コンテキストマネージャーを使って、特定のパーティションにオペレーションを配置します。どの smp.partition コンテキストにも配置されていないオペレーションは、default_partition に配置されます。 SageMakerのモデル並列処理ライブラリ API の詳細については、API ドキュメントを参照してください。

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

サポートされていないフレームワーク機能

TensorFlow 以下の機能はライブラリではサポートされていません。

  • tf.GradientTape() は現在サポートされていません。代わりに Optimizer.get_gradients() または Optimizer.compute_gradients() を使って勾配を計算できます。

  • tf.train.Checkpoint.restore() API は現在サポートされていません。チェックポイントには、代わりに同じ API と機能を備えた smp.CheckpointManager を使用してください。smp.CheckpointManager によるチェックポイントの復元は、最初のステップの後で実行することに注意してください。