Point-to-Point Communication

1. send/recv

Below we use the send/recv interfaces of torch.distributed to implement a simple ping-pong program. The program works as follows:

  • the tensor is initialized to 0;
  • process 0 (i.e. rank 0): adds 1 to the tensor, then sends it to process 1 (i.e. rank 1);
  • process 1: after receiving the tensor, adds 1 to it, then sends it back to process 0;
  • process 0: receives the tensor sent back by process 1.


1.1 Initialization

In PyTorch, the distributed module must be initialized before any distributed communication primitive can be used. This is done with torch.distributed.init_process_group.

  • The environment variables MASTER_ADDR and MASTER_PORT specify the IP address and port of rank 0. Rank 0 acts as the coordinating node, so all other nodes must know its address;

  • This example uses gloo as the backend; setting the NCCL_DEBUG environment variable to INFO makes NCCL print debug information (this only takes effect when the nccl backend is used);

  • init_process_group: performs the initialization of the communication module

    • backend: the library that implements the communication backend; the options are gloo, nccl and mpi. This example uses gloo (note: at the time of writing the nccl backend did not support p2p communication, and mpi requires rebuilding PyTorch from source);
    • rank: the index of the current rank, i.e. which process this is; it takes a value between 0 and world_size - 1;
    • world_size: the number of processes participating in the distributed training;
def init_process(rank_id, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank_id, world_size=size)
    fn(rank_id, size)
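
Once init_process_group returns, the process group is ready and the communication primitives can be used. As a quick sanity check, the current rank, world size and backend can be queried as shown in the following minimal sketch (the helper name check_env is ours, not part of the original example):

import torch.distributed as dist

def check_env():
    # all of these calls only work after init_process_group has completed
    assert dist.is_initialized()
    print("rank:", dist.get_rank(),            # index of the current process, 0..world_size-1
          "world_size:", dist.get_world_size(),
          "backend:", dist.get_backend())      # e.g. 'gloo'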

1.2 Communication logic

The code below implements the ping-pong communication between rank 0 and rank 1:

  • rank_id determines which rank's logic the current process should execute;

  • PyTorch sends a tensor with torch.distributed.send(tensor, dst, group=None, tag=0) or torch.distributed.isend(tensor, dst, group=None, tag=0); send is the blocking (synchronous) version, isend the non-blocking (asynchronous) one;

    • tensor: the data to send
    • dst: the destination rank, given as its rank id
  • PyTorch receives a tensor with torch.distributed.recv(tensor, src=None, group=None, tag=0) or torch.distributed.irecv(tensor, src=None, group=None, tag=0); recv is the blocking (synchronous) version, irecv the non-blocking (asynchronous) one (a sketch of the non-blocking variant follows the code below);

    • tensor: the tensor into which the data is received
    • src: the rank id of the sender
def run(rank_id, size):
    tensor = torch.zeros(1)
    if rank_id == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])

        dist.recv(tensor=tensor, src=1)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])

        tensor += 1
        dist.send(tensor=tensor, dst=0)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])
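
The example above uses the blocking send/recv. Since isend/irecv are the non-blocking counterparts, here is a sketch of the same ping-pong written with them (run_async is an illustrative name, not part of the original code); the request object returned by isend/irecv must be wait()-ed before the tensor can be safely read or reused:

def run_async(rank_id, size):
    tensor = torch.zeros(1)
    if rank_id == 0:
        tensor += 1
        req = dist.isend(tensor=tensor, dst=1)   # returns immediately
        req.wait()                               # block until the send has completed
        req = dist.irecv(tensor=tensor, src=1)
        req.wait()                               # tensor is only valid after wait()
    else:
        req = dist.irecv(tensor=tensor, src=0)
        req.wait()
        tensor += 1
        req = dist.isend(tensor=tensor, dst=0)
        req.wait()
    print('Rank ', rank_id, ' has data ', tensor[0])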

1.3 Launching the tasks

The code below starts the two processes that carry out the ping-pong communication:

  • torch.multiprocessing is used here to launch the processes; it is a wrapper around Python's standard multiprocessing library and is compatible with all of its interfaces
  • multiprocessing.set_start_method: specifies how child processes are created; the options are fork, spawn and forkserver. With spawn, a child process inherits only the resources it needs from the parent process; file descriptors and handles are not inherited.
  • multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None): starts a child process
if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
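
As an alternative to creating the Process objects by hand, torch.multiprocessing.spawn can start and join the workers in a single call. A minimal sketch, assuming the same init_process and run functions defined above (spawn passes the process index as the first argument, so each child ends up calling init_process(rank, size, run)):

if __name__ == "__main__":
    size = 2
    # start nprocs child processes; join=True waits for all of them to finish
    mp.spawn(init_process, args=(size, run), nprocs=size, join=True)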

1.4 Testing

The complete code is as follows:

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank_id, size):
    tensor = torch.zeros(1)
    if rank_id == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])
        dist.recv(tensor=tensor, src=1)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
        print('after recv, Rank ', rank_id, ' has data ', tensor[0])
        tensor += 1
        dist.send(tensor=tensor, dst=0)
        print('after send, Rank ', rank_id, ' has data ', tensor[0])


def init_process(rank_id, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank_id, world_size=size)
    fn(rank_id, size)


if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

Running the program produces the following output:

root@g48r13:/workspace/communication# python sync_p2p.py
after send, Rank  0  has data  tensor(1.)
after recv, Rank  1  has data  tensor(1.)
after send, Rank  1  has data  tensor(2.)
after recv, Rank  0  has data  tensor(2.)

References

PyTorch - Distributed Communication Primitives (with source code) - a Zhihu article by 颜挺帅
