PyTorch 用 PyTorch 編寫分布式應(yīng)用程序

2020-09-10 10:30 更新
原文: https://pytorch.org/tutorials/intermediate/dist_tuto.html

作者:SébArnold

在這個簡短的教程中,我們將介紹 PyTorch 的分布式軟件包。 我們將了解如何設(shè)置分布式設(shè)置,使用不同的交流策略以及如何仔細(xì)查看軟件包的內(nèi)部結(jié)構(gòu)。

設(shè)定

PyTorch 中包含的分布式軟件包(即torch.distributed)使研究人員和從業(yè)人員可以輕松地并行化他們在跨進(jìn)程和機(jī)器集群的計算。 為此,它利用了傳遞消息的語義,從而允許每個進(jìn)程將數(shù)據(jù)傳遞給其他任何進(jìn)程。 與并行處理(HTG1)包相反,進(jìn)程可以使用不同的通信后端,而不僅限于在同一臺計算機(jī)上執(zhí)行。

為了開始,我們需要能夠同時運(yùn)行多個進(jìn)程的能力。 如果您有權(quán)訪問計算群集,則應(yīng)咨詢本地系統(tǒng)管理員或使用您喜歡的協(xié)調(diào)工具。 (例如 pdsh , clustershell 或其他)。出于本教程的目的,我們將使用以下模板使用一臺計算機(jī)并分叉多個進(jìn)程。

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process


def run(rank, size):
    """ Distributed function to be implemented later. """
    pass


def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)


    for p in processes:
        p.join()

上面的腳本產(chǎn)生了兩個進(jìn)程,每個進(jìn)程將設(shè)置分布式環(huán)境,初始化進(jìn)程組(dist.init_process_group),最后執(zhí)行給定的run函數(shù)。

讓我們看一下init_process功能。 它確保每個進(jìn)程將能夠使用相同的 IP 地址和端口通過主機(jī)進(jìn)行協(xié)調(diào)。 請注意,我們使用了gloo后端,但其他后端也可用。 (請參閱 5.1 節(jié)),我們將在本教程的結(jié)尾部分介紹dist.init_process_group中發(fā)生的魔術(shù),但實(shí)際上,它允許進(jìn)程通過共享位置相互進(jìn)行通信。

點(diǎn)對點(diǎn)通訊

Send and Recv

發(fā)送和接收

數(shù)據(jù)從一個進(jìn)程到另一個進(jìn)程的傳輸稱為點(diǎn)對點(diǎn)通信。 這些是通過sendrecv功能或它們的直接對應(yīng)部分isendirecv實(shí)現(xiàn)的。

"""Blocking point-to-point communication."""


def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的示例中,兩個進(jìn)程都從零張量開始,然后進(jìn)程 0 遞增張量并將其發(fā)送到進(jìn)程 1,以便它們都以 1.0 結(jié)尾。 請注意,進(jìn)程 1 需要分配內(nèi)存以存儲它將接收的數(shù)據(jù)。

另請注意,send / recv被阻塞:兩個過程都停止,直到通信完成。 另一方面,無阻塞; 腳本繼續(xù)執(zhí)行,方法返回Work對象,我們可以選擇wait()對象。

"""Non-blocking point-to-point communication."""


def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

使用立即數(shù)時,我們必須謹(jǐn)慎使用已發(fā)送和已接收的張量。 由于我們不知道何時將數(shù)據(jù)傳遞給其他進(jìn)程,因此在req.wait()完成之前,我們既不應(yīng)該修改發(fā)送的張量也不應(yīng)該訪問接收的張量。 換一種說法,

  • dist.isend()之后寫入tensor將導(dǎo)致不確定的行為。
  • dist.irecv()之后從tensor讀取將導(dǎo)致不確定的行為。

但是,在執(zhí)行req.wait()之后,我們可以確保進(jìn)行了通信,并且tensor[0]中存儲的值為 1.0。

當(dāng)我們希望對流程的通信進(jìn)行精細(xì)控制時,點(diǎn)對點(diǎn)通信非常有用。 它們可用于實(shí)現(xiàn)精美的算法,例如百度的 DeepSpeech 或 Facebook 的大規(guī)模實(shí)驗(yàn)中使用的算法。(請參閱  4.1 節(jié)

集體交流

Scatter 分散 |  Gather 收集 | |  Reduce 降低 |  All-Reduce 全減少 | |  Broadcast 廣播 |  All-Gather 全聚 |

與點(diǎn)對點(diǎn)通信相反,集合允許跨組中所有進(jìn)程的通信模式。 小組是我們所有過程的子集。 要創(chuàng)建組,我們可以將等級列表傳遞給dist.new_group(group)。 默認(rèn)情況下,集合在所有進(jìn)程(也稱為世界)上執(zhí)行。 例如,為了獲得所有過程中所有張量的總和,我們可以使用dist.all_reduce(tensor, op, group)集合。

""" All-Reduce example."""
def run(rank, size):
    """ Simple point-to-point communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由于我們需要組中所有張量的總和,因此我們將dist.reduce_op.SUM用作化簡運(yùn)算符。 一般來說,任何可交換的數(shù)學(xué)運(yùn)算都可以用作運(yùn)算符。 PyTorch 開箱即用,帶有 4 個這樣的運(yùn)算符,它們都在元素級運(yùn)行:

  • dist.reduce_op.SUM
  • dist.reduce_op.PRODUCT
  • dist.reduce_op.MAX,
  • dist.reduce_op.MIN。

除了dist.all_reduce(tensor, op, group)之外,PyTorch 中目前共有 6 個集合體。

  • dist.broadcast(tensor, src, group):將tensorsrc復(fù)制到所有其他進(jìn)程。
  • dist.reduce(tensor, dst, op, group):將op應(yīng)用于所有tensor,并將結(jié)果存儲在dst中。
  • dist.all_reduce(tensor, op, group):與 reduce 相同,但是結(jié)果存儲在所有進(jìn)程中。
  • dist.scatter(tensor, src, scatter_list, group):將 張量scatter_list[i]復(fù)制到 過程。
  • dist.gather(tensor, dst, gather_list, group):從dst中的所有進(jìn)程復(fù)制tensor。
  • dist.all_gather(tensor_list, tensor, group):將所有進(jìn)程中的tensor從所有進(jìn)程復(fù)制到tensor_list。
  • dist.barrier(group):阻止<cite>組</cite>中的所有進(jìn)程,直到每個進(jìn)程都進(jìn)入此功能。

分布式訓(xùn)練

注意:您可以在此 GitHub 存儲庫的中找到本節(jié)的示例腳本。

現(xiàn)在我們了解了分布式模塊的工作原理,讓我們用它編寫一些有用的東西。 我們的目標(biāo)是復(fù)制 DistributedDataParallel 的功能。 當(dāng)然,這將是一個教學(xué)示例,在現(xiàn)實(shí)世界中,您應(yīng)該使用上面鏈接的經(jīng)過官方測試,優(yōu)化的最佳版本。

很簡單,我們想要實(shí)現(xiàn)隨機(jī)梯度下降的分布式版本。 我們的腳本將允許所有進(jìn)程在其數(shù)據(jù)批次上計算其模型的梯度,然后平均其梯度。 為了在更改進(jìn)程數(shù)時確保相似的收斂結(jié)果,我們首先必須對數(shù)據(jù)集進(jìn)行分區(qū)。 (您也可以使用 tnt.dataset.SplitDataset 代替下面的代碼段。)

""" Dataset partitioning helper """
class Partition(object):


    def __init__(self, data, index):
        self.data = data
        self.index = index


    def __len__(self):
        return len(self.index)


    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):


    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)


        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]


    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

使用上面的代碼片段,我們現(xiàn)在可以使用以下幾行簡單地對任何數(shù)據(jù)集進(jìn)行分區(qū):

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 / float(size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假設(shè)我們有 2 個副本,則每個進(jìn)程的train_set為 60000/2 = 30000 個樣本。 我們還將批量大小除以副本數(shù),以使整體批量大小保持為 128。

現(xiàn)在,我們可以編寫通常的向前-向后優(yōu)化訓(xùn)練代碼,并添加一個函數(shù)調(diào)用以平均模型的梯度。 (以下內(nèi)容主要是受 PyTorch MNIST 官方示例的啟發(fā))。

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)


    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

仍然需要執(zhí)行average_gradients(model)函數(shù),該函數(shù)只需要一個模型并在整個世界上平均其梯度即可。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size

等! 我們成功實(shí)現(xiàn)了分布式同步 SGD,并且可以在大型計算機(jī)集群上訓(xùn)練任何模型。

注意:盡管從技術(shù)上來說最后一句話是是正確的,但要實(shí)現(xiàn)同步 SGD 的生產(chǎn)級實(shí)現(xiàn),還需要更多技巧。 同樣,請使用經(jīng)過測試和優(yōu)化的

我們自己的環(huán)減少

另一個挑戰(zhàn)是,假設(shè)我們要實(shí)現(xiàn) DeepSpeech 的高效環(huán)網(wǎng)減少。 使用點(diǎn)對點(diǎn)集合很容易實(shí)現(xiàn)。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
    rank = dist.get_rank()
    size = dist.get_world_size()
    send_buff = th.zeros(send.size())
    recv_buff = th.zeros(send.size())
    accum = th.zeros(send.size())
    accum[:] = send[:]


    left = ((rank - 1) + size) % size
    right = (rank + 1) % size


    for i in range(size - 1):
        if i % 2 == 0:
            # Send send_buff
            send_req = dist.isend(send_buff, right)
            dist.recv(recv_buff, left)
            accum[:] += recv[:]
        else:
            # Send recv_buff
            send_req = dist.isend(recv_buff, right)
            dist.recv(send_buff, left)
            accum[:] += send[:]
        send_req.wait()
    recv[:] = accum[:]

在上面的腳本中,allreduce(send, recv)函數(shù)的簽名與 PyTorch 中的簽名略有不同。 它需要一個recv張量,并將所有send張量的總和存儲在其中。 作為練習(xí)留給讀者,我們的版本與 DeepSpeech 中的版本之間仍然有一個區(qū)別:它們的實(shí)現(xiàn)將梯度張量劃分為個塊,以便最佳地利用通信帶寬。 (提示: torch.chunk)

進(jìn)階主題

現(xiàn)在,我們準(zhǔn)備發(fā)現(xiàn)torch.distributed的一些更高級的功能。 由于涉及的內(nèi)容很多,因此本節(jié)分為兩個小節(jié):

  1. 通訊后端:我們在這里學(xué)習(xí)如何使用 MPI 和 Gloo 進(jìn)行 GPU-GPU 通訊。
  2. 初始化方法:我們了解如何最好地設(shè)置dist.init_process_group()中的初始協(xié)調(diào)階段。

通訊后端

torch.distributed最優(yōu)雅的方面之一是它具有抽象能力,并且可以在不同的后端之上構(gòu)建。 如前所述,目前在 PyTorch 中實(shí)現(xiàn)了三個后端:Glo,NCCL 和 MPI。 它們各自具有不同的規(guī)格和權(quán)衡,具體取決于所需的用例。 可以在中找到支持功能的對照表。

Gloo 后端

到目前為止,我們已經(jīng)廣泛使用 Gloo 后端。 它作為開發(fā)平臺非常方便,因?yàn)樗寻陬A(yù)編譯的 PyTorch 二進(jìn)制文件中,并且可在 Linux(自 0.2 開始)和 macOS(自 1.3 開始)上運(yùn)行。 它支持 CPU 上的所有點(diǎn)對點(diǎn)和集合操作,以及 GPU 上的所有集合操作。 CUDA 張量的集體運(yùn)算的實(shí)現(xiàn)未像 NCCL 后端提供的那樣優(yōu)化。

如您所知,如果將model放在 GPU 上,我們的分布式 SGD 示例將無法正常工作。 為了使用多個 GPU,讓我們還進(jìn)行以下修改:

  1. 使用device = torch.device("cuda:{}".format(rank))
  2. model = Net()   model = Net().to(device)
  3. 使用data, target = data.to(device), target.to(device)

經(jīng)過上述修改,我們的模型現(xiàn)在可以在兩個 GPU 上訓(xùn)練,您可以使用watch nvidia-smi監(jiān)視其使用情況。

MPI 后端

消息傳遞接口(MPI)是來自高性能計算領(lǐng)域的標(biāo)準(zhǔn)化工具。 它允許進(jìn)行點(diǎn)對點(diǎn)和集體通信,并且是torch.distributed ;API 的主要靈感。 存在幾種針對不同目的而優(yōu)化的 MPI 實(shí)現(xiàn)(例如 Open-MPI , MVAPICH2 ,  Intel MPI)。 使用 MPI 后端的優(yōu)勢在于 MPI 在大型計算機(jī)群集上的廣泛可用性和高水平的優(yōu)化。 一些 最近的 實(shí)現(xiàn)也能夠利用 CUDA IPC 和 GPU Direct 技術(shù)來避免通過 CPU 進(jìn)行內(nèi)存復(fù)制。

不幸的是,PyTorch 的二進(jìn)制文件不能包含 MPI 實(shí)現(xiàn),我們將不得不手動對其進(jìn)行重新編譯。 幸運(yùn)的是,鑒于編譯后,PyTorch 會單獨(dú)查看以查找可用的 MPI 實(shí)現(xiàn),因此此過程相當(dāng)簡單。 以下步驟通過從源安裝 PyTorch 來安裝 MPI 后端。

  1. 創(chuàng)建并激活您的 Anaconda 環(huán)境,按照指南的要求安裝所有先決條件,但是尚未運(yùn)行。
  2. 選擇并安裝您喜歡的 MPI 實(shí)現(xiàn)。 請注意,啟用支持 CUDA 的 MPI 可能需要一些其他步驟。 在我們的情況下,我們將堅(jiān)持不支持 GPU 的 Open-MPI :conda install -c conda-forge openmpi
  3. 現(xiàn)在,轉(zhuǎn)到克隆的 PyTorch 存儲庫并執(zhí)行python setup.py install。

為了測試我們新安裝的后端,需要進(jìn)行一些修改。

  1. if __name__ == '__main__':下的內(nèi)容替換為init_process(0, 0, run, backend='mpi')
  2. 運(yùn)行mpirun -n 4 python myscript.py。

這些更改的原因是,MPI 需要在生成流程之前創(chuàng)建自己的環(huán)境。 MPI 也將生成自己的進(jìn)程,并執(zhí)行初始化方法中描述的握手,使init_process_groupranksize參數(shù)多余。 實(shí)際上,這非常強(qiáng)大,因?yàn)槟梢詫⒏郊訁?shù)傳遞給mpirun,以便為每個進(jìn)程定制計算資源。 (諸如每個進(jìn)程的內(nèi)核數(shù)量,將計算機(jī)手動分配給特定等級,以及 之類的東西。)這樣做,您應(yīng)該獲得與其他通信后端相同的熟悉輸出。

NCCL 后端

NCCL 后端提供了針對 CUDA 張量的集體操作的優(yōu)化實(shí)現(xiàn)。 如果僅將 CUDA 張量用于集體操作,請考慮使用此后端以獲得最佳性能。 NCCL 后端包含在具有 CUDA 支持的預(yù)構(gòu)建二進(jìn)制文件中。

初始化方法

為了完成本教程,我們來談?wù)勎覀兎Q為的第一個功能:dist.init_process_group(backend, init_method)。 特別是,我們將介紹負(fù)責(zé)每個過程之間初始協(xié)調(diào)步驟的不同初始化方法。 這些方法使您可以定義協(xié)調(diào)方式。 根據(jù)您的硬件設(shè)置,這些方法之一自然應(yīng)該比其他方法更合適。 除了以下各節(jié)之外,您還應(yīng)該查看官方文檔。

環(huán)境變量

在本教程中,我們一直在使用環(huán)境變量初始化方法。 通過在所有計算機(jī)上設(shè)置以下四個環(huán)境變量,所有進(jìn)程將能夠正確連接到主服務(wù)器,獲取有關(guān)其他進(jìn)程的信息,最后與它們握手。

  • MASTER_PORT:計算機(jī)上的空閑端口,它將托管等級為 0 的進(jìn)程。
  • MASTER_ADDR:將以等級 0 托管進(jìn)程的計算機(jī)的 IP 地址。
  • WORLD_SIZE:進(jìn)程總數(shù),這樣主機(jī)可以知道要等待多少個工人。
  • RANK:每個進(jìn)程的等級,因此他們將知道它是否是工人的主人。

共享文件系統(tǒng)

共享文件系統(tǒng)要求所有進(jìn)程都有權(quán)訪問共享文件系統(tǒng),并將通過共享文件進(jìn)行協(xié)調(diào)。 這意味著每個進(jìn)程都將打開文件,寫入文件信息,然后等到每個人都打開文件。 之后,所有必需的信息將可用于所有過程。 為了避免爭用情況,文件系統(tǒng)必須通過 fcntl 支持鎖定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

TCP

通過提供等級 0 和可訪問的端口號的進(jìn)程的 IP 地址,可以實(shí)現(xiàn)通過 TCP 進(jìn)行初始化。 在這里,所有工作人員都可以連接到等級為 0 的流程,并交換有關(guān)如何相互聯(lián)系的信息。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)




以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號