Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Memodifikasi skrip TensorFlow pelatihan
Di bagian ini, Anda mempelajari cara memodifikasi skrip TensorFlow pelatihan untuk mengonfigurasi pustaka paralelisme SageMaker model untuk partisi otomatis dan partisi manual. Pemilihan contoh ini juga mencakup contoh yang terintegrasi dengan Horovod untuk model hibrida dan paralelisme data.
catatan
Untuk menemukan TensorFlow versi mana yang didukung oleh perpustakaan, lihatKerangka kerja yang didukung dan Wilayah AWS.
Modifikasi yang diperlukan yang harus Anda lakukan pada skrip pelatihan Anda untuk menggunakan perpustakaan tercantum di dalamnyaPemisahan otomatis dengan TensorFlow.
Untuk mempelajari cara memodifikasi skrip pelatihan Anda untuk menggunakan model hibrida dan paralelisme data dengan Horovod, lihat. Pemisahan otomatis dengan TensorFlow dan Horovod untuk model hibrida dan paralelisme data
Jika Anda ingin menggunakan partisi manual, tinjau juga. Pemisahan manual dengan TensorFlow
Topik berikut menunjukkan contoh skrip pelatihan yang dapat Anda gunakan untuk mengonfigurasi SageMaker pustaka paralelisme model untuk model partisi otomatis dan partisi manual. TensorFlow
catatan
Partisi otomatis diaktifkan secara default. Kecuali ditentukan lain, skrip contoh menggunakan partisi otomatis.
Topik
Pemisahan otomatis dengan TensorFlow
Perubahan skrip pelatihan berikut diperlukan untuk menjalankan TensorFlow model dengan pustaka SageMaker paralelisme model:
-
Impor dan inisialisasi perpustakaan dengan
smp.init()
. -
Mendefinisikan model Keras dengan mewarisi dari
smp.DistributedModel
bukan kelas Model Keras. Kembalikan output model dari metode panggilan smp.DistributedModel
objek. Perhatikan bahwa setiap tensor yang dikembalikan dari metode panggilan akan disiarkan di seluruh perangkat paralel model, yang menimbulkan overhead komunikasi, jadi tensor apa pun yang tidak diperlukan di luar metode panggilan (seperti aktivasi perantara) tidak boleh dikembalikan. -
Ditetapkan
drop_remainder=True
dalamtf.Dataset.batch()
metode. Ini untuk memastikan bahwa ukuran batch selalu habis dibagi dengan jumlah microbatch. -
Benih operasi acak dalam pipa data menggunakan
smp.dp_rank()
, misalnya,shuffle(ds, seed=smp.dp_rank())
untuk memastikan konsistensi sampel data di seluruh GPU yang memegang partisi model yang berbeda. -
Letakkan logika maju dan mundur dalam fungsi langkah dan hiasi dengan
smp.step
. -
Lakukan pasca-pemrosesan pada output di seluruh microbatch menggunakan
StepOutput
metode seperti. reduce_mean
smp.step
Fungsi harus memiliki nilai kembali yang tergantung pada output dari smp.DistributedModel
.
Untuk mempelajari lebih lanjut tentang API pustaka paralelisme model, lihat dokumentasi API
Skrip Python berikut adalah contoh skrip pelatihan setelah perubahan dilakukan.
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Jika Anda selesai mempersiapkan skrip pelatihan Anda, lanjutkan keLangkah 2: Luncurkan Training Job Menggunakan SageMaker Python SDK. Jika Anda ingin menjalankan model hybrid dan pekerjaan pelatihan paralel data, lanjutkan ke bagian berikutnya.
Pemisahan otomatis dengan TensorFlow dan Horovod untuk model hibrida dan paralelisme data
Anda dapat menggunakan perpustakaan paralelisme SageMaker model dengan Horovod untuk model hibrida dan paralelisme data. Untuk membaca lebih lanjut tentang bagaimana perpustakaan membagi model untuk paralelisme hibrida, lihat. Paralelisme pipa (tersedia untuk PyTorch dan) TensorFlow
Pada langkah ini, kami fokus pada cara memodifikasi skrip pelatihan Anda untuk mengadaptasi perpustakaan paralelisme SageMaker model.
Untuk mengatur skrip pelatihan dengan benar untuk mengambil konfigurasi paralelisme hibrida yang akan Anda aturLangkah 2: Luncurkan Training Job Menggunakan SageMaker Python SDK, gunakan fungsi pembantu perpustakaan, smp.dp_rank()
dansmp.mp_rank()
, yang secara otomatis mendeteksi peringkat paralel data dan peringkat paralel model masing-masing.
Untuk menemukan semua primitif MPI yang didukung perpustakaan, lihat Dasar MPI
Perubahan yang diperlukan dalam skrip adalah:
-
Menambahkan
hvd.allreduce
-
Variabel penyiaran setelah batch pertama, seperti yang dipersyaratkan oleh Horovod
-
Operasi pengocokan pembibitan dan/atau sharding dalam pipa data dengan.
smp.dp_rank()
catatan
Saat Anda menggunakan Horovod, Anda tidak boleh langsung memanggil skrip hvd.init
pelatihan Anda. Sebagai gantinya, Anda harus mengatur "horovod"
ke True
dalam parameter SDK SageMaker modelparallel
Python di. Langkah 2: Luncurkan Training Job Menggunakan SageMaker Python SDK Hal ini memungkinkan perpustakaan untuk menginisialisasi Horovod secara internal berdasarkan penetapan perangkat partisi model. Memanggil hvd.init()
langsung dalam skrip pelatihan Anda dapat menyebabkan masalah.
catatan
Menggunakan hvd.DistributedOptimizer
API secara langsung di skrip pelatihan Anda dapat mengakibatkan kinerja dan kecepatan pelatihan yang buruk, karena API secara implisit menempatkan AllReduce
operasi di dalamnya. smp.step
Kami menyarankan Anda untuk menggunakan perpustakaan paralelisme model dengan Horovod dengan langsung memanggil hvd.allreduce
setelah memanggil accumulate()
atau reduce_mean()
pada gradien yang dikembalikan darismp.step
, seperti yang akan ditunjukkan pada contoh berikut.
Untuk mempelajari lebih lanjut tentang API pustaka paralelisme model, lihat dokumentasi API
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
Pemisahan manual dengan TensorFlow
Gunakan manajer smp.partition
konteks untuk menempatkan operasi di partisi tertentu. Setiap operasi yang tidak ditempatkan dalam smp.partition
konteks apa pun ditempatkan di. default_partition
Untuk mempelajari lebih lanjut tentang API pustaka paralelisme model, lihat dokumentasi API
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Fitur kerangka kerja yang tidak didukung
TensorFlow Fitur-fitur berikut tidak didukung oleh pustaka:
-
tf.GradientTape()
saat ini tidak didukung. Anda dapat menggunakanOptimizer.get_gradients()
atauOptimizer.compute_gradients()
sebagai gantinya untuk menghitung gradien. -
tf.train.Checkpoint.restore()
API saat ini tidak didukung. Untuk checkpointing, gunakansmp.CheckpointManager
sebagai gantinya, yang menyediakan API dan fungsionalitas yang sama. Perhatikan bahwa pemulihan pos pemeriksaan dengansmp.CheckpointManager
harus dilakukan setelah langkah pertama.