原文:https://pytorch.org/tutorials/intermediate/rpc_tutorial.html
作者:申力
警告
torch.distributed.rpc 程序包是實驗性的,隨時可能更改。 它還需要 PyTorch 1.4.0+才能運行,因為這是第一個支持 RPC 的版本。
本教程使用兩個簡單的示例來演示如何使用 torch.distributed.rpc 軟件包構(gòu)建分布式訓練,該軟件包首先在 PyTorch v1.4 中作為實驗功能引入。 這兩個示例的源代碼可以在 PyTorch 示例中找到。
先前的教程分布式數(shù)據(jù)并行入門和用 PyTorch 編寫分布式應(yīng)用程序,描述了 DistributedDataParallel ,該模型支持特定的訓練范例,其中模型可以在多個過程中復(fù)制 每個進程都會處理輸入數(shù)據(jù)的拆分。 有時,您可能會遇到需要不同訓練范例的場景。 例如:
torch.distributed.rpc 程序包可以幫助解決上述情況。 在情況 1 中, RPC 和 RRef 允許將數(shù)據(jù)從一個工作程序發(fā)送到另一個工作程序,同時輕松引用遠程數(shù)據(jù)對象。 在情況 2 中,分布式 autograd 和分布式優(yōu)化器使執(zhí)行反向傳遞和優(yōu)化器步驟就像本地訓練一樣。 在接下來的兩節(jié)中,我們將使用強化學習示例和語言模型示例來演示 torch.distributed.rpc 的 API。 請注意,本教程并非旨在構(gòu)建最準確或最有效的模型來解決給定的問題,相反,此處的主要目標是演示如何使用 torch.distributed.rpc 包來構(gòu)建分布式訓練 應(yīng)用程序。
本節(jié)介紹了使用 RPC 建立玩具分布式強化學習模型以解決 OpenAI Gym 中的 CartPole-v1 的步驟。 策略代碼主要是從現(xiàn)有的單線程示例中借用的,如下所示。
我們將跳過Policy
設(shè)計的詳細信息,并將重點介紹 RPC 的用法。
import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
self.saved_log_probs = []
self.rewards = []
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
首先,讓我們準備一個幫助程序,以在RRef
的所有者工作程序上遠程運行功能。 您將在本教程的示例中的多個地方找到該功能。 理想情況下, <cite>torch.distributed.rpc</cite> 程序包應(yīng)立即提供這些幫助程序功能。 例如,如果應(yīng)用程序可以直接調(diào)用RRef.some_func(*arg)
,然后將其轉(zhuǎn)換為RRef
所有者的 RPC,將會更容易。 在 pytorch / pytorch#31743 中跟蹤了此 API 的進度。
from torch.distributed.rpc import rpc_sync
def _call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs)
def _remote_method(method, rref, *args, **kwargs):
args = [method, rref] + list(args)
return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)
## to call a function on an rref, we could do the following
## _remote_method(some_func, rref, *args)
我們準備介紹觀察員。 在此示例中,每個觀察者創(chuàng)建自己的環(huán)境,并等待代理的命令來運行情節(jié)。 在每個情節(jié)中,一個觀察者最多循環(huán)n_steps
個迭代,并且在每個迭代中,它使用 RPC 將其環(huán)境狀態(tài)傳遞給代理并取回操作。 然后,它將該操作應(yīng)用于其環(huán)境,并從環(huán)境中獲取獎勵和下一個狀態(tài)。 之后,觀察者使用另一個 RPC 向代理報告獎勵。 同樣,請注意,這顯然不是最有效的觀察者實現(xiàn)。 例如,一個簡單的優(yōu)化可能是將當前狀態(tài)和最后的報酬打包到一個 RPC 中,以減少通信開銷。 但是,目標是演示
RPC API,而不是為 CartPole 構(gòu)建最佳的求解器。 因此,在此示例中,讓邏輯保持簡單,并明確兩個步驟。
import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, help='Number of workers')
parser.add_argument('--log_interval', default=1, help='Log every log_interval episodes')
parser.add_argument('--gamma', default=0.1, help='how much to value future rewards')
parser.add_argument('--seed', default=1, help='random seed for reproducibility')
args = parser.parse_args()
class Observer:
def __init__(self):
self.id = rpc.get_worker_info().id
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
def run_episode(self, agent_rref, n_steps):
state, ep_reward = self.env.reset(), 0
for step in range(n_steps):
# send the state to the agent to get an action
action = _remote_method(Agent.select_action, agent_rref, self.id, state)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
_remote_method(Agent.report_reward, agent_rref, self.id, reward)
if done:
break
agent 的代碼稍微復(fù)雜一點,我們將其分為多部分。 在此示例中,代理既充當訓練者又充當主人,因此它向多個分布式觀察者發(fā)送命令以運行情節(jié),并且還記錄所有本地行為和獎勵,這些行為和獎賞將在每個情節(jié)之后的訓練階段中使用。 下面的代碼顯示了Agent
構(gòu)造函數(shù),其中大多數(shù)行都在初始化各種組件。 最后的循環(huán)在其他工作者上遠程初始化觀察者,并在本地將RRefs
保留給這些觀察者。 代理稍后將使用那些觀察者RRefs
發(fā)送命令。 應(yīng)用程序無需擔心RRefs
的壽命。
每個RRef
的所有者維護一個參考計數(shù)圖以跟蹤其生命周期,并保證只要該RRef
的任何活動用戶都不會刪除遠程數(shù)據(jù)對象。 有關(guān)詳細信息,請參考RRef
設(shè)計文檔。
import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
def __init__(self, world_size):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.saved_log_probs = {}
self.policy = Policy()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(remote(ob_info, Observer))
self.rewards[ob_info.id] = []
self.saved_log_probs[ob_info.id] = []
接下來,代理向觀察者公開兩個 API,以供他們選擇動作和報告獎勵。 這些功能僅在代理上本地運行,但是將由觀察者通過 RPC 觸發(fā)。
class Agent:
...
def select_action(self, ob_id, state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
def report_reward(self, ob_id, reward):
self.rewards[ob_id].append(reward)
讓我們在代理上添加run_episode
函數(shù),該函數(shù)告訴所有觀察者執(zhí)行片段。 在此函數(shù)中,它首先創(chuàng)建一個列表,以從異步 RPC 收集期貨,然后在所有觀察者RRefs
上循環(huán)以生成異步 RPC。 在這些 RPC 中,代理還將自身的RRef
傳遞給觀察者,以便觀察者也可以在代理上調(diào)用函數(shù)。 如上所示,每個觀察者都將 RPC 返回給代理,它們是嵌套的 RPC。 在每個情節(jié)之后,saved_log_probs
和rewards
將包含記錄的動作概率和獎勵。
class Agent:
...
def run_episode(self, n_steps=0):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ob_rref.owner(),
_call_method,
args=(Observer.run_episode, ob_rref, self.agent_rref, n_steps)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
最后,在一集之后,代理需要訓練模型,該模型在下面的finish_episode
函數(shù)中實現(xiàn)。 此函數(shù)中沒有 RPC,并且大多數(shù)是從單線程示例中借用的。 因此,我們跳過描述其內(nèi)容。
class Agent:
...
def finish_episode(self):
# joins probs and rewards from different observers into lists
R, probs, rewards = 0, [], []
for ob_id in self.rewards:
probs.extend(self.saved_log_probs[ob_id])
rewards.extend(self.rewards[ob_id])
# use the minimum observer reward to calculate the running reward
min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
# clear saved probs and rewards
for ob_id in self.rewards:
self.rewards[ob_id] = []
self.saved_log_probs[ob_id] = []
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
return min_reward
使用Policy
,Observer
和Agent
類,我們準備啟動多個進程來執(zhí)行分布式訓練。 在此示例中,所有進程都運行相同的run_worker
函數(shù),并且它們使用等級來區(qū)分其角色。 等級 0 始終是代理,其他所有等級都是觀察者。 代理通過重復(fù)調(diào)用run_episode
和finish_episode
充當主控,直到運行的獎勵超過環(huán)境指定的獎勵閾值為止。
所有觀察者都被動地等待來自代理的命令。 該代碼由 rpc.init_rpc 和 rpc.shutdown 包裝,它們分別初始化和終止
RPC 實例。 API 頁面中提供了更多詳細信息。
import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs"
TOTAL_EPISODE_STEP = 100
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size)
for i_episode in count(1):
n_steps = int(TOTAL_EPISODE_STEP / (args.world_size - 1))
agent.run_episode(n_steps=n_steps)
last_reward = agent.finish_episode()
if i_episode % args.log_interval == 0:
print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
i_episode, last_reward, agent.running_reward))
if agent.running_reward > agent.reward_threshold:
print("Solved! Running reward is now {}!".format(agent.running_reward))
break
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from the agent
# block until all rpcs finish, and shutdown the RPC instance
rpc.shutdown()
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
以下是使用 <cite>world_size = 2</cite> 進行訓練時的一些示例輸出。
Episode 10 Last reward: 26.00 Average reward: 10.01
Episode 20 Last reward: 16.00 Average reward: 11.27
Episode 30 Last reward: 49.00 Average reward: 18.62
Episode 40 Last reward: 45.00 Average reward: 26.09
Episode 50 Last reward: 44.00 Average reward: 30.03
Episode 60 Last reward: 111.00 Average reward: 42.23
Episode 70 Last reward: 131.00 Average reward: 70.11
Episode 80 Last reward: 87.00 Average reward: 76.51
Episode 90 Last reward: 86.00 Average reward: 95.93
Episode 100 Last reward: 13.00 Average reward: 123.93
Episode 110 Last reward: 33.00 Average reward: 91.39
Episode 120 Last reward: 73.00 Average reward: 76.38
Episode 130 Last reward: 137.00 Average reward: 88.08
Episode 140 Last reward: 89.00 Average reward: 104.96
Episode 150 Last reward: 97.00 Average reward: 98.74
Episode 160 Last reward: 150.00 Average reward: 100.87
Episode 170 Last reward: 126.00 Average reward: 104.38
Episode 180 Last reward: 500.00 Average reward: 213.74
Episode 190 Last reward: 322.00 Average reward: 300.22
Episode 200 Last reward: 165.00 Average reward: 272.71
Episode 210 Last reward: 168.00 Average reward: 233.11
Episode 220 Last reward: 184.00 Average reward: 195.02
Episode 230 Last reward: 284.00 Average reward: 208.32
Episode 240 Last reward: 395.00 Average reward: 247.37
Episode 250 Last reward: 500.00 Average reward: 335.42
Episode 260 Last reward: 500.00 Average reward: 386.30
Episode 270 Last reward: 500.00 Average reward: 405.29
Episode 280 Last reward: 500.00 Average reward: 443.29
Episode 290 Last reward: 500.00 Average reward: 464.65
Solved! Running reward is now 475.3163778435275!
在此示例中,我們展示了如何使用 RPC 作為通信工具來跨工作人員傳遞數(shù)據(jù),以及如何使用 RRef 引用遠程對象。 的確,您可以直接在ProcessGroup
send
和recv
API 之上構(gòu)建整個結(jié)構(gòu),也可以使用其他通信/ RPC 庫。 但是,通過使用 <cite>torch.distributed.rpc</cite> ,您可以在后臺獲得本機支持并不斷優(yōu)化性能。
接下來,我們將展示如何將 RPC 和 RRef 與分布式 autograd 和分布式優(yōu)化器結(jié)合起來執(zhí)行分布式模型并行訓練。
在本節(jié)中,我們將使用 RNN 模型來展示如何使用 RPC API 構(gòu)建分布式模型并行訓練。 示例 RNN 模型非常小,可以輕松地放入單個 GPU 中,但是我們?nèi)詫⑵鋵觿澐譃閮蓚€不同的工作人員來演示這一想法。 開發(fā)人員可以應(yīng)用類似的技術(shù)在多個設(shè)備和機器上分布更大的模型。
RNN 模型設(shè)計是從 PyTorch 示例存儲庫中的詞語言模型中借用的,該存儲庫包含三個主要組件,一個嵌入表,一個LSTM
層和一個解碼器。 下面的代碼將嵌入表和解碼器包裝到子模塊中,以便它們的構(gòu)造函數(shù)可以傳遞給 RPC API。 在EmbeddingTable
子模塊中,我們有意將Embedding
層放在
GPU 上以涵蓋用例。 在 v1.4 中,RPC 始終在目標工作線程上創(chuàng)建 CPU 張量參數(shù)或返回值。 如果函數(shù)使用 GPU 張量,則需要將其顯式移動到適當?shù)脑O(shè)備。
class EmbeddingTable(nn.Module):
r"""
Encoding layers of the RNNModel
"""
def __init__(self, ntoken, ninp, dropout):
super(EmbeddingTable, self).__init__()
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp).cuda()
self.encoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, input):
return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
def __init__(self, ntoken, nhid, dropout):
super(Decoder, self).__init__()
self.drop = nn.Dropout(dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, output):
return self.decoder(self.drop(output))
使用上述子模塊,我們現(xiàn)在可以使用 RPC 將它們組合在一起以創(chuàng)建 RNN 模型。 在下面的代碼中,ps
代表參數(shù)服務(wù)器,該服務(wù)器托管嵌入表和解碼器的參數(shù)。 構(gòu)造函數(shù)使用遠程 API 在參數(shù)服務(wù)器上創(chuàng)建EmbeddingTable
對象和Decoder
對象,并在本地創(chuàng)建LSTM子模塊。
在正向傳遞過程中,訓練師使用EmbeddingTable
RRef
查找遠程子模塊,然后使用 RPC 將輸入數(shù)據(jù)傳遞到EmbeddingTable
,并獲取查找結(jié)果。 然后,它通過本地LSTM
層運行嵌入,最后使用另一個 RPC 將輸出發(fā)送到Decoder
子模塊。 通常,要實施分布式模型并行訓練,開發(fā)人員可以將模型劃分為子模塊,調(diào)用 RPC 遠程創(chuàng)建子模塊實例,并在必要時使用RRef
查找它們。
如下面的代碼所示,它看起來與單機模型并行訓練非常相似。 主要區(qū)別是用 RPC 功能替換了Tensor.to(device)
。
class RNNModel(nn.Module):
def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
# setup embedding table remotely
self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
# setup LSTM locally
self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
# setup decoder remotely
self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
def forward(self, input, hidden):
# pass input to the remote embedding table and fetch emb tensor back
emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
output, hidden = self.rnn(emb, hidden)
# pass output to the rremote decoder and get the decoded output back
decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
return decoded, hidden
在介紹分布式優(yōu)化器之前,讓我們添加一個輔助函數(shù)來生成模型參數(shù)的 RRef 列表,這些列表將由分布式優(yōu)化器使用。 在本地訓練中,應(yīng)用程序可以調(diào)用Module.parameters()
來獲取對所有參數(shù)張量的引用,并將其傳遞給本地優(yōu)化器以進行后續(xù)更新。 但是,由于某些參數(shù)存在于遠程計算機上,因此同一 API 在分布式訓練方案中不起作用。 因此,分布式優(yōu)化器不采用參數(shù)Tensors
的列表,而是采用RRefs
的列表,對于本地和遠程模型參數(shù),每個模型參數(shù)一個RRef
。
輔助函數(shù)非常簡單,只需調(diào)用Module.parameters()
并在每個參數(shù)上創(chuàng)建一個本地RRef。
def _parameter_rrefs(module):
param_rrefs = []
for param in module.parameters():
param_rrefs.append(RRef(param))
return param_rrefs
然后,由于RNNModel
包含三個子模塊,因此我們需要調(diào)用_parameter_rrefs
三次,并將其包裝到另一個輔助函數(shù)中。
class RNNModel(nn.Module):
...
def parameter_rrefs(self):
remote_params = []
# get RRefs of embedding table
remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
# create RRefs for local parameters
remote_params.extend(_parameter_rrefs(self.rnn))
# get RRefs of decoder
remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
return remote_params
現(xiàn)在,我們準備實施訓練循環(huán)。 初始化模型參數(shù)后,我們創(chuàng)建RNNModel
和DistributedOptimizer
。 分布式優(yōu)化器將采用參數(shù)RRefs的列表,查找所有不同的所有者工作器,并在每個所有者工作器上創(chuàng)建給定的本地優(yōu)化器(即,在這種情況下,您也可以使用其他本地優(yōu)化器SGD
) 使用給定的參數(shù)(即lr=0.05
)。
在訓練循環(huán)中,它首先創(chuàng)建一個分布式 autograd 上下文,這將幫助分布式 autograd 引擎查找漸變和涉及的 RPC 發(fā)送/接收功能。 分布式 autograd 引擎的設(shè)計詳細信息可以在其設(shè)計說明中找到。 然后,它像本地模型一樣開始前進,并運行分布式后退。 對于后向分布,您只需要指定一個根列表,在這種情況下,就是損失Tensor
。
分布式 autograd 引擎將自動遍歷分布式圖形并正確編寫漸變。 接下來,它在分布式優(yōu)化器上運行step
函數(shù),該函數(shù)將與所有涉及的本地優(yōu)化器聯(lián)系以更新模型參數(shù)。 與本地訓練相比,一個較小的差異是您不需要運行zero_grad()
,因為每個 autograd 上下文都有專用的空間來存儲梯度,并且在每次迭代創(chuàng)建上下文時,來自不同迭代的那些梯度不會累積到 同一組Tensors
。
def run_trainer():
batch = 5
ntoken = 10
ninp = 2
nhid = 3
nindices = 3
nlayers = 4
hidden = (
torch.randn(nlayers, nindices, nhid),
torch.randn(nlayers, nindices, nhid)
)
model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
# setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch():
for _ in range(5):
data = torch.LongTensor(batch, nindices) % ntoken
target = torch.LongTensor(batch, ntoken) % nindices
yield data, target
# train for 10 iterations
for epoch in range(10):
# create distributed autograd context
for data, target in get_next_batch():
with dist_autograd.context():
hidden[0].detach_()
hidden[1].detach_()
output, hidden = model(data, hidden)
loss = criterion(output, target)
# run distributed backward pass
dist_autograd.backward([loss])
# run distributed optimizer
opt.step()
# not necessary to zero grads as each iteration creates a different
# distributed autograd context which hosts different grads
print("Training epoch {}".format(epoch))
最后,讓我們添加一些粘合代碼以啟動參數(shù)服務(wù)器和訓練師流程。
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 1:
rpc.init_rpc("trainer", rank=rank, world_size=world_size)
_run_trainer()
else:
rpc.init_rpc("ps", rank=rank, world_size=world_size)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
更多建議: