Ejecute un trabajo de entrenamiento en paralelo de un modelo SageMaker distribuido con Tensor Paralelism - Amazon SageMaker

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejecute un trabajo de entrenamiento en paralelo de un modelo SageMaker distribuido con Tensor Paralelism

En esta sección, aprenderá lo siguiente:

  • Cómo configurar un SageMaker PyTorch estimador y la opción de paralelismo del SageMaker modelo para utilizar el paralelismo tensorial.

  • Cómo adaptar el script de su entrenamiento mediante los módulos smdistributed.modelparallel ampliados para el paralelismo de tensores.

Para obtener más información sobre los smdistributed.modelparallel módulos, consulte las API paralelas del SageMaker modelo en la documentación del SDK de SageMaker Python.

Paralelismo tensorial por sí solo

A continuación se muestra un ejemplo de una opción de entrenamiento distribuido para activar solo el paralelismo de tensores, sin el paralelismo de canalización. Configure los smp_options diccionarios mpi_options y para especificar las opciones de entrenamiento distribuidas en el SageMaker PyTorch estimador.

nota

Las funciones ampliadas de ahorro de memoria están disponibles a través de Deep Learning Containers for PyTorch, que implementa la biblioteca de paralelismo de SageMaker modelos v1.6.0 o posterior.

Configure un estimador SageMaker PyTorch

mpi_options = { "enabled" : True, "processes_per_host" : 8,               # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " }                 smp_options = { "enabled":True, "parameters": { "pipeline_parallel_degree": 1,    # alias for "partitions" "placement_strategy": "cluster", "tensor_parallel_degree": 4,      # tp over 4 devices "ddp": True } }                smp_estimator = PyTorch(    entry_point='your_training_script.py', # Specify    role=role,    instance_type='ml.p3.16xlarge',    sagemaker_session=sagemaker_session,    framework_version='1.13.1', py_version='py36',    instance_count=1,    distribution={        "smdistributed": {"modelparallel": smp_options},        "mpi": mpi_options    },    base_job_name="SMD-MP-demo", ) smp_estimator.fit('s3://my_bucket/my_training_data/')
sugerencia

Para encontrar una lista completa de parámetrosdistribution, consulte Parámetros de configuración para el paralelismo de modelos en la documentación del SDK de SageMaker Python.

Adapta tu guion de entrenamiento PyTorch

El siguiente ejemplo de guion de entrenamiento muestra cómo adaptar la biblioteca de paralelismo de SageMaker modelos a un guion de entrenamiento. En este ejemplo, se asume que el script se llama your_training_script.py.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module):     def __init__(self):         super(Net, self).__init__()         self.conv1 = nn.Conv2d(1, 32, 3, 1)         self.conv2 = nn.Conv2d(32, 64, 3, 1)         self.fc1 = nn.Linear(9216, 128)         self.fc2 = nn.Linear(128, 10)     def forward(self, x):         x = self.conv1(x)         x = F.relu(x)         x = self.conv2(x)         x = F.relu(x)         x = F.max_pool2d(x, 2)         x = torch.flatten(x, 1)         x = self.fc1(x)         x = F.relu(x)         x = self.fc2(x)         return F.log_softmax(x, 1) def train(model, device, train_loader, optimizer):     model.train()     for batch_idx, (data, target) in enumerate(train_loader):         # smdistributed: Move input tensors to the GPU ID used by         # the current process, based on the set_device call.         data, target = data.to(device), target.to(device)         optimizer.zero_grad()         output = model(data)         loss = F.nll_loss(output, target, reduction="mean")         loss.backward()         optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0:     dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1:     partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}     dataset = SplitDataset(dataset, partitions=partitions_dict)     dataset.select(f"{smp.dp_rank()}") train_loader = torch.utils.data.DataLoader(dataset, batch_size=64) # smdistributed: Enable tensor parallelism for all supported modules in the model # i.e., nn.Linear in this case. Alternatively, we can use # smp.set_tensor_parallelism(model.fc1, True) # to enable it only for model.fc1 with smp.tensor_parallelism():     model = Net() # smdistributed: Use the DistributedModel wrapper to distribute the # modules for which tensor parallelism is enabled model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

Paralelismo tensorial combinado con paralelismo de canalización

El siguiente es un ejemplo de una opción de entrenamiento distribuida que permite combinar el paralelismo tensorial con el paralelismo en canalización. Configure los smp_options parámetros mpi_options y para especificar las opciones paralelas del modelo con paralelismo tensorial al configurar un estimador. SageMaker PyTorch

nota

Las funciones ampliadas de ahorro de memoria están disponibles a través de Deep Learning Containers for PyTorch, que implementa la biblioteca de paralelismo de SageMaker modelos v1.6.0 o posterior.

Configure un estimador SageMaker PyTorch

mpi_options = { "enabled" : True, "processes_per_host" : 8,               # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " }                 smp_options = { "enabled":True, "parameters": { "microbatches": 4, "pipeline_parallel_degree": 2,    # alias for "partitions" "placement_strategy": "cluster", "tensor_parallel_degree": 2,      # tp over 2 devices "ddp": True } }                smp_estimator = PyTorch(    entry_point='your_training_script.py', # Specify    role=role,    instance_type='ml.p3.16xlarge',    sagemaker_session=sagemaker_session,    framework_version='1.13.1', py_version='py36',    instance_count=1,    distribution={        "smdistributed": {"modelparallel": smp_options},        "mpi": mpi_options    },    base_job_name="SMD-MP-demo", ) smp_estimator.fit('s3://my_bucket/my_training_data/')  

Adapta tu guion de entrenamiento PyTorch

El siguiente ejemplo de guion de entrenamiento muestra cómo adaptar la biblioteca de paralelismo de SageMaker modelos a un guion de entrenamiento. Tenga en cuenta que el script de entrenamiento ahora incluye el decorador smp.step:

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module):     def __init__(self):         super(Net, self).__init__()         self.conv1 = nn.Conv2d(1, 32, 3, 1)         self.conv2 = nn.Conv2d(32, 64, 3, 1)         self.fc1 = nn.Linear(9216, 128)         self.fc2 = nn.Linear(128, 10)     def forward(self, x):         x = self.conv1(x)         x = F.relu(x)         x = self.conv2(x)         x = F.relu(x)         x = F.max_pool2d(x, 2)         x = torch.flatten(x, 1)         x = self.fc1(x)         x = F.relu(x)         x = self.fc2(x)         return F.log_softmax(x, 1) # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target):     output = model(data)     loss = F.nll_loss(output, target, reduction="mean")     model.backward(loss)     return output, loss def train(model, device, train_loader, optimizer):     model.train()     for batch_idx, (data, target) in enumerate(train_loader):         # smdistributed: Move input tensors to the GPU ID used by         # the current process, based on the set_device call.         data, target = data.to(device), target.to(device)         optimizer.zero_grad()         # Return value, loss_mb is a StepOutput object         _, loss_mb = train_step(model, data, target)         # smdistributed: Average the loss across microbatches.         loss = loss_mb.reduce_mean()         optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0:     dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1:     partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}     dataset = SplitDataset(dataset, partitions=partitions_dict)     dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = Net() # smdistributed: enable tensor parallelism only for model.fc1 smp.set_tensor_parallelism(model.fc1, True) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)