PyTorch 分布式 RPC 框架

2020-09-15 11:21 更新

原文:PyTorch 分布式 RPC 框架

分布式 RPC 框架通過一組原語提供了用于多機器模型訓練的機制,以允許進行遠程通信;還提供了高級 API,以自動區(qū)分在多臺機器之間劃分的模型。

警告

RPC API 是試驗性的,隨時可能更改。

RPC 和 RRef 框架

在使用 RPC 和分布式 autograd 原語之前,必須進行初始化。 要初始化 RPC 框架,我們需要使用 init_rpc() 來初始化 RPC 框架,RRef 框架和分布式 autograd。 默認情況下,這還將初始化 <cite>ProcessGroup</cite> (init_process_group())后端,以進行 RPC 通信。 <cite>ProcessGroup</cite> 后端在內(nèi)部使用 gloo 進行通信。

  1. torch.distributed.rpc.init_rpc(name, backend=BackendType.PROCESS_GROUP, rank=-1, world_size=None, rpc_backend_options=None)?

初始化 RPC 原語,例如本地 RPC 代理和分布式 autograd。

初始化本地 RPC 代理,該代理立即使當前進程準備好發(fā)送和接收 RPC。 此方法還可以正確初始化使用 gloo 進行集體通信的默認進程組后端。

參數(shù)

  • 后端(枚舉)– RPC 后端實現(xiàn)的類型。 當前,進程組后端是唯一可用的后端實現(xiàn)。 (默認:RpcBackend.PROCESS_GROUP)。
  • 名稱 (str )–此節(jié)點的全局唯一名稱。 (例如Trainer3ParameterServer2,MasterWorker1)名稱只能包含數(shù)字,字母,下劃線和/或破折號,并且必須少于 128 個字符。
  • 等級 (python:int )–此節(jié)點的全局唯一 ID /等級。
  • world_size (python:int )–組中的工人數(shù)。
  • rpc_backend_options (RpcBackendOptions )–傳遞給 RpcAgent 構(gòu)造函數(shù)的選項。

參考

<cite>RRef</cite> (遠程引用)是對遠程工作人員上某個類型 <cite>T</cite> (例如<cite>張量</cite>)的值的引用。 該句柄使引用的遠程值在所有者上保持活動狀態(tài),但不暗示該值將來會轉(zhuǎn)移給本地工作人員。 通過保留對其他工作人員中存在的 nn.Modules 的引用,并在訓練期間調(diào)用適當?shù)暮瘮?shù)來檢索或修改其參數(shù),可以將 RRef 用于多機訓練。

  1. class torch.distributed.rpc.RRef?

在遠程工作器上封裝對某個類型的值的引用的類。 該句柄將使引用的遠程值在工作程序上保持活動狀態(tài)。

  1. is_owner(self: torch.distributed.rpc.RRef) bool?

返回當前節(jié)點是否是此RRef的所有者。

  1. local_value(self: torch.distributed.rpc.RRef) object?

如果當前節(jié)點是所有者,則返回對本地值的引用。 否則,引發(fā)異常。

  1. owner(self: torch.distributed.rpc.RRef) torch.distributed.rpc.WorkerInfo?

返回擁有此RRef的節(jié)點的工作程序信息。

  1. to_here(self: torch.distributed.rpc.RRef) object?

將 RRef 的值從所有者復(fù)制到本地節(jié)點并返回它的阻塞調(diào)用。 如果當前節(jié)點是所有者,則返回對本地值的引用。

RPC 和 RRef 原語

該庫提供了原語,允許用戶創(chuàng)建和修改對遠程數(shù)據(jù)的引用(RRef)以及遠程執(zhí)行功能。

  1. torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None)?

進行 RPC 阻塞調(diào)用以在 worker to上運行函數(shù)func。 RPC 消息的發(fā)送和接收與 Python 代碼的執(zhí)行并行。 此方法是線程安全的。

Parameters

  • (str WorkerInfo )–目標工作線程的 ID 或名稱。
  • 函數(shù)(可調(diào)用)–任何可調(diào)用的函數(shù)。 內(nèi)置函數(shù)(例如 torch.add())可以更有效地通過 RPC 發(fā)送。
  • args (元組)– func調(diào)用的參數(shù)元組。
  • kwargs (dict )–是func調(diào)用的關(guān)鍵字參數(shù)的字典。

退貨

返回在argskwargs上運行func的結(jié)果。

例:

  1. On worker 0:
  2. >>> import torch.distributed.rpc as rpc
  3. >>> rpc.init_rpc("worker0", rank=0, world_size=2)
  4. >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
  5. >>> rpc.shutdown()
  6. On worker 1:
  7. >>> import torch.distributed.rpc as rpc
  8. >>> rpc.init_rpc("worker1", rank=1, world_size=2)
  9. >>> rpc.shutdown()

  1. torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None)?

進行非阻塞 RPC 調(diào)用以在 worker to上運行函數(shù)func。 RPC 消息的發(fā)送和接收與 Python 代碼的執(zhí)行并行。 此方法是線程安全的。 此方法將立即返回可以等待的torch.distributed.FutureMessage

Parameters

  • to (str or WorkerInfo) – id or name of the destination worker.
  • func (callable) – any callable function. builtin functions (like torch.add()) can be sent over RPC more efficiently.
  • args (tuple) – the argument tuple for the func invocation.
  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

Returns

返回可以等待的torch.distributed.FutureMessage對象。 完成后,可以從FutureMessage對象中檢索argskwargsfunc的返回值。

Example:

  1. On worker 0:
  2. >>> import torch.distributed.rpc as rpc
  3. >>> rpc.init_rpc("worker0", rank=0, world_size=2)
  4. >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
  5. >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
  6. >>> result = fut1.wait() + fut2.wait()
  7. >>> rpc.shutdown()
  8. On worker 1:
  9. >>> import torch.distributed.rpc as rpc
  10. >>> rpc.init_rpc("worker1", rank=1, world_size=2)
  11. >>> rpc.shutdown()

  1. torch.distributed.rpc.remote(to, func, args=None, kwargs=None)?

進行遠程調(diào)用以在工作線程to上運行func,并立即將 RRef 返回到結(jié)果值。 工人to將是返回的 RRef 的所有者,而調(diào)用remote的工人是用戶。 所有者管理其 RRef 的全局引用計數(shù),而所有者RRef 僅在全局上沒有活動引用時被銷毀。

Parameters

  • to (str or WorkerInfo) – id or name of the destination worker.
  • 函數(shù)(可調(diào)用)–內(nèi)置函數(shù)(例如 torch.add())。
  • args (tuple) – the argument tuple for the func invocation.
  • kwargs (dict) – is a dictionary of keyword arguments for the func invocation.

Returns

用戶 RRef 實例到結(jié)果值。 使用阻塞 API torch.distributed.rpc.RRef.to_here() 在本地檢索結(jié)果值。

Example:

  1. On worker 0:
  2. >>> import torch.distributed.rpc as rpc
  3. >>> rpc.init_rpc("worker0", rank=0, world_size=2)
  4. >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
  5. >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
  6. >>> x = rref1.to_here() + rref2.to_here()
  7. >>> rpc.shutdown()
  8. On worker 1:
  9. >>> import torch.distributed.rpc as rpc
  10. >>> rpc.init_rpc("worker1", rank=1, world_size=2)
  11. >>> rpc.shutdown()

  1. torch.distributed.rpc.get_worker_info(worker_name=None)?

獲取給定工人名稱的WorkerInfo。 使用此WorkerInfo可以避免在每次調(diào)用時傳遞昂貴的字符串。

Parameters

worker_name (str )–工人的字符串名稱。 如果None,則返回當前工作程序的 ID。 (默認None

Returns

如果worker_nameNone,則給定當前工作程序的worker_nameWorkerInfoWorkerInfo實例。

  1. torch.distributed.rpc.shutdown(graceful=True)?

關(guān)閉 RPC 代理,然后銷毀 RPC 代理。 這將阻止本地代理接受未完成的請求,并通過終止所有 RPC 線程來關(guān)閉 RPC 框架。 如果 graceful = True,則它將阻塞,直到所有本地和遠程 RPC 進程都到達此方法并等待所有未完成的工作完成。 否則,如果 graceful = False,則這是本地關(guān)閉,并且它不等待其他 RPC 進程到達此方法。

Parameters

正常 (bool )–是否進行正常關(guān)機。 如果為 True,它將阻塞直到所有本地和遠程 RPC 進程都達到此方法并等待所有未完成的工作完成。

Example:

  1. On worker 0:
  2. >>> import torch.distributed.rpc as rpc
  3. >>> rpc.init_rpc("worker0", rank=0, world_size=2)
  4. >>> # do some work
  5. >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
  6. >>> # ready to shutdown
  7. >>> rpc.shutdown()
  8. On worker 1:
  9. >>> import torch.distributed.rpc as rpc
  10. >>> rpc.init_rpc("worker1", rank=1, world_size=2)
  11. >>> # wait for worker 0 to finish work, and then shutdown.
  12. >>> rpc.shutdown()

分布式 Autograd 框架

此模塊提供了一個基于 RPC 的分布式 autograd 框架,該框架可用于模型并行訓練等應(yīng)用程序。 簡而言之,應(yīng)用程序可以通過 RPC 發(fā)送和接收梯度記錄張量。 在前向傳遞中,我們記錄何時通過 RPC 發(fā)送梯度記錄張量,而在后向傳遞過程中,我們使用此信息使用 RPC 執(zhí)行分布式后向傳遞。

  1. class torch.distributed.autograd.context?

使用分布式 autograd 時要環(huán)繞前進和后退傳遞的上下文對象。 需要with語句中生成的context_id來唯一標識所有工作程序上的分布式反向傳遞。 每個工作人員都存儲與此context_id關(guān)聯(lián)的元數(shù)據(jù),這是正確執(zhí)行分布式自動求導證件所必需的。

Example:

  1. >> import torch.distributed.autograd as dist_autograd
  2. >> with dist_autograd.context() as context_id:
  3. >> t1 = torch.rand((3, 3), requires_grad=True)
  4. >> t2 = torch.rand((3, 3), requires_grad=True)
  5. >> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
  6. >> dist_autograd.backward([loss])

  1. torch.distributed.autograd.backward(roots: List[Tensor]) None?

使用提供的根啟動分布式反向傳遞。 當前,這實現(xiàn)了 FAST 模式算法,該算法假設(shè)在反向傳遞過程中,跨工作程序在同一分布式 autograd 上下文中發(fā)送的所有 RPC 消息將是 autograd 圖的一部分。

我們使用提供的根來發(fā)現(xiàn) autograd 圖并計算適當?shù)囊蕾囮P(guān)系。 該方法將阻塞,直到完成整個 autograd 計算。

我們在每個節(jié)點上的適當 torch.distributed.autograd.context 中累積梯度。 當調(diào)用 torch.distributed.autograd.backward() 時,使用的 autograd 上下文是該節(jié)點的當前 autograd 上下文。 如果沒有有效的 autograd 上下文,我們將引發(fā)錯誤。 您可以使用 get_gradients() API 檢索累積的梯度。

Parameters

(列表)–代表自動梯度計算根的張量。 所有張量應(yīng)為標量。

Example:

  1. >> import torch.distributed.autograd as dist_autograd
  2. >> with dist_autograd.context() as context_id:
  3. >> pred = model.forward()
  4. >> loss = loss_func(pred, loss)
  5. >> dist_autograd.backward(loss)

  1. torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]?

從張量檢索映射,以獲取在提供的context_id中作為累積的 autograd 向后傳遞的一部分的張量所對應(yīng)的張量。

Parameters

context_id (python:int )–我們應(yīng)為其檢索梯度的 autograd 上下文 ID。

Returns

一個映射,其中鍵是張量,值是該張量的關(guān)聯(lián)漸變。

Example:

  1. >> import torch.distributed.autograd as dist_autograd
  2. >> with dist_autograd.context() as context_id:
  3. >> t1 = torch.rand((3, 3), requires_grad=True)
  4. >> t2 = torch.rand((3, 3), requires_grad=True)
  5. >> loss = t1 + t2
  6. >> dist_autograd.backward([loss.sum()])
  7. >> grads = dist_autograd.get_gradients(context_id)
  8. >> print (grads[t1])
  9. >> print (grads[t2])

分布式優(yōu)化器

torch.distributed.optim 公開 DistributedOptimizer,后者獲取遠程參數(shù)列表 (RRef),并在參數(shù)所在的工作線程中本地運行優(yōu)化器。 分布式優(yōu)化器可以使用任何本地優(yōu)化器算法來將梯度應(yīng)用于每個工作者。

  1. class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs)?

DistributedOptimizer 遠程引用分散在工作程序中的參數(shù),并為每個參數(shù)在本地應(yīng)用給定的優(yōu)化器。

此類使用 get_gradients() 來檢索特定參數(shù)的梯度。

來自同一客戶端或不同客戶端的對 step() 的并發(fā)調(diào)用將在每個工作人員上進行序列化-因為每個工作人員的優(yōu)化程序一次只能處理一組漸變。 但是,不能保證完整的前向后向優(yōu)化程序序列將一次為一個客戶端執(zhí)行。 這意味著所應(yīng)用的漸變可能不對應(yīng)于在給定工人上執(zhí)行的最新前向通過。 此外,也不能保證在所有工人之間訂購。

Parameters

  • optimizer_class (optim.Optimizer)–在每個 worker 上實例化的優(yōu)化器的類。
  • params_rref (列表 [ RRef ] )–本地或本地參考的 RRef 列表 遠程參數(shù)進行優(yōu)化。
  • args –傳遞給每個工作程序上的優(yōu)化器構(gòu)造函數(shù)的參數(shù)。
  • kwargs –傳遞給每個工作程序上的優(yōu)化器構(gòu)造函數(shù)的參數(shù)。

Example:

  1. >> import torch.distributed.autograd as dist_autograd
  2. >> import torch.distributed.rpc as rpc
  3. >> from torch import optim
  4. >> from torch.distributed.optim import DistributedOptimizer
  5. >>
  6. >> with dist_autograd.context() as context_id:
  7. >> # Forward pass.
  8. >> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
  9. >> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
  10. >> loss = rref1.to_here() + rref2.to_here()
  11. >>
  12. >> # Backward pass.
  13. >> dist_autograd.backward([loss.sum()])
  14. >>
  15. >> # Optimizer.
  16. >> dist_optim = DistributedOptimizer(
  17. >> optim.SGD,
  18. >> [rref1, rref2],
  19. >> lr=0.05,
  20. >> )
  21. >> dist_optim.step()

  1. step()?

執(zhí)行一個優(yōu)化步驟。

這將在每個包含要優(yōu)化參數(shù)的工作程序上調(diào)用 torch.optim.Optimizer.step() ,并將阻塞直到所有工作程序返回。 當前的分布式 autograd context 將在全球范圍內(nèi)使用。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號