PyTorch 分布式 RPC 框架

2020-09-15 11:21 更新

原文:PyTorch 分布式 RPC 框架

分布式 RPC 框架通過(guò)一組原語(yǔ)提供了用于多機(jī)器模型訓(xùn)練的機(jī)制,以允許進(jìn)行遠(yuǎn)程通信;還提供了高級(jí) API,以自動(dòng)區(qū)分在多臺(tái)機(jī)器之間劃分的模型。

警告

RPC API 是試驗(yàn)性的,隨時(shí)可能更改。

RPC 和 RRef 框架

在使用 RPC 和分布式 autograd 原語(yǔ)之前,必須進(jìn)行初始化。 要初始化 RPC 框架,我們需要使用 init_rpc() 來(lái)初始化 RPC 框架,RRef 框架和分布式 autograd。 默認(rèn)情況下,這還將初始化 <cite>ProcessGroup</cite> (init_process_group())后端,以進(jìn)行 RPC 通信。 <cite>ProcessGroup</cite> 后端在內(nèi)部使用 gloo 進(jìn)行通信。

torch.distributed.rpc.init_rpc(name, backend=BackendType.PROCESS_GROUP, rank=-1, world_size=None, rpc_backend_options=None)?

初始化 RPC 原語(yǔ),例如本地 RPC 代理和分布式 autograd。

初始化本地 RPC 代理,該代理立即使當(dāng)前進(jìn)程準(zhǔn)備好發(fā)送和接收 RPC。 此方法還可以正確初始化使用 gloo 進(jìn)行集體通信的默認(rèn)進(jìn)程組后端。

參數(shù)

  • 后端(枚舉)– RPC 后端實(shí)現(xiàn)的類型。 當(dāng)前,進(jìn)程組后端是唯一可用的后端實(shí)現(xiàn)。 (默認(rèn):RpcBackend.PROCESS_GROUP)。
  • 名稱 (str )–此節(jié)點(diǎn)的全局唯一名稱。 (例如Trainer3,ParameterServer2,MasterWorker1)名稱只能包含數(shù)字,字母,下劃線和/或破折號(hào),并且必須少于 128 個(gè)字符。
  • 等級(jí) (python:int )–此節(jié)點(diǎn)的全局唯一 ID /等級(jí)。
  • world_size (python:int )–組中的工人數(shù)。
  • rpc_backend_options (RpcBackendOptions )–傳遞給 RpcAgent 構(gòu)造函數(shù)的選項(xiàng)。

參考

<cite>RRef</cite> (遠(yuǎn)程引用)是對(duì)遠(yuǎn)程工作人員上某個(gè)類型 <cite>T</cite> (例如<cite>張量</cite>)的值的引用。 該句柄使引用的遠(yuǎn)程值在所有者上保持活動(dòng)狀態(tài),但不暗示該值將來(lái)會(huì)轉(zhuǎn)移給本地工作人員。 通過(guò)保留對(duì)其他工作人員中存在的 nn.Modules 的引用,并在訓(xùn)練期間調(diào)用適當(dāng)?shù)暮瘮?shù)來(lái)檢索或修改其參數(shù),可以將 RRef 用于多機(jī)訓(xùn)練。

class torch.distributed.rpc.RRef?

在遠(yuǎn)程工作器上封裝對(duì)某個(gè)類型的值的引用的類。 該句柄將使引用的遠(yuǎn)程值在工作程序上保持活動(dòng)狀態(tài)。

is_owner(self: torch.distributed.rpc.RRef) → bool?

返回當(dāng)前節(jié)點(diǎn)是否是此RRef的所有者。

local_value(self: torch.distributed.rpc.RRef) → object?

如果當(dāng)前節(jié)點(diǎn)是所有者,則返回對(duì)本地值的引用。 否則,引發(fā)異常。

owner(self: torch.distributed.rpc.RRef) → torch.distributed.rpc.WorkerInfo?

返回?fù)碛写?code>RRef的節(jié)點(diǎn)的工作程序信息。

to_here(self: torch.distributed.rpc.RRef) → object?

將 RRef 的值從所有者復(fù)制到本地節(jié)點(diǎn)并返回它的阻塞調(diào)用。 如果當(dāng)前節(jié)點(diǎn)是所有者,則返回對(duì)本地值的引用。

RPC 和 RRef 原語(yǔ)

該庫(kù)提供了原語(yǔ),允許用戶創(chuàng)建和修改對(duì)遠(yuǎn)程數(shù)據(jù)的引用(RRef)以及遠(yuǎn)程執(zhí)行功能。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None)?

進(jìn)行 RPC 阻塞調(diào)用以在 worker to上運(yùn)行函數(shù)func。 RPC 消息的發(fā)送和接收與 Python 代碼的執(zhí)行并行。 此方法是線程安全的。

Parameters

  • (str WorkerInfo )–目標(biāo)工作線程的 ID 或名稱。
  • 函數(shù)(可調(diào)用)–任何可調(diào)用的函數(shù)。 內(nèi)置函數(shù)(例如 torch.add())可以更有效地通過(guò) RPC 發(fā)送。
  • args (元組)– func調(diào)用的參數(shù)元組。
  • kwargs (dict )–是func調(diào)用的關(guān)鍵字參數(shù)的字典。

退貨

返回在argskwargs上運(yùn)行func的結(jié)果。

例:

On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()


On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None)?

進(jìn)行非阻塞 RPC 調(diào)用以在 worker to上運(yùn)行函數(shù)func。 RPC 消息的發(fā)送和接收與 Python 代碼的執(zhí)行并行。 此方法是線程安全的。 此方法將立即返回可以等待的torch.distributed.FutureMessage。

Parameters

  • to (str or WorkerInfo) – id or name of the destination worker.
  • func (callable) – any callable function. builtin functions (like torch.add()) can be sent over RPC more efficiently.
  • args (tuple) – the argument tuple for the func invocation.
  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

Returns

返回可以等待的torch.distributed.FutureMessage對(duì)象。 完成后,可以從FutureMessage對(duì)象中檢索argskwargsfunc的返回值。

Example:

On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()


On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

torch.distributed.rpc.remote(to, func, args=None, kwargs=None)?

進(jìn)行遠(yuǎn)程調(diào)用以在工作線程to上運(yùn)行func,并立即將 RRef 返回到結(jié)果值。 工人to將是返回的 RRef 的所有者,而調(diào)用remote的工人是用戶。 所有者管理其 RRef 的全局引用計(jì)數(shù),而所有者RRef 僅在全局上沒(méi)有活動(dòng)引用時(shí)被銷毀。

Parameters

  • to (str or WorkerInfo) – id or name of the destination worker.
  • 函數(shù)(可調(diào)用)–內(nèi)置函數(shù)(例如 torch.add())。
  • args (tuple) – the argument tuple for the func invocation.
  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

Returns

用戶 RRef 實(shí)例到結(jié)果值。 使用阻塞 API torch.distributed.rpc.RRef.to_here() 在本地檢索結(jié)果值。

Example:

On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()


On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

torch.distributed.rpc.get_worker_info(worker_name=None)?

獲取給定工人名稱的WorkerInfo。 使用此WorkerInfo可以避免在每次調(diào)用時(shí)傳遞昂貴的字符串。

Parameters

worker_name (str )–工人的字符串名稱。 如果None,則返回當(dāng)前工作程序的 ID。 (默認(rèn)None

Returns

如果worker_nameNone,則給定當(dāng)前工作程序的worker_nameWorkerInfoWorkerInfo實(shí)例。

torch.distributed.rpc.shutdown(graceful=True)?

關(guān)閉 RPC 代理,然后銷毀 RPC 代理。 這將阻止本地代理接受未完成的請(qǐng)求,并通過(guò)終止所有 RPC 線程來(lái)關(guān)閉 RPC 框架。 如果 graceful = True,則它將阻塞,直到所有本地和遠(yuǎn)程 RPC 進(jìn)程都到達(dá)此方法并等待所有未完成的工作完成。 否則,如果 graceful = False,則這是本地關(guān)閉,并且它不等待其他 RPC 進(jìn)程到達(dá)此方法。

Parameters

正常 (bool )–是否進(jìn)行正常關(guān)機(jī)。 如果為 True,它將阻塞直到所有本地和遠(yuǎn)程 RPC 進(jìn)程都達(dá)到此方法并等待所有未完成的工作完成。

Example:

On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()


On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()

分布式 Autograd 框架

此模塊提供了一個(gè)基于 RPC 的分布式 autograd 框架,該框架可用于模型并行訓(xùn)練等應(yīng)用程序。 簡(jiǎn)而言之,應(yīng)用程序可以通過(guò) RPC 發(fā)送和接收梯度記錄張量。 在前向傳遞中,我們記錄何時(shí)通過(guò) RPC 發(fā)送梯度記錄張量,而在后向傳遞過(guò)程中,我們使用此信息使用 RPC 執(zhí)行分布式后向傳遞。

class torch.distributed.autograd.context?

使用分布式 autograd 時(shí)要環(huán)繞前進(jìn)和后退傳遞的上下文對(duì)象。 需要with語(yǔ)句中生成的context_id來(lái)唯一標(biāo)識(shí)所有工作程序上的分布式反向傳遞。 每個(gè)工作人員都存儲(chǔ)與此context_id關(guān)聯(lián)的元數(shù)據(jù),這是正確執(zhí)行分布式自動(dòng)求導(dǎo)證件所必需的。

Example:

>> import torch.distributed.autograd as dist_autograd
>> with dist_autograd.context() as context_id:
>>   t1 = torch.rand((3, 3), requires_grad=True)
>>   t2 = torch.rand((3, 3), requires_grad=True)
>>   loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>   dist_autograd.backward([loss])

torch.distributed.autograd.backward(roots: List[Tensor]) → None?

使用提供的根啟動(dòng)分布式反向傳遞。 當(dāng)前,這實(shí)現(xiàn)了 FAST 模式算法,該算法假設(shè)在反向傳遞過(guò)程中,跨工作程序在同一分布式 autograd 上下文中發(fā)送的所有 RPC 消息將是 autograd 圖的一部分。

我們使用提供的根來(lái)發(fā)現(xiàn) autograd 圖并計(jì)算適當(dāng)?shù)囊蕾囮P(guān)系。 該方法將阻塞,直到完成整個(gè) autograd 計(jì)算。

我們?cè)诿總€(gè)節(jié)點(diǎn)上的適當(dāng) torch.distributed.autograd.context 中累積梯度。 當(dāng)調(diào)用 torch.distributed.autograd.backward() 時(shí),使用的 autograd 上下文是該節(jié)點(diǎn)的當(dāng)前 autograd 上下文。 如果沒(méi)有有效的 autograd 上下文,我們將引發(fā)錯(cuò)誤。 您可以使用 get_gradients() API 檢索累積的梯度。

Parameters

(列表)–代表自動(dòng)梯度計(jì)算根的張量。 所有張量應(yīng)為標(biāo)量。

Example:

>> import torch.distributed.autograd as dist_autograd
>> with dist_autograd.context() as context_id:
>>      pred = model.forward()
>>      loss = loss_func(pred, loss)
>>      dist_autograd.backward(loss)

torch.distributed.autograd.get_gradients(context_id: int) → Dict[Tensor, Tensor]?

從張量檢索映射,以獲取在提供的context_id中作為累積的 autograd 向后傳遞的一部分的張量所對(duì)應(yīng)的張量。

Parameters

context_id (python:int )–我們應(yīng)為其檢索梯度的 autograd 上下文 ID。

Returns

一個(gè)映射,其中鍵是張量,值是該張量的關(guān)聯(lián)漸變。

Example:

>> import torch.distributed.autograd as dist_autograd
>> with dist_autograd.context() as context_id:
>>      t1 = torch.rand((3, 3), requires_grad=True)
>>      t2 = torch.rand((3, 3), requires_grad=True)
>>      loss = t1 + t2
>>      dist_autograd.backward([loss.sum()])
>>      grads = dist_autograd.get_gradients(context_id)
>>      print (grads[t1])
>>      print (grads[t2])

分布式優(yōu)化器

torch.distributed.optim 公開(kāi) DistributedOptimizer,后者獲取遠(yuǎn)程參數(shù)列表 (RRef),并在參數(shù)所在的工作線程中本地運(yùn)行優(yōu)化器。 分布式優(yōu)化器可以使用任何本地優(yōu)化器算法來(lái)將梯度應(yīng)用于每個(gè)工作者。

class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs)?

DistributedOptimizer 遠(yuǎn)程引用分散在工作程序中的參數(shù),并為每個(gè)參數(shù)在本地應(yīng)用給定的優(yōu)化器。

此類使用 get_gradients() 來(lái)檢索特定參數(shù)的梯度。

來(lái)自同一客戶端或不同客戶端的對(duì) step() 的并發(fā)調(diào)用將在每個(gè)工作人員上進(jìn)行序列化-因?yàn)槊總€(gè)工作人員的優(yōu)化程序一次只能處理一組漸變。 但是,不能保證完整的前向后向優(yōu)化程序序列將一次為一個(gè)客戶端執(zhí)行。 這意味著所應(yīng)用的漸變可能不對(duì)應(yīng)于在給定工人上執(zhí)行的最新前向通過(guò)。 此外,也不能保證在所有工人之間訂購(gòu)。

Parameters

  • optimizer_class (optim.Optimizer)–在每個(gè) worker 上實(shí)例化的優(yōu)化器的類。
  • params_rref (列表 [ RRef ] )–本地或本地參考的 RRef 列表 遠(yuǎn)程參數(shù)進(jìn)行優(yōu)化。
  • args –傳遞給每個(gè)工作程序上的優(yōu)化器構(gòu)造函數(shù)的參數(shù)。
  • kwargs –傳遞給每個(gè)工作程序上的優(yōu)化器構(gòu)造函數(shù)的參數(shù)。

Example:

>> import torch.distributed.autograd as dist_autograd
>> import torch.distributed.rpc as rpc
>> from torch import optim
>> from torch.distributed.optim import DistributedOptimizer
>>
>> with dist_autograd.context() as context_id:
>>   # Forward pass.
>>   rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>   rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>   loss = rref1.to_here() + rref2.to_here()
>>
>>   # Backward pass.
>>   dist_autograd.backward([loss.sum()])
>>
>>   # Optimizer.
>>   dist_optim = DistributedOptimizer(
>>      optim.SGD,
>>      [rref1, rref2],
>>      lr=0.05,
>>   )
>>   dist_optim.step()

step()?

執(zhí)行一個(gè)優(yōu)化步驟。

這將在每個(gè)包含要優(yōu)化參數(shù)的工作程序上調(diào)用 torch.optim.Optimizer.step() ,并將阻塞直到所有工作程序返回。 當(dāng)前的分布式 autograd context 將在全球范圍內(nèi)使用。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)