Point-to-Point Communication
1. send/recv
Below we use the send/recv interfaces of torch.distributed to implement a simple ping-pong program. It works as follows:
- The tensor starts at 0.
- Process 0 (i.e. rank 0) adds 1 to the tensor and sends it to process 1 (rank 1).
- Process 1, after receiving the tensor, adds 1 to it and sends it back to process 0.
- Process 0 receives the tensor sent back by process 1.
1.1 Initialization
In PyTorch, the distributed module must be initialized before any of the communication primitives can be used; this is done with torch.distributed.init_process_group.
- The environment variables MASTER_ADDR and MASTER_PORT give the IP and port of rank 0. Rank 0 acts as the coordinating node, so every other rank needs to know its address. The backend chosen in this example is gloo; setting the NCCL_DEBUG environment variable to INFO makes NCCL print debug information (relevant only when the nccl backend is used).
- init_process_group performs the initialization of the communication module. Its arguments:
- backend: the library that implements the communication; the options are gloo, nccl and mpi. This example uses gloo (note: the nccl backend does not support p2p send/recv in older PyTorch releases, and mpi requires rebuilding PyTorch from source to be available).
- rank: the index of the current rank, i.e. which process this is; valid values range from 0 to world_size - 1.
- world_size: the number of processes participating in the distributed job.
def init_process(rank_id, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank_id, world_size=size)
    fn(rank_id, size)
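Equivalently, the rendezvous address can be passed to init_process_group directly through its init_method argument instead of the MASTER_ADDR/MASTER_PORT environment variables. A minimal sketch (init_process_tcp is a hypothetical name; it uses the same address and port as above):

import torch.distributed as dist

def init_process_tcp(rank_id, size, fn, backend='gloo'):
    # Same effect as setting MASTER_ADDR/MASTER_PORT: rank 0 listens on
    # 127.0.0.1:29500 and all ranks rendezvous there.
    dist.init_process_group(backend,
                            init_method='tcp://127.0.0.1:29500',
                            rank=rank_id,
                            world_size=size)
    fn(rank_id, size)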
1.2 Communication Logic
The code below implements the ping-pong communication between rank 0 and rank 1:
- rank_id determines which rank's logic the current process executes.
- PyTorch sends a tensor with torch.distributed.send(tensor, dst, group=None, tag=0) or torch.distributed.isend(tensor, dst, group=None, tag=0); send is synchronous, isend is asynchronous (an asynchronous variant of the program is sketched after the code below).
- tensor: the data to send
- dst: the rank id of the destination rank
- PyTorch receives a tensor with torch.distributed.recv(tensor, src=None, group=None, tag=0) or torch.distributed.irecv(tensor, src=None, group=None, tag=0); recv is synchronous, irecv is asynchronous.
- tensor: the tensor the received data is written into
- src: the rank id of the source rank
def run(rank_id, size):
    tensor = torch.zeros(1)
    if rank_id == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])
        dist.recv(tensor=tensor, src=1)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
        tensor += 1
        dist.send(tensor=tensor, dst=0)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])
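As mentioned above, isend/irecv are the asynchronous counterparts of send/recv: they return a request object immediately, and the tensor may only be read or reused after wait() has been called on that request. A minimal sketch of the same ping-pong written asynchronously (run_async is a hypothetical name, not part of the original program):

def run_async(rank_id, size):
    tensor = torch.zeros(1)
    if rank_id == 0:
        tensor += 1
        req = dist.isend(tensor=tensor, dst=1)  # returns immediately
        req.wait()                              # block until the send completes
        req = dist.irecv(tensor=tensor, src=1)
        req.wait()                              # tensor is valid only after wait()
    else:
        req = dist.irecv(tensor=tensor, src=0)
        req.wait()
        tensor += 1
        req = dist.isend(tensor=tensor, dst=0)
        req.wait()
    print('Rank ', rank_id, ' has data ', tensor[0])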
1.3 Launching the Job
The following code starts two processes that run the ping-pong communication:
- torch.multiprocessing is used here to start the processes; it is a wrapper around Python's multiprocessing module and is compatible with all of its interfaces.
- multiprocessing.set_start_method: specifies how child processes are created. The options are fork, spawn and forkserver. With spawn, a child process inherits only the resources it needs from the parent; file descriptors and handles are not inherited.
- multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): used to create a child process; start() launches it and join() waits for it to finish.
if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
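torch.multiprocessing also provides mp.spawn, which wraps the create/start/join pattern above into a single call; it invokes the target with the process index as its first argument, which here plays the role of the rank id. A minimal sketch of an equivalent launcher under the same setup:

import torch.multiprocessing as mp

if __name__ == "__main__":
    size = 2
    # Starts `nprocs` child processes with the spawn start method,
    # calls init_process(i, size, run) in each one, and joins them.
    mp.spawn(init_process, args=(size, run), nprocs=size, join=True)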
1.4 Test
The complete code is as follows:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank_id, size):
    tensor = torch.zeros(1)
    if rank_id == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])
        dist.recv(tensor=tensor, src=1)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
        tensor += 1
        dist.send(tensor=tensor, dst=0)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])

def init_process(rank_id, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank_id, world_size=size)
    fn(rank_id, size)

if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
Running it produces the following output:
root@g48r13:/workspace/communication# python sync_p2p.py
after send, Rank 0 has data tensor(1.)
after recv, Rank 1 has data tensor(1.)
after send, Rank 1 has data tensor(2.)
after recv, Rank 0 has data tensor(2.)