网站中 点击出现登录框怎么做,产品推广方案ppt模板,温州市城乡建设厅网站,阿里云虚拟主机做淘客网站Pytorch DistributedDataParallel#xff08;DDP#xff09;教程二#xff1a;快速入门实践篇 文章目录 一、简要回顾DDP二、DDP训练框架的流程1. 准备DDP环境2. 准备数据加载器3. 准备DDP模型和优化器4. 开始训练5. 评估测试 三、完整代码四、DP, DDP性能对比五、总结1. H…Pytorch DistributedDataParallelDDP教程二快速入门实践篇 文章目录 一、简要回顾DDP二、DDP训练框架的流程1. 准备DDP环境2. 准备数据加载器3. 准备DDP模型和优化器4. 开始训练5. 评估测试 三、完整代码四、DP, DDP性能对比五、总结1. How to use DDP all depends on yourself.2. 又Out了 一、简要回顾DDP
在上一篇文章中简单介绍了Pytorch分布式训练的一些基础原理和基本概念。简要回顾如下
1DDP采用Ring-All-Reduce架构其核心思想为所有的GPU设备安排在一个逻辑环中每个GPU应该有一个左邻和一个右邻设备从它的左邻居接收数据并将数据汇总后发送给右邻。通过N轮迭代以后每个设备都拥有全局数据的计算结果。
2DDP每个GPU对应一个进程这些进程可以看作是相互独立的。除非我们自己手动实现不然各个进程的数据都是不互通的。Pytorch只为我们实现了梯度同步。
3DDP相关代码需要关注三个部分数据拆分、IO操作、和评估测试。
二、DDP训练框架的流程
1. 准备DDP环境
在使用DDP训时我们首先要初始化一下DDP环境设置好通信后端进程组这些。代码很简单如下所示
def setup(rank, world_size):# 设置主机地址和端口号这两个环境变量用于配置进程组通信的初始化。# MASTER_ADDR指定了负责协调初始化过程的主机地址在这里设置为localhost# 表示在单机多GPU的设置中所有的进程都将连接到本地机器上。os.environ[MASTER_ADDR] localhost# MASTER_PORT指定了主机监听的端口号用于进程间的通信。这里设置为12355。# 注意要选择一个未被使用的端口号来进行监听os.environ[MASTER_PORT] 12355# 初始化分布式进程组。# 使用NCCL作为通信后端这是NVIDIA GPUs优化的通信库适用于高性能GPU之间的通信。# rank是当前进程在进程组中的编号world_size是总进程数GPU数量即进程组的大小。dist.init_process_group(nccl, rankrank, world_sizeworld_size)# 为每个进程设置GPUtorch.cuda.set_device(rank)2. 准备数据加载器
假设我们已经定义好了dataset这里只需要略加修改使用DistributedSampler即可。代码如下
def get_loader(trainset, testset, batch_size, rank, world_size):train_sampler DistributedSampler(train_set, num_replicasworld_size, rankrank)train_loader DataLoader(train_set, batch_sizebatch_size, samplertrain_sampler)# 对于测试集来说可以选择使用DistributedSampler也可以选择不使用这里选择使用test_sampler DistributedSampler(test_set, num_replicasworld_size, rankrank)test_loader DataLoader(test_set, batch_sizebatch_size, samplertrain_sampler)# 不使用的代码很简单 如下所示# test_loader DataLoader(test_set, batch_sizebatch_size, shuffleFalse)rerturn train_loader, test_loader注关于testloader要不要使用分布式采样器取决于自己的需求。如果测试数据集相对较小或者不需要频繁进行测试评估不使用DistributedSampler可能更简单因为每个GPU或进程都会独立处理完整的数据集从而简化了测试流程。然而对于大型数据集或当需要在训练过程中频繁进行模型评估的情况使用DistributedSampler可以显著提高测试的效率因为它允许每个GPU只处理数据的一个子集从而减少了单个进程的负载并加快了处理速度。
对于DDP而言每个进程上都会有一个dataloader如果使用了DistributedSampler那么真的批大小会是batch_size*num_gpus。
有关DistributedSampler的更多细节可以参考
DDP系列第二篇实现原理与源代码解析
3. 准备DDP模型和优化器
在定义好模型之后需要在所有进程中复制模型并用DDP封装。代码如下
def prepare_model_and_optimizer(model, rank, lr):# 设置设备为当前进程的GPU。这里rank代表当前进程的编号# cuda:{rank} 指定模型应该运行在对应编号的GPU上。device torch.device(fcuda:{rank}) # 包装模型以使用分布式数据并行。DDP将在多个进程间同步模型的参数# 并且只有指定的device_ids中的GPU才会被使用。model model.to(device)model DDP(model, device_ids[rank])optimizer torch.optim.SGD(model.parameters(), lrlr)return model, optimizer注在DDP中不同进程之间只会同步梯度因此为了保证训练时的参数同步需要在训练开始前确保不同进程上模型和优化器的初始状态相同。对于优化器而言当使用PyTorch中内置的优化器如SGD, Adam等时只要模型在每个进程中初始化状态相同优化器在每个进程中创建后的初始状态也将是相同的。但是如果是自定义的优化器确保在设计时考虑到跨进程的一致性和同步特别是当涉及到需要维护跨步骤状态如动量、RMS等时。
确保模型的初始状态相同有如下两种方式
1参数初始化方法
在DDP中每个GPU上都会有一个模型。我们可以利用统一的初始化方法来保证不同GPU上的参数统一性。一个简单的示例代码如下
def weights_init(m):if isinstance(m, nn.Conv2d):nn.init.kaiming_normal_(m.weight, modefan_out, nonlinearityrelu)elif isinstance(m, nn.Linear):nn.init.xavier_uniform_(m.weight)m.bias.data.fill_(0.01)
torch.manual_seed(42) # 设置随机种子以确保可重复性
# 设置所有GPU的随机种子
torch.cuda.manual_seed_all(42)
model MyModel()
model.apply(weights_init)注在初始化时需要为所有的进程设置好相同的随机种子不然weights_init的结果也会不一样。
2加载相同的模型权重文件
另一种方法是在所有进程中加载相同的预训练权重。这确保了无论在哪个GPU上模型的起点都是一致的。代码如下
model MyModel()
model.load_state_dict(torch.load(path_to_weights.pth))注如果你既没有设置初始化方法也没有模型权重。一个可行的方式是手动同步将rank0的进程上模型文件临时保存然后其他进程加载最后再删掉临时文件。代码如下
def synchronize_model(model, rank, roottemp_model.pth):if rank 0:# 保存模型到文件torch.save(model.state_dict(), root)torch.distributed.barrier() # 等待rank0保存模型if rank ! 0:# 加载模型权重model.load_state_dict(torch.load(root))torch.distributed.barrier() # 确保所有进程都加载了模型if rank 0:# 删除临时文件os.remove(root)模型同步似乎可以省略在使用torch.nn.parallel.DistributedDataParallel封装模型时它会在内部处理所需的同步操作。
4. 开始训练
训练时的代码其实和单卡训练没有什么区别。最主要的就是在每个epoch开始的时候要设置一下sampler的epoch以保证每个epoch的采样数据的顺序都是不一样的。代码如下
def train(model, optimizer, criterion, rank, train_loader, num_epochs):sampler train_loader.samplerfor epoch in range(num_epochs):# 在每个epoch开始时更新samplersampler.set_epoch(epoch)model.train()for batch_idx, (data, targets) in enumerate(dataloader):data, targets data.cuda(rank), targets.cuda(rank)optimizer.zero_grad()outputs model(data)loss criterion(outputs, targets)loss.backward()optimizer.step()# 只在rank为0的进程中打印信息if rank 0 and batch_idx % 100 0:print(fEpoch {epoch}, Batch {batch_idx}, Loss: {loss.item()})注这里的打印的loss只是rank0上的loss如果要打印所有卡上的平均loss则需要使用all_reduce方法。代码如下 # 将损失从所有进程中收集起来并求平均# 创建一个和loss相同的tensor用于聚合操作reduced_loss torch.tensor([loss.item()]).cuda(rank)# all_reduce操作默认是求和dist.all_reduce(reduced_loss)# 求平均reduced_loss reduced_loss / dist.get_world_size()# 只在rank为0的进程中打印信息if rank 0 and batch_idx % 100 0:print(fEpoch {epoch}, Batch {batch_idx}, Average Loss: {reduced_loss.item()})5. 评估测试
评估的代码也和单卡比较类似唯一的区别就是如果使用了DistributedSampler在计算指标时需要gather每个进程上的preds和gts然后计算全局指标。
def evaluate(model, test_loader, rank):model.eval()total_preds []total_targets []with torch.no_grad():for data, targets in test_loader:data, targets data.to(rank), targets.to(rank)outputs model(data)_, preds torch.max(outputs, 1)# 收集当前进程的结果total_preds.append(preds)total_targets.append(targets)# 将所有进程的preds和targets转换为全局列表total_preds torch.cat(total_preds).cpu()total_targets torch.cat(total_targets).cpu()# 使用all_gather将所有进程的数据集中到一个列表中gathered_preds [torch.zeros_like(total_preds) for _ in range(dist.get_world_size())]gathered_targets [torch.zeros_like(total_targets) for _ in range(dist.get_world_size())]dist.all_gather(gathered_preds, total_preds)dist.all_gather(gathered_targets, total_targets)if rank 0:# 只在一个进程中进行计算和输出gathered_preds torch.cat(gathered_preds)gathered_targets torch.cat(gathered_targets)# 计算全局性能指标accuracy (gathered_preds gathered_targets).float().mean()print(fGlobal Accuracy: {accuracy.item()})注如果test_loader没有设置DistributedSampler评估的代码可以和单卡代码完全一样不需要任何修改。
三、完整代码
下面以CIFAR100数据集为例完整展示一下DDP的训练流程。
import os
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets, transforms
from torch.nn.parallel import DistributedDataParallel as DDP# 模型定义
class LeNet(nn.Module):def __init__(self, num_classes100):super(LeNet, self).__init__()self.conv1 nn.Conv2d(3, 6, 5)self.pool nn.MaxPool2d(2, 2)self.conv2 nn.Conv2d(6, 16, 5)self.fc1 nn.Linear(16 * 5 * 5, 120)self.fc2 nn.Linear(120, 84)self.fc3 nn.Linear(84, num_classes) # CIFAR100 has 100 classesdef forward(self, x):x self.pool(torch.relu(self.conv1(x)))x self.pool(torch.relu(self.conv2(x)))x torch.flatten(x, 1)x torch.relu(self.fc1(x))x torch.relu(self.fc2(x))x self.fc3(x)return xdef setup(rank, world_size):os.environ[MASTER_ADDR] localhostos.environ[MASTER_PORT] 12355dist.init_process_group(nccl, rankrank, world_sizeworld_size)torch.cuda.set_device(rank)def cleanup():# 销毁进程组dist.destroy_process_group()def get_model():model LeNet(100).cuda()model DDP(model, device_ids[torch.cuda.current_device()])return modeldef get_dataloader(trainTrue):transform transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])rank dist.get_rank()# 每个进程创建其独立的数据目录避免I/O冲突# 这里使用rank来创建独立目录例如./data_0./data_1等# 这种方法避免了多个进程同时写入同一个文件所导致的冲突# 注这是一种简单的解决方案但在需要大量磁盘空间的情况下并不高效因为每个进程都需要存储数据集的一个完整副本。dataset datasets.CIFAR100(rootf./data_{rank}, traintrain, downloadTrue, transformtransform)sampler DistributedSampler(dataset, shuffletrain)loader DataLoader(dataset, batch_size64, samplersampler)return loaderdef train(model, loader, optimizer, criterion, epoch, rank):model.train()# 设置DistributedSampler的epochloader.sampler.set_epoch(epoch)for batch_idx, (data, targets) in enumerate(loader):data, targets data.cuda(rank), targets.cuda(rank)optimizer.zero_grad()outputs model(data)loss criterion(outputs, targets)loss.backward()optimizer.step()# 每100个batch计算当前的损失并在所有进程中进行聚合然后打印if (batch_idx 1) % 100 0:# 将当前的loss转换为tensor并在所有进程间进行求和loss_tensor torch.tensor([loss.item()]).cuda(rank)dist.all_reduce(loss_tensor)# 计算所有进程的平均损失mean_loss loss_tensor.item() / dist.get_world_size() # 平均损失# 如果是rank 0则打印平均损失if rank 0:print(fRank {rank}, Epoch {epoch}, Batch {batch_idx 1}, Mean Loss: {mean_loss})def evaluate(model, dataloader, device):model.eval()local_preds []local_targets []with torch.no_grad():for data, targets in dataloader:data, targets data.to(device), targets.to(device)outputs model(data)_, preds torch.max(outputs, 1)local_preds.append(preds)local_targets.append(targets)# 将本地预测和目标转换为全局列表local_preds torch.cat(local_preds)local_targets torch.cat(local_targets)# 使用all_gather收集所有进程的预测和目标world_size dist.get_world_size()gathered_preds [torch.zeros_like(local_preds) for _ in range(world_size)]gathered_targets [torch.zeros_like(local_targets) for _ in range(world_size)]dist.all_gather(gathered_preds, local_preds)dist.all_gather(gathered_targets, local_targets)# 只在rank 0进行计算和输出if dist.get_rank() 0:gathered_preds torch.cat(gathered_preds)gathered_targets torch.cat(gathered_targets)accuracy (gathered_preds gathered_targets).float().mean()print(fGlobal Test Accuracy: {accuracy.item()})def main_worker(rank, world_size, num_epochs):setup(rank, world_size)model get_model()optimizer optim.Adam(model.parameters(), lr0.001)criterion nn.CrossEntropyLoss()train_loader get_dataloader(trainTrue)test_loader get_dataloader(trainFalse)start_time time.time()for epoch in range(num_epochs): # num of epochstrain(model, train_loader, optimizer, criterion, epoch, rank)evaluate(model, test_loader, rank)# 计时结束前同步所有进程确保所有进程已经完成训练dist.barrier()duration time.time() - start_timeif rank 0:print(fTraining completed in {duration:.2f} seconds)cleanup()if __name__ __main__:world_size 4 # 4块GPUnum_epochs 10 # 总共训练10轮# 采用mp.spawn启动mp.spawn(main_worker, args(world_size,num_epochs), nprocsworld_size, joinTrue)
注
1关于get_loader函数中数据加载有关部分的问题
dataset datasets.CIFAR100(rootf./data_{rank}, traintrain, downloadTrue, transformtransform)上面这段代码的最大问题在于每个进程都会去下载一份数据到该进程对应的目录这些目录之间是物理隔离的。显然当要下载的数据集很大时这种方法并不合适因为会占用更多的硬盘资源并且大量时间会花费在下载数据集上。但是如果不为每个进程设置单独的目录就会造成读写冲突多个进程都去同时读写同一个文件最终导致数据集加载不成功。
一种更合理的解决方法是提前下载好文件并在创建数据集时设置download为False。代码如下
def download_data(rank):if rank 0:transform transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])# 只在Rank 0 中下载数据集datasets.CIFAR100(root./data_cifar100, trainTrue, downloadTrue, transformtransform)# 等待rank0下载完成dist.barrier()def get_dataloader(trainTrue):# 现在只需要下载一次rank dist.get_rank()download_data(rank)transform transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])dataset datasets.CIFAR100(root./data_cifar100, traintrain, downloadFalse, transformtransform) # 设置download为Falsesampler DistributedSampler(dataset, num_replicasdist.get_world_size(), rankrank, shuffletrain)loader DataLoader(dataset, batch_size64, samplersampler, num_workers4)return loader
2关于DDP启动方式有关的问题
DDP的启动方式有两种一种是**torch.distributed.launch**这个工具脚本设置必要的环境变量并为每个节点启动多个进程。通常在从命令行运行脚本时使用在新版本的Pytorch 2.版本中这种方式是deprecated不推荐继续使用。
还有一种就是**torch.multiprocessing.spawn** 这个函数在Python脚本内部编程方式启动多个进程。启动方式很简单分别传入主入口函数main_worker然后传入main_woker的参数以及GPU数量即可。
四、DP, DDP性能对比
基于上述的代码我还实现了一个DP的代码。实验setting为
GPU: 4 × 4 \times 4× RTX 4090, batch_size256, optimizer为Adam学习率为0.001loss是CE loss。
它们之间的性能对比如下
方式时间准确率DDP77秒27.12%DP293秒26.76%单卡训练248秒26.34%
没有调参网络结构也是个最简单的LeNet只训练了10轮所以准确率比较低。不过这个结果还是能说明一些问题的。可以看到DDP耗时最短DP的时间反而比单卡训练还长。这主要是因为对于CIFAR100分类单卡也可以很好地支持训练显卡并不是性能瓶颈。当使用DP时模型的所有参数在每次前向和反向传播后都需要在主GPU上进行聚合然后再分发到各个GPU。这种多余的聚合和分发操作会带来显著的通信开销。并且在DataParallel中主GPU承担了额外的数据分发和收集工作会成为性能瓶颈导致其他GPU在等待主GPU处理完成时出现闲置状态。
五、总结
1. How to use DDP all depends on yourself.
在最开始学习DDP的时候有很多地方是很困惑的。每个博客的代码都有所区别让我很是困惑。例如在testloader到底要不要用DistributedSampler在计算损失的时候到底要不要用all_reduce操作来计算mean_loss在计算指标的时候到底要不要all_gather。后面了解多了之后才发现到底用不用完全取决自己的需求。
1mean_loss
由于Pytorch在DDP中会自动同步梯度因此计算不计算mean_loss对于模型的训练和参数没有任何影响。唯一的区别在于打印日志的时候是打印全局的平均损失还是只打印某个进程上的损失。如果每张卡上的batch size已经足够大例如设置为128或者更高打印全局平均损失和单进程上的损失一般来说差别不大。
2测试时DistributedSampler
测试时testloader设不设置DistributedSampler也完全取决于自己的实际需求。如果不设置那么就是在每个进程上都会用全部的数据的来进行测试。如果有八块卡那么就相当于在每个卡上都分别测试了一次一共测试了八次。如果你的测试数据集比较小比如只有几百张图像并且测试的频率也不高的话不设置DistributedSampler没有任何问题不会有太多的额外开销。但是如果测试数据集比较大比如几万张图像并且训练时每个epoch都要进行测试那么最好还是设置一下DistributedSampler可以有效地减少总体训练时间。
3评估时all_gather
至于要不要使用all_gather则和有没有使用DistributedSampler相关。如果设置了DistributedSampler那么评估时就要使用all_gather来汇总所有进程上的结果否则打印的只会是某个进程的结果并不准确。
4batch_size
此外testloader在使用DistributedSampler也需要格外注意数据能否被整除。举个例子假设我们有8块卡每块卡上的batch_size设置为64那么总的batch size就是512。如果我们的训练数据集只有1000份为了凑够完整的两个batchDistributedSampler会对数据进行补全重复部分数据使得数据总数变为1024份。在这个过程有24份数据被重复评估这些重复评估的数据可能会对评估结果产生影响。以4分类任务为例如果类别数量比较均衡相当于每个类别都有256份数据。在这种情况下重复评估24份数据对结果不会有什么影响。但是如果类别数据并不均衡有些类别只有十几份数据那么这个重复评估的影响就比较大了。如果正好重复的数据是样本数量只有十几份的类别那么评估结果将会变得极其不准确在这种情况下我们需要重写一个sampler来实现无重复的数据采样。或者也可以直接不使用DistributedSampler在每个进程上都进行一次完整的评估。
5同步批量归一化Synchronized Batch Normalization, SynBN
之前说过每个GPU对应一个进程不同进程的数据一般是不共享的。也就是说如果模型结构中有BN层每个GPU上的BN层只能访问到该GPU上的一部分数据无法获得全局的数据分布信息。为此可以使用同步BN层来利用所有GPU上的数据来计算BN层的mean和variance。代码也很简单只需要对实例化model之后转为同步BN即可。
def get_model():model LeNet(100).cuda()# 转换所有的BN层为同步BN层model nn.SyncBatchNorm.convert_sync_batchnorm(model)model DDP(model, device_ids[torch.cuda.current_device()])return model2. 又Out了
以上是借助Pytorch提供的DDP有关API来搭建自己的分布式训练代码框架的教程还是有一点小复杂的。现在有很多第三方库如HuggingFace的Accelerate、微软开发的DeepSpeed、Horovod、Pytorch Lightning对DDP进行了进一步的封装使用这些第三方库可以极大地简化代码。但是目前我还没有学习了解过这些第三方库再次out了没有及时学习前沿技术有机会真应该好好学习一下。尤其是Horovod它可以跨深度学习框架使用支持Pytorch、TensorFlow、Keras、MXNet等。Accelerate和DeepSeed也很不错做大模型相关基本上都会用到。Pytorch Lightning顾名思义让Pytorch变得更简单确实Pytorch Lightning把细节封装得非常好代码非常简洁值得一学。
最后推荐两个B站上对DDP讲解很不错的几个视频
pytorch多GPU并行训练教程
03 DDP 初步应用Trainertorchrun
知乎上有个帖子也不错
DDP系列第一篇入门教程