Memodifikasi skrip TensorFlow pelatihan - Amazon SageMaker

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memodifikasi skrip TensorFlow pelatihan

Di bagian ini, Anda mempelajari cara memodifikasi skrip TensorFlow pelatihan untuk mengonfigurasi pustaka paralelisme SageMaker model untuk partisi otomatis dan partisi manual. Pemilihan contoh ini juga mencakup contoh yang terintegrasi dengan Horovod untuk model hibrida dan paralelisme data.

catatan

Untuk menemukan TensorFlow versi mana yang didukung oleh perpustakaan, lihatKerangka kerja yang didukung dan Wilayah AWS.

Modifikasi yang diperlukan yang harus Anda lakukan pada skrip pelatihan Anda untuk menggunakan perpustakaan tercantum di dalamnyaPemisahan otomatis dengan TensorFlow.

Untuk mempelajari cara memodifikasi skrip pelatihan Anda untuk menggunakan model hibrida dan paralelisme data dengan Horovod, lihat. Pemisahan otomatis dengan TensorFlow dan Horovod untuk model hibrida dan paralelisme data

Jika Anda ingin menggunakan partisi manual, tinjau juga. Pemisahan manual dengan TensorFlow

Topik berikut menunjukkan contoh skrip pelatihan yang dapat Anda gunakan untuk mengonfigurasi SageMaker pustaka paralelisme model untuk model partisi otomatis dan partisi manual. TensorFlow

catatan

Partisi otomatis diaktifkan secara default. Kecuali ditentukan lain, skrip contoh menggunakan partisi otomatis.

Pemisahan otomatis dengan TensorFlow

Perubahan skrip pelatihan berikut diperlukan untuk menjalankan TensorFlow model dengan pustaka SageMaker paralelisme model:

  1. Impor dan inisialisasi perpustakaan dengan smp.init().

  2. Mendefinisikan model Keras dengan mewarisi dari smp.DistributedModelbukan kelas Model Keras. Kembalikan output model dari metode panggilan smp.DistributedModel objek. Perhatikan bahwa setiap tensor yang dikembalikan dari metode panggilan akan disiarkan di seluruh perangkat paralel model, yang menimbulkan overhead komunikasi, jadi tensor apa pun yang tidak diperlukan di luar metode panggilan (seperti aktivasi perantara) tidak boleh dikembalikan.

  3. Ditetapkan drop_remainder=True dalam tf.Dataset.batch() metode. Ini untuk memastikan bahwa ukuran batch selalu habis dibagi dengan jumlah microbatch.

  4. Benih operasi acak dalam pipa data menggunakansmp.dp_rank(), misalnya, shuffle(ds, seed=smp.dp_rank()) untuk memastikan konsistensi sampel data di seluruh GPU yang memegang partisi model yang berbeda.

  5. Letakkan logika maju dan mundur dalam fungsi langkah dan hiasi dengansmp.step.

  6. Lakukan pasca-pemrosesan pada output di seluruh microbatch menggunakan StepOutputmetode seperti. reduce_mean smp.stepFungsi harus memiliki nilai kembali yang tergantung pada output darismp.DistributedModel.

  7. Jika ada langkah evaluasi, tempatkan logika penerusan di dalam fungsi smp.step -decorated dan pasca-proses output menggunakan API. StepOutput

Untuk mempelajari lebih lanjut tentang API pustaka paralelisme model, lihat dokumentasi API. SageMaker

Skrip Python berikut adalah contoh skrip pelatihan setelah perubahan dilakukan.

import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Jika Anda selesai mempersiapkan skrip pelatihan Anda, lanjutkan keLangkah 2: Luncurkan Training Job Menggunakan SageMaker Python SDK. Jika Anda ingin menjalankan model hybrid dan pekerjaan pelatihan paralel data, lanjutkan ke bagian berikutnya.

Pemisahan otomatis dengan TensorFlow dan Horovod untuk model hibrida dan paralelisme data

Anda dapat menggunakan perpustakaan paralelisme SageMaker model dengan Horovod untuk model hibrida dan paralelisme data. Untuk membaca lebih lanjut tentang bagaimana perpustakaan membagi model untuk paralelisme hibrida, lihat. Paralelisme pipa (tersedia untuk PyTorch dan) TensorFlow

Pada langkah ini, kami fokus pada cara memodifikasi skrip pelatihan Anda untuk mengadaptasi perpustakaan paralelisme SageMaker model.

Untuk mengatur skrip pelatihan dengan benar untuk mengambil konfigurasi paralelisme hibrida yang akan Anda aturLangkah 2: Luncurkan Training Job Menggunakan SageMaker Python SDK, gunakan fungsi pembantu perpustakaan, smp.dp_rank() dansmp.mp_rank(), yang secara otomatis mendeteksi peringkat paralel data dan peringkat paralel model masing-masing.

Untuk menemukan semua primitif MPI yang didukung perpustakaan, lihat Dasar MPI dalam dokumentasi Python SageMaker SDK.

Perubahan yang diperlukan dalam skrip adalah:

  • Menambahkan hvd.allreduce

  • Variabel penyiaran setelah batch pertama, seperti yang dipersyaratkan oleh Horovod

  • Operasi pengocokan pembibitan dan/atau sharding dalam pipa data dengan. smp.dp_rank()

catatan

Saat Anda menggunakan Horovod, Anda tidak boleh langsung memanggil skrip hvd.init pelatihan Anda. Sebagai gantinya, Anda harus mengatur "horovod" ke True dalam parameter SDK SageMaker modelparallel Python di. Langkah 2: Luncurkan Training Job Menggunakan SageMaker Python SDK Hal ini memungkinkan perpustakaan untuk menginisialisasi Horovod secara internal berdasarkan penetapan perangkat partisi model. Memanggil hvd.init() langsung dalam skrip pelatihan Anda dapat menyebabkan masalah.

catatan

Menggunakan hvd.DistributedOptimizer API secara langsung di skrip pelatihan Anda dapat mengakibatkan kinerja dan kecepatan pelatihan yang buruk, karena API secara implisit menempatkan AllReduce operasi di dalamnya. smp.step Kami menyarankan Anda untuk menggunakan perpustakaan paralelisme model dengan Horovod dengan langsung memanggil hvd.allreduce setelah memanggil accumulate() atau reduce_mean() pada gradien yang dikembalikan darismp.step, seperti yang akan ditunjukkan pada contoh berikut.

Untuk mempelajari lebih lanjut tentang API pustaka paralelisme model, lihat dokumentasi API. SageMaker

import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))

Pemisahan manual dengan TensorFlow

Gunakan manajer smp.partition konteks untuk menempatkan operasi di partisi tertentu. Setiap operasi yang tidak ditempatkan dalam smp.partition konteks apa pun ditempatkan di. default_partition Untuk mempelajari lebih lanjut tentang API pustaka paralelisme model, lihat dokumentasi API. SageMaker

import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()

Fitur kerangka kerja yang tidak didukung

TensorFlow Fitur-fitur berikut tidak didukung oleh pustaka:

  • tf.GradientTape()saat ini tidak didukung. Anda dapat menggunakan Optimizer.get_gradients() atau Optimizer.compute_gradients() sebagai gantinya untuk menghitung gradien.

  • tf.train.Checkpoint.restore()API saat ini tidak didukung. Untuk checkpointing, gunakan smp.CheckpointManager sebagai gantinya, yang menyediakan API dan fungsionalitas yang sama. Perhatikan bahwa pemulihan pos pemeriksaan dengan smp.CheckpointManager harus dilakukan setelah langkah pertama.