原文: https://pytorch.org/tutorials/intermediate/dist_tuto.html
作者:SébArnold
在這個簡短的教程中,我們將介紹 PyTorch 的分布式軟件包。 我們將了解如何設(shè)置分布式設(shè)置,使用不同的交流策略以及如何仔細(xì)查看軟件包的內(nèi)部結(jié)構(gòu)。
PyTorch 中包含的分布式軟件包(即torch.distributed
)使研究人員和從業(yè)人員可以輕松地并行化他們在跨進(jìn)程和機(jī)器集群的計算。 為此,它利用了傳遞消息的語義,從而允許每個進(jìn)程將數(shù)據(jù)傳遞給其他任何進(jìn)程。 與并行處理(HTG1)包相反,進(jìn)程可以使用不同的通信后端,而不僅限于在同一臺計算機(jī)上執(zhí)行。
為了開始,我們需要能夠同時運(yùn)行多個進(jìn)程的能力。 如果您有權(quán)訪問計算群集,則應(yīng)咨詢本地系統(tǒng)管理員或使用您喜歡的協(xié)調(diào)工具。 (例如 pdsh , clustershell 或其他)。出于本教程的目的,我們將使用以下模板使用一臺計算機(jī)并分叉多個進(jìn)程。
"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
size = 2
processes = []
for rank in range(size):
p = Process(target=init_process, args=(rank, size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上面的腳本產(chǎn)生了兩個進(jìn)程,每個進(jìn)程將設(shè)置分布式環(huán)境,初始化進(jìn)程組(dist.init_process_group
),最后執(zhí)行給定的run
函數(shù)。
讓我們看一下init_process
功能。 它確保每個進(jìn)程將能夠使用相同的 IP 地址和端口通過主機(jī)進(jìn)行協(xié)調(diào)。 請注意,我們使用了gloo
后端,但其他后端也可用。 (請參閱 5.1 節(jié)),我們將在本教程的結(jié)尾部分介紹dist.init_process_group
中發(fā)生的魔術(shù),但實(shí)際上,它允許進(jìn)程通過共享位置相互進(jìn)行通信。
發(fā)送和接收
數(shù)據(jù)從一個進(jìn)程到另一個進(jìn)程的傳輸稱為點(diǎn)對點(diǎn)通信。 這些是通過send
和recv
功能或它們的直接對應(yīng)部分isend
和irecv
實(shí)現(xiàn)的。
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上面的示例中,兩個進(jìn)程都從零張量開始,然后進(jìn)程 0 遞增張量并將其發(fā)送到進(jìn)程 1,以便它們都以 1.0 結(jié)尾。 請注意,進(jìn)程 1 需要分配內(nèi)存以存儲它將接收的數(shù)據(jù)。
另請注意,send
/ recv
被阻塞:兩個過程都停止,直到通信完成。 另一方面,無阻塞; 腳本繼續(xù)執(zhí)行,方法返回Work
對象,我們可以選擇wait()
對象。
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
使用立即數(shù)時,我們必須謹(jǐn)慎使用已發(fā)送和已接收的張量。 由于我們不知道何時將數(shù)據(jù)傳遞給其他進(jìn)程,因此在req.wait()
完成之前,我們既不應(yīng)該修改發(fā)送的張量也不應(yīng)該訪問接收的張量。 換一種說法,
dist.isend()
之后寫入tensor
將導(dǎo)致不確定的行為。dist.irecv()
之后從tensor
讀取將導(dǎo)致不確定的行為。但是,在執(zhí)行req.wait()
之后,我們可以確保進(jìn)行了通信,并且tensor[0]
中存儲的值為 1.0。
當(dāng)我們希望對流程的通信進(jìn)行精細(xì)控制時,點(diǎn)對點(diǎn)通信非常有用。 它們可用于實(shí)現(xiàn)精美的算法,例如百度的 DeepSpeech 或 Facebook 的大規(guī)模實(shí)驗(yàn)中使用的算法。(請參閱 4.1 節(jié))
| 分散 | 收集 | | 降低 | 全減少 | | 廣播 | 全聚 |
與點(diǎn)對點(diǎn)通信相反,集合允許跨組中所有進(jìn)程的通信模式。 小組是我們所有過程的子集。 要創(chuàng)建組,我們可以將等級列表傳遞給dist.new_group(group)
。 默認(rèn)情況下,集合在所有進(jìn)程(也稱為世界)上執(zhí)行。 例如,為了獲得所有過程中所有張量的總和,我們可以使用dist.all_reduce(tensor, op, group)
集合。
""" All-Reduce example."""
def run(rank, size):
""" Simple point-to-point communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我們需要組中所有張量的總和,因此我們將dist.reduce_op.SUM
用作化簡運(yùn)算符。 一般來說,任何可交換的數(shù)學(xué)運(yùn)算都可以用作運(yùn)算符。 PyTorch 開箱即用,帶有 4 個這樣的運(yùn)算符,它們都在元素級運(yùn)行:
dist.reduce_op.SUM
,dist.reduce_op.PRODUCT
,dist.reduce_op.MAX
,dist.reduce_op.MIN
。除了dist.all_reduce(tensor, op, group)
之外,PyTorch 中目前共有 6 個集合體。
dist.broadcast(tensor, src, group)
:將tensor
從src
復(fù)制到所有其他進(jìn)程。dist.reduce(tensor, dst, op, group)
:將op
應(yīng)用于所有tensor
,并將結(jié)果存儲在dst
中。dist.all_reduce(tensor, op, group)
:與 reduce 相同,但是結(jié)果存儲在所有進(jìn)程中。dist.scatter(tensor, src, scatter_list, group)
:將
張量scatter_list[i]
復(fù)制到
過程。dist.gather(tensor, dst, gather_list, group)
:從dst
中的所有進(jìn)程復(fù)制tensor
。dist.all_gather(tensor_list, tensor, group)
:將所有進(jìn)程中的tensor
從所有進(jìn)程復(fù)制到tensor_list
。dist.barrier(group)
:阻止<cite>組</cite>中的所有進(jìn)程,直到每個進(jìn)程都進(jìn)入此功能。注意:您可以在此 GitHub 存儲庫的中找到本節(jié)的示例腳本。
現(xiàn)在我們了解了分布式模塊的工作原理,讓我們用它編寫一些有用的東西。 我們的目標(biāo)是復(fù)制 DistributedDataParallel 的功能。 當(dāng)然,這將是一個教學(xué)示例,在現(xiàn)實(shí)世界中,您應(yīng)該使用上面鏈接的經(jīng)過官方測試,優(yōu)化的最佳版本。
很簡單,我們想要實(shí)現(xiàn)隨機(jī)梯度下降的分布式版本。 我們的腳本將允許所有進(jìn)程在其數(shù)據(jù)批次上計算其模型的梯度,然后平均其梯度。 為了在更改進(jìn)程數(shù)時確保相似的收斂結(jié)果,我們首先必須對數(shù)據(jù)集進(jìn)行分區(qū)。 (您也可以使用 tnt.dataset.SplitDataset 代替下面的代碼段。)
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random()
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
使用上面的代碼片段,我們現(xiàn)在可以使用以下幾行簡單地對任何數(shù)據(jù)集進(jìn)行分區(qū):
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 / float(size)
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假設(shè)我們有 2 個副本,則每個進(jìn)程的train_set
為 60000/2 = 30000 個樣本。 我們還將批量大小除以副本數(shù),以使整體批量大小保持為 128。
現(xiàn)在,我們可以編寫通常的向前-向后優(yōu)化訓(xùn)練代碼,并添加一個函數(shù)調(diào)用以平均模型的梯度。 (以下內(nèi)容主要是受 PyTorch MNIST 官方示例的啟發(fā))。
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
仍然需要執(zhí)行average_gradients(model)
函數(shù),該函數(shù)只需要一個模型并在整個世界上平均其梯度即可。
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
param.grad.data /= size
等! 我們成功實(shí)現(xiàn)了分布式同步 SGD,并且可以在大型計算機(jī)集群上訓(xùn)練任何模型。
注意:盡管從技術(shù)上來說最后一句話是是正確的,但要實(shí)現(xiàn)同步 SGD 的生產(chǎn)級實(shí)現(xiàn),還需要更多技巧。 同樣,請使用經(jīng)過測試和優(yōu)化的。
另一個挑戰(zhàn)是,假設(shè)我們要實(shí)現(xiàn) DeepSpeech 的高效環(huán)網(wǎng)減少。 使用點(diǎn)對點(diǎn)集合很容易實(shí)現(xiàn)。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = th.zeros(send.size())
recv_buff = th.zeros(send.size())
accum = th.zeros(send.size())
accum[:] = send[:]
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send[:]
send_req.wait()
recv[:] = accum[:]
在上面的腳本中,allreduce(send, recv)
函數(shù)的簽名與 PyTorch 中的簽名略有不同。 它需要一個recv
張量,并將所有send
張量的總和存儲在其中。 作為練習(xí)留給讀者,我們的版本與 DeepSpeech 中的版本之間仍然有一個區(qū)別:它們的實(shí)現(xiàn)將梯度張量劃分為個塊,以便最佳地利用通信帶寬。 (提示: torch.chunk)
現(xiàn)在,我們準(zhǔn)備發(fā)現(xiàn)torch.distributed
的一些更高級的功能。 由于涉及的內(nèi)容很多,因此本節(jié)分為兩個小節(jié):
dist.init_process_group()
中的初始協(xié)調(diào)階段。torch.distributed
最優(yōu)雅的方面之一是它具有抽象能力,并且可以在不同的后端之上構(gòu)建。 如前所述,目前在 PyTorch 中實(shí)現(xiàn)了三個后端:Glo,NCCL 和 MPI。 它們各自具有不同的規(guī)格和權(quán)衡,具體取決于所需的用例。 可以在中找到支持功能的對照表。
Gloo 后端
到目前為止,我們已經(jīng)廣泛使用 Gloo 后端。 它作為開發(fā)平臺非常方便,因?yàn)樗寻陬A(yù)編譯的 PyTorch 二進(jìn)制文件中,并且可在 Linux(自 0.2 開始)和 macOS(自 1.3 開始)上運(yùn)行。 它支持 CPU 上的所有點(diǎn)對點(diǎn)和集合操作,以及 GPU 上的所有集合操作。 CUDA 張量的集體運(yùn)算的實(shí)現(xiàn)未像 NCCL 后端提供的那樣優(yōu)化。
如您所知,如果將model放在 GPU 上,我們的分布式 SGD 示例將無法正常工作。 為了使用多個 GPU,讓我們還進(jìn)行以下修改:
device = torch.device("cuda:{}".format(rank))
model = Net()
model = Net().to(device)
data, target = data.to(device), target.to(device)
經(jīng)過上述修改,我們的模型現(xiàn)在可以在兩個 GPU 上訓(xùn)練,您可以使用watch nvidia-smi監(jiān)視其使用情況。
MPI 后端
消息傳遞接口(MPI)是來自高性能計算領(lǐng)域的標(biāo)準(zhǔn)化工具。 它允許進(jìn)行點(diǎn)對點(diǎn)和集體通信,并且是torch.distributed
;API 的主要靈感。 存在幾種針對不同目的而優(yōu)化的 MPI 實(shí)現(xiàn)(例如 Open-MPI , MVAPICH2 ,
Intel MPI)。 使用 MPI 后端的優(yōu)勢在于 MPI 在大型計算機(jī)群集上的廣泛可用性和高水平的優(yōu)化。 一些 最近的 實(shí)現(xiàn)也能夠利用 CUDA IPC 和 GPU Direct 技術(shù)來避免通過 CPU 進(jìn)行內(nèi)存復(fù)制。
不幸的是,PyTorch 的二進(jìn)制文件不能包含 MPI 實(shí)現(xiàn),我們將不得不手動對其進(jìn)行重新編譯。 幸運(yùn)的是,鑒于編譯后,PyTorch 會單獨(dú)查看以查找可用的 MPI 實(shí)現(xiàn),因此此過程相當(dāng)簡單。 以下步驟通過從源安裝 PyTorch 來安裝 MPI 后端。
conda install -c conda-forge openmpi
python setup.py install
。為了測試我們新安裝的后端,需要進(jìn)行一些修改。
if __name__ == '__main__':
下的內(nèi)容替換為init_process(0, 0, run, backend='mpi')
。mpirun -n 4 python myscript.py
。這些更改的原因是,MPI 需要在生成流程之前創(chuàng)建自己的環(huán)境。 MPI 也將生成自己的進(jìn)程,并執(zhí)行初始化方法中描述的握手,使init_process_group
的rank
和size
參數(shù)多余。 實(shí)際上,這非常強(qiáng)大,因?yàn)槟梢詫⒏郊訁?shù)傳遞給mpirun
,以便為每個進(jìn)程定制計算資源。
(諸如每個進(jìn)程的內(nèi)核數(shù)量,將計算機(jī)手動分配給特定等級,以及 等之類的東西。)這樣做,您應(yīng)該獲得與其他通信后端相同的熟悉輸出。
NCCL 后端
NCCL 后端提供了針對 CUDA 張量的集體操作的優(yōu)化實(shí)現(xiàn)。 如果僅將 CUDA 張量用于集體操作,請考慮使用此后端以獲得最佳性能。 NCCL 后端包含在具有 CUDA 支持的預(yù)構(gòu)建二進(jìn)制文件中。
為了完成本教程,我們來談?wù)勎覀兎Q為的第一個功能:dist.init_process_group(backend, init_method)
。 特別是,我們將介紹負(fù)責(zé)每個過程之間初始協(xié)調(diào)步驟的不同初始化方法。 這些方法使您可以定義協(xié)調(diào)方式。 根據(jù)您的硬件設(shè)置,這些方法之一自然應(yīng)該比其他方法更合適。 除了以下各節(jié)之外,您還應(yīng)該查看官方文檔。
環(huán)境變量
在本教程中,我們一直在使用環(huán)境變量初始化方法。 通過在所有計算機(jī)上設(shè)置以下四個環(huán)境變量,所有進(jìn)程將能夠正確連接到主服務(wù)器,獲取有關(guān)其他進(jìn)程的信息,最后與它們握手。
MASTER_PORT
:計算機(jī)上的空閑端口,它將托管等級為 0 的進(jìn)程。MASTER_ADDR
:將以等級 0 托管進(jìn)程的計算機(jī)的 IP 地址。WORLD_SIZE
:進(jìn)程總數(shù),這樣主機(jī)可以知道要等待多少個工人。RANK
:每個進(jìn)程的等級,因此他們將知道它是否是工人的主人。共享文件系統(tǒng)
共享文件系統(tǒng)要求所有進(jìn)程都有權(quán)訪問共享文件系統(tǒng),并將通過共享文件進(jìn)行協(xié)調(diào)。 這意味著每個進(jìn)程都將打開文件,寫入文件信息,然后等到每個人都打開文件。 之后,所有必需的信息將可用于所有過程。 為了避免爭用情況,文件系統(tǒng)必須通過 fcntl 支持鎖定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
通過提供等級 0 和可訪問的端口號的進(jìn)程的 IP 地址,可以實(shí)現(xiàn)通過 TCP 進(jìn)行初始化。 在這里,所有工作人員都可以連接到等級為 0 的流程,并交換有關(guān)如何相互聯(lián)系的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
更多建議: