在前面的多進(jìn)程模型章節(jié)中,我們詳細(xì)講述了框架的多進(jìn)程模型,其中適合使用 Agent 進(jìn)程的有一類常見的場景:一些中間件客戶端需要和服務(wù)器建立長連接,理論上一臺服務(wù)器最好只建立一個(gè)長連接,但多進(jìn)程模型會導(dǎo)致 n 倍(n = Worker 進(jìn)程數(shù))連接被創(chuàng)建。
+--------+ +--------+ | Client | | Client | ... n +--------+ +--------+ | \ / | | \ / | n * m 個(gè)鏈接 | / \ | | / \ | +--------+ +--------+ | Server | | Server | ... m +--------+ +--------+
|
為了盡可能的復(fù)用長連接(因?yàn)樗鼈儗τ诜?wù)端來說是非常寶貴的資源),我們會把它放到 Agent 進(jìn)程里維護(hù),然后通過 messenger 將數(shù)據(jù)傳遞給各個(gè) Worker。這種做法是可行的,但是往往需要寫大量代碼去封裝接口和實(shí)現(xiàn)數(shù)據(jù)的傳遞,非常麻煩。
另外,通過 messenger 傳遞數(shù)據(jù)效率是比較低的,因?yàn)樗鼤ㄟ^ Master 來做中轉(zhuǎn);萬一 IPC 通道出現(xiàn)問題還可能將 Master 進(jìn)程搞掛。
那么有沒有更好的方法呢?答案是肯定的,我們提供一種新的模式來降低這類客戶端封裝的復(fù)雜度。通過建立 Agent 和 Worker 的 socket 直連跳過 Master 的中轉(zhuǎn)。Agent 作為對外的門面維持多個(gè) Worker 進(jìn)程的共享連接。
核心思想
- 受到 Leader/Follower 模式的啟發(fā)。
- 客戶端會被區(qū)分為兩種角色:Leader: 負(fù)責(zé)和遠(yuǎn)程服務(wù)端維持連接,對于同一類的客戶端只有一個(gè) Leader。Follower: 會將具體的操作委托給 Leader,常見的是訂閱模型(讓 Leader 和遠(yuǎn)程服務(wù)端交互,并等待其返回)。
- 如何確定誰是 Leader,誰是 Follower 呢?有兩種模式:自由競爭模式:客戶端啟動的時(shí)候通過本地端口的爭奪來確定 Leader。例如:大家都嘗試監(jiān)聽 7777 端口,最后只會有一個(gè)實(shí)例搶占到,那它就變成 Leader,其余的都是 Follower。強(qiáng)制指定模式:框架指定某一個(gè) Leader,其余的就是 Follower。
- 框架里面我們采用的是強(qiáng)制指定模式,Leader 只能在 Agent 里面創(chuàng)建,這也符合我們對 Agent 的定位
- 框架啟動的時(shí)候 Master 會隨機(jī)選擇一個(gè)可用的端口作為 Cluster Client 監(jiān)聽的通訊端口,并將它通過參數(shù)傳遞給 Agent 和 App Worker。
- Leader 和 Follower 之間通過 socket 直連(通過通訊端口),不再需要 Master 中轉(zhuǎn)。
新的模式下,客戶端的通信方式如下:
+-------+ | start | +---+---+ | +--------+---------+ __| port competition |__ win / +------------------+ \ lose / \ +---------------+ tcp conn +-------------------+ | Leader(Agent) |<---------------->| Follower(Worker1) | +---------------+ +-------------------+ | \ tcp conn | \ +--------+ +-------------------+ | Client | | Follower(Worker2) | +--------+ +-------------------+
|
客戶端接口類型抽象
我們將客戶端接口抽象為下面兩大類,這也是對客戶端接口的一個(gè)規(guī)范,對于符合規(guī)范的客戶端,我們可以自動將其包裝為 Leader/Follower 模式。
- 訂閱、發(fā)布類(subscribe / publish):subscribe(info, listener) 接口包含兩個(gè)參數(shù),第一個(gè)是訂閱的信息,第二個(gè)是訂閱的回調(diào)函數(shù)。publish(info) 接口包含一個(gè)參數(shù),就是訂閱的信息。
- 調(diào)用類 (invoke),支持 callback, promise 和 generator function 三種風(fēng)格的接口,但是推薦使用 generator function。
客戶端示例
const Base = require('sdk-base');
class Client extends Base { constructor(options) { super(options); // 在初始化成功以后記得 ready this.ready(true); }
/** * 訂閱 * * @param {Object} info - 訂閱的信息(一個(gè) JSON 對象,注意盡量不要包含 Function, Buffer, Date 這類屬性) * @param {Function} listener - 監(jiān)聽的回調(diào)函數(shù),接收一個(gè)參數(shù)就是監(jiān)聽到的結(jié)果對象 */ subscribe(info, listener) { // ... }
/** * 發(fā)布 * * @param {Object} info - 發(fā)布的信息,和上面 subscribe 的 info 類似 */ publish(info) { // ... }
/** * 獲取數(shù)據(jù) (invoke) * * @param {String} id - id * @return {Object} result */ async getData(id) { // ... } }
|
異常處理
- Leader 如果“死掉”會觸發(fā)新一輪的端口爭奪,爭奪到端口的那個(gè)實(shí)例被推選為新的 Leader。
- 為保證 Leader 和 Follower 之間的通道健康,需要引入定時(shí)心跳檢查機(jī)制,如果 Follower 在固定時(shí)間內(nèi)沒有發(fā)送心跳包,那么 Leader 會將 Follower 主動斷開,從而觸發(fā) Follower 的重新初始化。
協(xié)議和調(diào)用時(shí)序
Leader 和 Follower 通過下面的協(xié)議進(jìn)行數(shù)據(jù)交換:
0 1 2 4 12 +-------+-------+---------------+---------------------------------------------------------------+ |version|req/res| reserved | request id | +-------------------------------+-------------------------------+-------------------------------+ | timeout | connection object length | application object length | +-------------------------------+---------------------------------------------------------------+ | conn object (JSON format) ... | app object | +-----------------------------------------------------------+ | | ... | +-----------------------------------------------------------------------------------------------+
|
- 在通訊端口上 Leader 啟動一個(gè) Local Server,所有的 Leader/Follower 通訊都經(jīng)過 Local Server。
- Follower 連接上 Local Server 后,首先發(fā)送一個(gè) register channel 的 packet(引入 channel 的概念是為了區(qū)別不同類型的客戶端)。
- Local Server 會將 Follower 分配給指定的 Leader(根據(jù)客戶端類型進(jìn)行配對)。
- Follower 向 Leader 發(fā)送訂閱、發(fā)布請求。
- Leader 在訂閱數(shù)據(jù)變更時(shí)通過 subscribe result packet 通知 Follower。
- Follower 向 Leader 發(fā)送調(diào)用請求,Leader 收到后執(zhí)行相應(yīng)操作后返回結(jié)果。
+----------+ +---------------+ +---------+ | Follower | | Local Server | | Leader | +----------+ +---------------+ +---------+ | register channel | assign to | + -----------------------> | --------------------> | | | | | subscribe | + ------------------------------------------------> | | publish | + ------------------------------------------------> | | | | subscribe result | | <------------------------------------------------ + | | | invoke | + ------------------------------------------------> | | invoke result | | <------------------------------------------------ + | |
|
具體的使用方法
下面我用一個(gè)簡單的例子,介紹在框架里面如何讓一個(gè)客戶端支持 Leader/Follower 模式:
- 第一步,我們的客戶端最好是符合上面提到過的接口約定,例如:
// registry_client.js const URL = require('url'); const Base = require('sdk-base');
class RegistryClient extends Base { constructor(options) { super({ // 指定異步啟動的方法 initMethod: 'init', }); this._options = options; this._registered = new Map(); }
/** * 啟動邏輯 */ async init() { this.ready(true); }
/** * 獲取配置 * @param {String} dataId - the dataId * @return {Object} 配置 */ async getConfig(dataId) { return this._registered.get(dataId); }
/** * 訂閱 * @param {Object} reg * - {String} dataId - the dataId * @param {Function} listener - the listener */ subscribe(reg, listener) { const key = reg.dataId; this.on(key, listener);
const data = this._registered.get(key); if (data) { process.nextTick(() => listener(data)); } }
/** * 發(fā)布 * @param {Object} reg * - {String} dataId - the dataId * - {String} publishData - the publish data */ publish(reg) { const key = reg.dataId; let changed = false;
if (this._registered.has(key)) { const arr = this._registered.get(key); if (arr.indexOf(reg.publishData) === -1) { changed = true; arr.push(reg.publishData); } } else { changed = true; this._registered.set(key, [reg.publishData]); } if (changed) { this.emit(key, this._registered.get(key).map(url => URL.parse(url, true))); } } }
module.exports = RegistryClient;
|
- 第二步,使用 agent.cluster 接口對 RegistryClient 進(jìn)行封裝:
// agent.js const RegistryClient = require('registry_client');
module.exports = agent => { // 對 RegistryClient 進(jìn)行封裝和實(shí)例化 agent.registryClient = agent.cluster(RegistryClient) // create 方法的參數(shù)就是 RegistryClient 構(gòu)造函數(shù)的參數(shù) .create({});
agent.beforeStart(async () => { await agent.registryClient.ready(); agent.coreLogger.info('registry client is ready'); }); };
|
- 第三步,使用 app.cluster 接口對 RegistryClient 進(jìn)行封裝:
// app.js const RegistryClient = require('registry_client');
module.exports = app => { app.registryClient = app.cluster(RegistryClient).create({}); app.beforeStart(async () => { await app.registryClient.ready(); app.coreLogger.info('registry client is ready');
// 調(diào)用 subscribe 進(jìn)行訂閱 app.registryClient.subscribe({ dataId: 'demo.DemoService', }, val => { // ... });
// 調(diào)用 publish 發(fā)布數(shù)據(jù) app.registryClient.publish({ dataId: 'demo.DemoService', publishData: 'xxx', });
// 調(diào)用 getConfig 接口 const res = await app.registryClient.getConfig('demo.DemoService'); console.log(res); }); };
|
是不是很簡單?
當(dāng)然,如果你的客戶端不是那么『標(biāo)準(zhǔn)』,那你可能需要用到其他一些 API,比如,你的訂閱函數(shù)不叫 subscribe 而是叫 sub:
class MockClient extends Base { constructor(options) { super({ initMethod: 'init', }); this._options = options; this._registered = new Map(); }
async init() { this.ready(true); }
sub(info, listener) { const key = reg.dataId; this.on(key, listener);
const data = this._registered.get(key); if (data) { process.nextTick(() => listener(data)); } }
... }
|
你需要通過 delegate(API代理)手動設(shè)置此委托:
// agent.js module.exports = agent => { agent.mockClient = agent.cluster(MockClient) // 將 sub 代理到 subscribe 邏輯上 .delegate('sub', 'subscribe') .create();
agent.beforeStart(async () => { await agent.mockClient.ready(); }); };
|
// app.js module.exports = app => { app.mockClient = app.cluster(MockClient) // 將 sub 代理到 subscribe 邏輯上 .delegate('sub', 'subscribe') .create();
app.beforeStart(async () => { await app.mockClient.ready();
app.sub({ id: 'test-id' }, val => { // put your code here }); }); };
|
我們已經(jīng)理解,通過 cluster-client 可以讓我們在不理解多進(jìn)程模型的情況下開發(fā)『純粹』的 RegistryClient,只負(fù)責(zé)和服務(wù)端進(jìn)行交互,然后使用 cluster-client 進(jìn)行簡單的封裝就可以得到一個(gè)支持多進(jìn)程模型的 ClusterClient。這里的 RegistryClient 實(shí)際上是一個(gè)專門負(fù)責(zé)和遠(yuǎn)程服務(wù)通信進(jìn)行數(shù)據(jù)通信的 DataClient。
大家可能已經(jīng)發(fā)現(xiàn),ClusterClient 同時(shí)帶來了一些約束,如果想在各進(jìn)程暴露同樣的方法,那么 RegistryClient 上只能支持 sub/pub 模式以及異步的 API 調(diào)用。因?yàn)樵诙噙M(jìn)程模型中所有的交互都必須經(jīng)過 socket 通信,勢必帶來了這一約束。
假設(shè)我們要實(shí)現(xiàn)一個(gè)同步的 get 方法,訂閱過的數(shù)據(jù)直接放入內(nèi)存,使用 get 方法時(shí)直接返回。要怎么實(shí)現(xiàn)呢?而真實(shí)情況可能比這更復(fù)雜。
在這里,我們引入一個(gè) APIClient 的最佳實(shí)踐。對于有讀取緩存數(shù)據(jù)等同步 API 需求的模塊,在 RegistryClient 基礎(chǔ)上再封裝一個(gè) APIClient 來實(shí)現(xiàn)這些與遠(yuǎn)程服務(wù)端交互無關(guān)的 API,暴露給用戶使用到的是這個(gè) APIClient 的實(shí)例。
在 APIClient 內(nèi)部實(shí)現(xiàn)上:
- 異步數(shù)據(jù)獲取,通過調(diào)用基于 ClusterClient 的 RegistryClient 的 API 實(shí)現(xiàn)。
- 同步調(diào)用等與服務(wù)端無關(guān)的接口在 APIClient 上實(shí)現(xiàn)。由于 ClusterClient 的 API 已經(jīng)抹平了多進(jìn)程差異,所以在開發(fā) APIClient 調(diào)用到 RegistryClient 時(shí)也無需關(guān)心多進(jìn)程模型。
例如在模塊的 APIClient 中增加帶緩存的 get 同步方法:
// some-client/index.js const cluster = require('cluster-client'); const RegistryClient = require('./registry_client');
class APIClient extends Base { constructor(options) { super(options);
// options.cluster 用于給 Egg 的插件傳遞 app.cluster 進(jìn)來 this._client = (options.cluster || cluster)(RegistryClient).create(options); this._client.ready(() => this.ready(true));
this._cache = {};
// subMap: // { // foo: reg1, // bar: reg2, // } const subMap = options.subMap;
for (const key in subMap) { this.subscribe(subMap[key], value => { this._cache[key] = value; }); } }
subscribe(reg, listener) { this._client.subscribe(reg, listener); }
publish(reg) { this._client.publish(reg); }
get(key) { return this._cache[key]; } }
// 最終模塊向外暴露這個(gè) APIClient module.exports = APIClient;
|
那么我們就可以這么使用該模塊:
// app.js || agent.js const APIClient = require('some-client'); // 上面那個(gè)模塊 module.exports = app => { const config = app.config.apiClient; app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster }); app.beforeStart(async () => { await app.apiClient.ready(); }); };
// config.${env}.js exports.apiClient = { subMap: { foo: { id: '', }, // bar... } };
|
為了方便你封裝 APIClient,在 cluster-client 模塊中提供了一個(gè) APIClientBase 基類,那么上面的 APIClient 可以改寫為:
const APIClientBase = require('cluster-client').APIClientBase; const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase { // 返回原始的客戶端類 get DataClient() { return RegistryClient; }
// 用于設(shè)置 cluster-client 相關(guān)參數(shù),等同于 cluster 方法的第二個(gè)參數(shù) get clusterOptions() { return { responseTimeout: 120 * 1000, }; }
subscribe(reg, listener) { this._client.subscribe(reg, listener); }
publish(reg) { this._client.publish(reg); }
get(key) { return this._cache[key]; } }
|
總結(jié)一下:
+------------------------------------------------+ | APIClient | | +----------------------------------------| | | ClusterClient | | | +---------------------------------| | | | RegistryClient | +------------------------------------------------+
|
- RegistryClient - 負(fù)責(zé)和遠(yuǎn)端服務(wù)通訊,實(shí)現(xiàn)數(shù)據(jù)的存取,只支持異步 API,不關(guān)心多進(jìn)程模型。
- ClusterClient - 通過 cluster-client 模塊進(jìn)行簡單 wrap 得到的 client 實(shí)例,負(fù)責(zé)自動抹平多進(jìn)程模型的差異。
- APIClient - 內(nèi)部調(diào)用 ClusterClient 做數(shù)據(jù)同步,無需關(guān)心多進(jìn)程模型,用戶最終使用的模塊。API 都通過此處暴露,支持同步和異步。
有興趣的同學(xué)可以看一下增強(qiáng)多進(jìn)程研發(fā)模式 討論過程。
在框架里面 cluster-client 相關(guān)的配置項(xiàng)
/** * @property {Number} responseTimeout - response timeout, default is 60000 * @property {Transcode} [transcode] * - {Function} encode - custom serialize method * - {Function} decode - custom deserialize method */ config.clusterClient = { responseTimeout: 60000, };
|
配置項(xiàng) | 類型 | 默認(rèn)值 | 描述 |
---|
responseTimeout | number | 60000 (一分鐘) | 全局的進(jìn)程間通訊的超時(shí)時(shí)長,不能設(shè)置的太短,因?yàn)榇淼慕涌诒旧硪灿谐瑫r(shí)設(shè)置 |
transcode | function | N/A | 進(jìn)程間通訊的序列化方式,默認(rèn)采用 serialize-json(建議不要自行設(shè)置) |
上面是全局的配置方式。如果,你想對一個(gè)客戶端單獨(dú)做設(shè)置
- 可以通過 app/agent.cluster(ClientClass, options) 的第二個(gè)參數(shù) options 進(jìn)行覆蓋
app.registryClient = app.cluster(RegistryClient, { responseTimeout: 120 * 1000, // 這里傳入的是和 cluster-client 相關(guān)的參數(shù) }).create({ // 這里傳入的是 RegistryClient 需要的參數(shù) });
|
- 也可以通過覆蓋 APIClientBase 的 clusterOptions 這個(gè) getter 屬性
const APIClientBase = require('cluster-client').APIClientBase; const RegistryClient = require('./registry_client');
class APIClient extends APIClientBase { get DataClient() { return RegistryClient; }
get clusterOptions() { return { responseTimeout: 120 * 1000, }; }
// ... }
module.exports = APIClient; |
更多建議: