In this section, you will learn:
- How to configure a SageMaker PyTorch estimator and the SageMaker model parallelism options to use tensor parallelism.
- How to adapt your training script for tensor parallelism with the extended smdistributed.modelparallel modules.

To learn more about the smdistributed.modelparallel modules, see SageMaker model parallel APIs in the SageMaker Python SDK documentation.
Tensor parallelism alone
The following example of distributed training options activates tensor parallelism alone, without pipeline parallelism. Configure the mpi_options and smp_options dictionaries to specify distributed training options for the SageMaker PyTorch estimator.
Note
The extended memory-saving features are available through the Deep Learning Containers for PyTorch, which implements the SageMaker model parallelism library v1.6.0 or later.
Configure a SageMaker PyTorch estimator
mpi_options = {
    "enabled": True,
    "processes_per_host": 8,  # 8 processes per instance
    "custom_mpi_options": "--mca btl_vader_single_copy_mechanism none "
}

smp_options = {
    "enabled": True,
    "parameters": {
        "pipeline_parallel_degree": 1,  # alias for "partitions"
        "placement_strategy": "cluster",
        "tensor_parallel_degree": 4,    # tp over 4 devices
        "ddp": True
    }
}
smp_estimator = PyTorch(
    entry_point='your_training_script.py',  # Specify your training script
    role=role,
    instance_type='ml.p3.16xlarge',
    sagemaker_session=sagemaker_session,
    framework_version='1.13.1',
    py_version='py39',  # PyTorch 1.13.1 DLCs are built against Python 3.9
    instance_count=1,
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    base_job_name="SMD-MP-demo",
)

smp_estimator.fit('s3://my_bucket/my_training_data/')
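With these settings, the library arranges the 8 processes on the instance so that each model replica spans tensor_parallel_degree × pipeline_parallel_degree GPUs, and the remaining factor becomes the data-parallel degree. A minimal sketch of that arithmetic (the helper below is illustrative, not part of the SageMaker Python SDK):

def data_parallel_degree(instance_count, processes_per_host,
                         tensor_parallel_degree, pipeline_parallel_degree):
    # Illustrative helper: total processes divided by the size of one
    # model-parallel group gives the number of model replicas.
    total_processes = instance_count * processes_per_host
    model_parallel_degree = tensor_parallel_degree * pipeline_parallel_degree
    assert total_processes % model_parallel_degree == 0
    return total_processes // model_parallel_degree

# 1 instance x 8 processes, tp=4, pp=1 -> 2-way data parallelism
print(data_parallel_degree(1, 8, 4, 1))  # 2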
Tip
For a complete list of parameters for distribution, see Configuration Parameters for Model Parallelism in the SageMaker Python SDK documentation.
Adapt your PyTorch training script
The following example training script shows how to adapt a training script to the SageMaker model parallelism library. In this example, the script is assumed to be named your_training_script.py.
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.log_softmax(x, 1)


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by
        # the current process, based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target, reduction="mean")
        loss.backward()
        optimizer.step()


# smdistributed: Initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time.
if smp.local_rank() == 0:
    datasets.MNIST("../data", train=True, download=True)
smp.barrier()

# smdistributed: Every process loads the dataset that local rank 0 downloaded.
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

train_loader = torch.utils.data.DataLoader(dataset, batch_size=64)

# smdistributed: Enable tensor parallelism for all supported modules in the model,
# i.e., nn.Linear in this case. Alternatively, we can use
# smp.set_tensor_parallelism(model.fc1, True)
# to enable it only for model.fc1.
with smp.tensor_parallelism():
    model = Net()

# smdistributed: Use the DistributedModel wrapper to distribute the
# modules for which tensor parallelism is enabled
model = smp.DistributedModel(model)
optimizer = optim.Adadelta(model.parameters(), lr=4.0)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
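When debugging placement, it can help to log where each process lands in the tensor-parallel and data-parallel groups. A brief sketch, assuming the rank and size helpers (smp.rank(), smp.tp_rank(), smp.tp_size()) exposed by the library in v1.6.0 and later:

# Hedged sketch: log this process's position in the parallel groups after smp.init().
print(
    f"global rank {smp.rank()}: "
    f"tp_rank {smp.tp_rank()} of {smp.tp_size()}, "
    f"dp_rank {smp.dp_rank()} of {smp.dp_size()}"
)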
Tensor parallelism combined with pipeline parallelism
The following is an example of distributed training options that enables tensor parallelism combined with pipeline parallelism. Set the mpi_options and smp_options parameters to specify these model parallelism options when you configure a SageMaker PyTorch estimator.
Note
The extended memory-saving features are available through the Deep Learning Containers for PyTorch, which implements the SageMaker model parallelism library v1.6.0 or later.
Configure a SageMaker PyTorch estimator
mpi_options = {
    "enabled": True,
    "processes_per_host": 8,  # 8 processes per instance
    "custom_mpi_options": "--mca btl_vader_single_copy_mechanism none "
}

smp_options = {
    "enabled": True,
    "parameters": {
        "microbatches": 4,
        "pipeline_parallel_degree": 2,  # alias for "partitions"
        "placement_strategy": "cluster",
        "tensor_parallel_degree": 2,    # tp over 2 devices
        "ddp": True
    }
}
smp_estimator = PyTorch(
    entry_point='your_training_script.py',  # Specify your training script
    role=role,
    instance_type='ml.p3.16xlarge',
    sagemaker_session=sagemaker_session,
    framework_version='1.13.1',
    py_version='py39',  # PyTorch 1.13.1 DLCs are built against Python 3.9
    instance_count=1,
    distribution={
        "smdistributed": {"modelparallel": smp_options},
        "mpi": mpi_options
    },
    base_job_name="SMD-MP-demo",
)

smp_estimator.fit('s3://my_bucket/my_training_data/')
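Here each model replica spans tensor_parallel_degree × pipeline_parallel_degree = 2 × 2 = 4 GPUs, so the 8 processes on the ml.p3.16xlarge instance again leave a data-parallel degree of 2:

# Illustrative arithmetic only, not SDK code:
# total processes      = 1 instance * 8 processes_per_host = 8
# model-parallel size  = tensor_parallel_degree * pipeline_parallel_degree = 2 * 2 = 4
# data-parallel degree = 8 / 4 = 2
assert 8 // (2 * 2) == 2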
Adapt your PyTorch training script
The following example training script shows how to adapt a training script to the SageMaker model parallelism library. Note that the training script now includes the smp.step decorator:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchnet.dataset import SplitDataset
from torchvision import datasets

import smdistributed.modelparallel.torch as smp


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.log_softmax(x, 1)


# smdistributed: Define smp.step. Return any tensors needed outside.
@smp.step
def train_step(model, data, target):
    output = model(data)
    loss = F.nll_loss(output, target, reduction="mean")
    model.backward(loss)
    return output, loss


def train(model, device, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # smdistributed: Move input tensors to the GPU ID used by
        # the current process, based on the set_device call.
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        # Return value, loss_mb is a StepOutput object
        _, loss_mb = train_step(model, data, target)
        # smdistributed: Average the loss across microbatches.
        loss = loss_mb.reduce_mean()
        optimizer.step()


# smdistributed: Initialize the backend
smp.init()

# smdistributed: Set the device to the GPU ID used by the current process.
# Input tensors should be transferred to this device.
torch.cuda.set_device(smp.local_rank())
device = torch.device("cuda")

# smdistributed: Download only on a single process per instance.
# When this is not present, the file is corrupted by multiple processes trying
# to download and extract at the same time.
if smp.local_rank() == 0:
    datasets.MNIST("../data", train=True, download=True)
smp.barrier()

# smdistributed: Every process loads the dataset that local rank 0 downloaded.
dataset = datasets.MNIST("../data", train=True, download=False)

# smdistributed: Shard the dataset based on data parallel ranks
if smp.dp_size() > 1:
    partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}
    dataset = SplitDataset(dataset, partitions=partitions_dict)
    dataset.select(f"{smp.dp_rank()}")

# smdistributed: Set drop_last=True to ensure that batch size is always divisible
# by the number of microbatches
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True)

model = Net()

# smdistributed: Enable tensor parallelism only for model.fc1
smp.set_tensor_parallelism(model.fc1, True)

# smdistributed: Use the DistributedModel container to provide the model
# to be partitioned across different ranks. For the rest of the script,
# the returned DistributedModel object should be used in place of
# the model provided for DistributedModel class instantiation.
model = smp.DistributedModel(model)
optimizer = optim.Adadelta(model.parameters(), lr=4.0)
optimizer = smp.DistributedOptimizer(optimizer)

train(model, device, train_loader, optimizer)
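Because train_step is decorated with smp.step, its return values are StepOutput objects holding one entry per microbatch. Besides the reduce_mean() call shown above, the per-microbatch outputs can be combined directly; a brief sketch, assuming StepOutput also provides a concat() method that merges results along the batch dimension:

# Hedged sketch: post-process the StepOutput values returned by train_step.
output_mb, loss_mb = train_step(model, data, target)
loss = loss_mb.reduce_mean()   # average the scalar loss across microbatches
output = output_mb.concat()    # assumption: concatenate per-microbatch outputs
pred = output.argmax(dim=1)    # e.g., predictions for accuracy tracking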