Distributed Data Parallel中的分布式训练

实现原理
与dataparallel不同的是,distributed data parallel会开设多个进程而非线程,进程数 = gpu数,每个进程都可以独立进行训练,也就是说代码的所有部分都会被每个进程同步调用,如果你某个地方print张量,你会发现device的差异
sampler会将数据按照进程数切分,
「确保不同进程的数据不同」
每个进程独立进行前向训练
每个进程利用ring all-reduce进行通信,将梯度信息进行聚合
每个进程同步更新模型参数,进行新一轮训练
按进程切分
如何确保数据不同呢?不妨看看distributedsampler的源码
# 判断数据集长度是否可以整除gpu数# 如果不能,选择舍弃还是补全,进而决定总数# if the dataset length is evenly divisible by # of replicas# then there is no need to drop any data, since the dataset # will be split equally.if (self.drop_last and  len(self.dataset) % self.num_replicas != 0): # num_replicas = num_gpus    self.num_samples = math.ceil((len(self.dataset) -         self.num_replicas) /self.num_replicas)else:    self.num_samples = math.ceil(len(self.dataset) /         self.num_replicas) self.total_size = self.num_samples * self.num_replicas# 根据是否shuffle来创建indicesif self.shuffle:    # deterministically shuffle based on epoch and seed    g = torch.generator()    g.manual_seed(self.seed + self.epoch)    indices = torch.randperm(len(self.dataset), generator=g).tolist()  else:    indices = list(range(len(self.dataset)))  if not self.drop_last:    # add extra samples to make it evenly divisible    padding_size = self.total_size - len(indices)    if padding_size  1,3 > 0
四个部分,哪块gpu上占的多,就由该块gpu往它下一个传,初始从主节点传播,即gpu0,你可以想象跟接力一样,a传b,b负责传给c
第一次传播如下:
那么结果就是:
那么,按照谁多谁往下传的原则,此时应该是gpu1往gpu2传a0和a1,gpu2往gpu3传b1和b2,以此类推
接下来再传播就会有gpu3 a的部分全有,gpu0上b的部分全有等,就再往下传
再来几遍便可以使得每块gpu上都获得了来自其他gpu的梯度啦
代码使用
基础概念
第一个是后端的选择,即数据传输协议,从下表可以看出[2],当使用cpu时可以选择gloo而gpu则可以是nccl
「backend」 「gloo」   「mpi」   「nccl」  
device cpu gpu cpu gpu cpu gpu
send ✓ ✘ ✓ ? ✘ ✓
recv ✓ ✘ ✓ ? ✘ ✓
broadcast ✓ ✓ ✓ ? ✘ ✓
all_reduce ✓ ✓ ✓ ? ✘ ✓
reduce ✓ ✘ ✓ ? ✘ ✓
all_gather ✓ ✘ ✓ ? ✘ ✓
gather ✓ ✘ ✓ ? ✘ ✓
scatter ✓ ✘ ✓ ? ✘ ✘
reduce_scatter ✘ ✘ ✘ ✘ ✘ ✓
all_to_all ✘ ✘ ✓ ? ✘ ✓
barrier ✓ ✘ ✓ ? ✘ ✓
接下来是一些参数的解释[3]:
arg meaning
group 一次发起的所有进程构成一个group,除非想更精细通信,创建new_group
world_size 一个group中进程数目,即为gpu的数量
rank 进程id,主节点rank=0,其他的在0和world_size-1之间
local_rank 进程在本地节点/机器的id
举个例子,假如你有两台服务器(又被称为node),每台服务器有4张gpu,那么,world_size即为8,rank=[0, 1, 2, 3, 4, 5, 6, 7], 每个服务器上的进程的local_rank为[0, 1, 2, 3]
然后是「初始化方法」的选择,有tcp和共享文件两种,一般指定rank=0为master节点
tcp显而易见是通过网络进行传输,需要指定主节点的ip(可以为主节点实际ip,或者是localhost)和空闲的端口
import torch.distributed as distdist.init_process_group(backend, init_method='tcp://ip:port',                        rank=rank, world_size=world_size)  
共享文件的话需要手动删除上次启动时残留的文件,加上官方有一堆警告,还是建议使用tcp
dist.init_process_group(backend, init_method='file://path',                         rank=rank, world_size=world_size)  
launch方法
「初始化」
这里先讲用launch的方法,关于torch.multiprocessing留到后面讲
在启动后,rank和world_size都会自动被ddp写入环境中,可以提前准备好参数类,如argparse这种
args.rank = int(os.environ['rank'])args.world_size = int(os.environ['world_size'])args.local_rank = int(os.environ['local_rank'])  
首先,在使用distributed包的任何其他函数之前,按照tcp方法进行初始化,需要注意的是需要手动指定一共可用的设备cuda_visible_devices
def dist_setup_launch(args):    # tell ddp available devices [necessary]    os.environ['cuda_visible_devices'] = args.devices    args.rank = int(os.environ['rank'])    args.world_size = int(os.environ['world_size'])    args.local_rank = int(os.environ['local_rank'])    dist.init_process_group(args.backend,                             args.init_method,                            rank=args.rank,                            world_size=args.world_size)    # this is optional, otherwise you may need to specify the     # device when you move something e.g., model.cuda(1)     # or model.to(args.rank)    # setting device makes things easy: model.cuda()    torch.cuda.set_device(args.rank)    print('the current rank is %d | the total ranks are %d'           %(args.rank, args.world_size))  
「distributedsampler」
接下来创建distributedsampler,是否pin_memory,根据你本机的内存决定。pin_memory的意思是提前在内存中申请一部分专门存放tensor。假如说你内存比较小,就会跟虚拟内存,即硬盘进行交换,这样转义到gpu上会比内存直接到gpu耗时。
因而,如果你的内存比较大,可以设置为true;然而,如果开了导致卡顿的情况,建议关闭
from torch.utils.data import dataloader, distributedsamplertrain_sampler = distributedsampler(train_dataset, seed=args.seed)train_dataloader = dataloader(train_dataset,                              pin_memory=true,                              shuffle=(train_sampler is none),                              batch_size=args.per_gpu_train_bs,                              num_workers=args.num_workers,                              sampler=train_sampler)eval_sampler = distributedsampler(eval_dataset, seed=args.seed)eval_dataloader = dataloader(eval_dataset,                             pin_memory=true,                             batch_size=args.per_gpu_eval_bs,                             num_workers=args.num_workers,                             sampler=eval_sampler)  
「加载模型」
然后加载模型,跟dataparallel不同的是需要提前放置到cuda上,还记得上面关于设置cuda_device的语句嘛,因为设置好之后每个进程只能看见一个gpu,所以直接model.cuda(),不需要指定device
同时,我们必须给ddp提示目前是哪个rank
from torch.nn.parallel import distributeddataparallel as ddpmodel = model.cuda()# tell ddp which rankmodel = ddp(model, find_unused_parameters=true, device_ids=[rank])  
注意,当模型带有batch norm时:
if args.syncbn:    nn.syncbatchnorm.convert_sync_batchnorm(model).cuda()  
「训练相关」
每个epoch开始训练的时候,记得用sampler的set_epoch,这样使得每个epoch打乱顺序是不一致的
关于梯度回传和参数更新,跟正常情况无异
for epoch in range(epochs):    # record epochs    train_dataloader.sampler.set_epoch(epoch)    outputs = model(inputs)    loss = loss_fct(outputs, labels)    loss.backward()    optimizer.step()    optimizer.zero_grad()  
这里有一点需要小心,这个loss是各个进程的loss之和,如果想要存储每个step平均损失,可以进行all_reduce操作,进行平均,不妨看官方的小例子来理解下:
>>> # all tensors below are of torch.int64 type.>>> # we have 2 process groups, 2 ranks.>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank>>> tensortensor([1, 2]) # rank 0tensor([3, 4]) # rank 1>>> dist.all_reduce(tensor, op=reduceop.sum)>>> tensortensor([4, 6]) # rank 0tensor([4, 6]) # rank 1@torch.no_grad()def reduce_value(value, average=true):    world_size = get_world_size()    if world_size >> # all tensors below are of torch.int64 dtype.>>> # we have 2 process groups, 2 ranks.>>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)]>>> tensor_list[tensor([0, 0]), tensor([0, 0])] # rank 0 and 1>>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank>>> tensortensor([1, 2]) # rank 0tensor([3, 4]) # rank 1>>> dist.all_gather(tensor_list, tensor)>>> tensor_list[tensor([1, 2]), tensor([3, 4])] # rank 0[tensor([1, 2]), tensor([3, 4])] # rank 1def sync_across_gpus(t, world_size):    gather_t_tensor = [torch.zeros_like(t) for _ in                        range(world_size)]    dist.all_gather(gather_t_tensor, t)    return torch.cat(gather_t_tensor, dim=0)  
可以简单参考我前面提供的源码的evaluate部分,我们首先将预测和标签比对,把结果为bool的张量存储下来,最终gather求和取平均。
这里还有个有趣的地方,tensor默认的类型可能是int,bool型的res拼接后自动转为0和1了,另外bool型的张量是不支持gather的
def eval(...)    results = torch.tensor([]).cuda()    for step, (inputs, labels) in enumerate(dataloader):        outputs = model(inputs)        res = (outputs.argmax(-1) == labels)        results = torch.cat([results, res], dim=0)    results = sync_across_gpus(results, world_size)    mean_acc = (results.sum() / len(results)).item()    return mean_acc  
「模型保存与加载」
模型保存,参考部分官方教程[7],我们只需要在主进程保存模型即可,注意,这里是被ddp包裹后的,ddp并没有state_dict,这里barrier的目的是为了让其他进程等待主进程保存模型,以防不同步
def save_checkpoint(rank, model, path):    if is_main_process(rank):     # all processes should see same parameters as they all         # start from same random parameters and gradients are         # synchronized in backward passes.        # therefore, saving it in one process is sufficient.        torch.save(model.module.state_dict(), path)        # use a barrier() to keep process 1 waiting for process 0    dist.barrier()  
加载的时候别忘了map_location,我们一开始会保存模型至主进程,这样就会导致cuda:0显存被占据,我们需要将模型remap到其他设备
def load_checkpoint(rank, model, path):    # remap the model from cuda:0 to other devices    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}    model.module.load_state_dict(        torch.load(path, map_location=map_location)    )  
进程销毁
运行结束后记得销毁进程:
def cleanup():    dist.destroy_process_group()    cleanup()  
如何启动
在终端输入下列命令【单机多卡】
python -m torch.distributed.launch --nproc_per_node=num_gpus           main.py (--arg1 --arg2 --arg3 and all other           arguments of your training script)  
目前torch 1.10以后更推荐用run
torch.distributed.launch -> torch.distributed.run / torchrun  
多机多卡是这样的:
# 第一个节点启动python -m torch.distributed.launch     --nproc_per_node=num_gpus     --nnodes=2     --node_rank=0     --master_addr=192.168.1.1     --master_port=1234 main.py # 第二个节点启动python -m torch.distributed.launch     --nproc_per_node=num_gpus     --nnodes=2     --node_rank=1     --master_addr=192.168.1.1     --master_port=1234 main.py   
mp方法
第二个方法就是利用torch的多线程包
import torch.multiprocessing as mp# rank mp会自动填入def main(rank, arg1, ...):    passif __name__ == '__main__':    mp.spawn(main, nprocs=total_gpus, args=(arg1, ...))  
这种运行的时候就跟正常的python文件一致:
python main.py  
优缺点
「优点」:相比于dp而言,不需要反复创建和销毁线程;ring-allreduce算法提高通信效率;模型同步方便
「缺点」:操作起来可能有些复杂,一般可满足需求的可先试试看dataparallel。


机器人+人工智能 新松的这一步迈的有点大
科锐针对电动汽车和可再生能源市场推出E-系列碳化硅半导体器件
智慧办公前沿!10月24日OffiSmart峰会深圳站议程揭晓!
集成温度传感器LM3911构成的恒温控制电路
ET1100转换AX58100的说明文件-集佰睿
Distributed Data Parallel中的分布式训练
红外热像仪在变电站监测系统中的应用
五分钟看懂昂科威四驱系统
vivo X9Plus星空灰拆解:专治各种不服
数字显示的可调直流稳压电源设计
条码扫描头厂家:如何通过条码扫描头模组提高手持设备的实用价值?
鸿蒙系统为何迟迟不推广?看看任正非怎么说!
汽包液位计的清洗方法与注意事项
你知道MELF电阻的两种工作模式之间的区别吗?
RSL10智能拍摄相机平台如何实现超低功耗事件触发成像
利用高压电池管理架构降低组件成本
昆山同茂致力提供零缺陷音圈电机模组
苹果13手机几月份上市新品
支持Linux和C-like驱动程序的三种驱动方法
美能量子效率测试仪对太阳能电池光谱响应的深入测量