Egg 多進(jìn)程研發(fā)模式增強(qiáng)

2020-02-06 14:12 更新

在前面的多進(jìn)程模型章節(jié)中,我們詳細(xì)講述了框架的多進(jìn)程模型,其中適合使用 Agent 進(jìn)程的有一類常見的場景:一些中間件客戶端需要和服務(wù)器建立長連接,理論上一臺服務(wù)器最好只建立一個(gè)長連接,但多進(jìn)程模型會導(dǎo)致 n 倍(n = Worker 進(jìn)程數(shù))連接被創(chuàng)建。

+--------+   +--------+
| Client | | Client | ... n
+--------+ +--------+
| \ / |
| \ / | n * m 個(gè)鏈接
| / \ |
| / \ |
+--------+ +--------+
| Server | | Server | ... m
+--------+ +--------+

為了盡可能的復(fù)用長連接(因?yàn)樗鼈儗τ诜?wù)端來說是非常寶貴的資源),我們會把它放到 Agent 進(jìn)程里維護(hù),然后通過 messenger 將數(shù)據(jù)傳遞給各個(gè) Worker。這種做法是可行的,但是往往需要寫大量代碼去封裝接口和實(shí)現(xiàn)數(shù)據(jù)的傳遞,非常麻煩。

另外,通過 messenger 傳遞數(shù)據(jù)效率是比較低的,因?yàn)樗鼤ㄟ^ Master 來做中轉(zhuǎn);萬一 IPC 通道出現(xiàn)問題還可能將 Master 進(jìn)程搞掛。

那么有沒有更好的方法呢?答案是肯定的,我們提供一種新的模式來降低這類客戶端封裝的復(fù)雜度。通過建立 Agent 和 Worker 的 socket 直連跳過 Master 的中轉(zhuǎn)。Agent 作為對外的門面維持多個(gè) Worker 進(jìn)程的共享連接。

核心思想

  • 受到 Leader/Follower 模式的啟發(fā)。
  • 客戶端會被區(qū)分為兩種角色:Leader: 負(fù)責(zé)和遠(yuǎn)程服務(wù)端維持連接,對于同一類的客戶端只有一個(gè) Leader。Follower: 會將具體的操作委托給 Leader,常見的是訂閱模型(讓 Leader 和遠(yuǎn)程服務(wù)端交互,并等待其返回)。
  • 如何確定誰是 Leader,誰是 Follower 呢?有兩種模式:自由競爭模式:客戶端啟動的時(shí)候通過本地端口的爭奪來確定 Leader。例如:大家都嘗試監(jiān)聽 7777 端口,最后只會有一個(gè)實(shí)例搶占到,那它就變成 Leader,其余的都是 Follower。強(qiáng)制指定模式:框架指定某一個(gè) Leader,其余的就是 Follower。
  • 框架里面我們采用的是強(qiáng)制指定模式,Leader 只能在 Agent 里面創(chuàng)建,這也符合我們對 Agent 的定位
  • 框架啟動的時(shí)候 Master 會隨機(jī)選擇一個(gè)可用的端口作為 Cluster Client 監(jiān)聽的通訊端口,并將它通過參數(shù)傳遞給 Agent 和 App Worker。
  • Leader 和 Follower 之間通過 socket 直連(通過通訊端口),不再需要 Master 中轉(zhuǎn)。

新的模式下,客戶端的通信方式如下:

             +-------+
| start |
+---+---+
|
+--------+---------+
__| port competition |__
win / +------------------+ \ lose
/ \
+---------------+ tcp conn +-------------------+
| Leader(Agent) |<---------------->| Follower(Worker1) |
+---------------+ +-------------------+
| \ tcp conn
| \
+--------+ +-------------------+
| Client | | Follower(Worker2) |
+--------+ +-------------------+

客戶端接口類型抽象

我們將客戶端接口抽象為下面兩大類,這也是對客戶端接口的一個(gè)規(guī)范,對于符合規(guī)范的客戶端,我們可以自動將其包裝為 Leader/Follower 模式。

  • 訂閱、發(fā)布類(subscribe / publish):subscribe(info, listener) 接口包含兩個(gè)參數(shù),第一個(gè)是訂閱的信息,第二個(gè)是訂閱的回調(diào)函數(shù)。publish(info) 接口包含一個(gè)參數(shù),就是訂閱的信息。
  • 調(diào)用類 (invoke),支持 callback, promise 和 generator function 三種風(fēng)格的接口,但是推薦使用 generator function。

客戶端示例

const Base = require('sdk-base');

class Client extends Base {
constructor(options) {
super(options);
// 在初始化成功以后記得 ready
this.ready(true);
}

/**
* 訂閱
*
* @param {Object} info - 訂閱的信息(一個(gè) JSON 對象,注意盡量不要包含 Function, Buffer, Date 這類屬性)
* @param {Function} listener - 監(jiān)聽的回調(diào)函數(shù),接收一個(gè)參數(shù)就是監(jiān)聽到的結(jié)果對象
*/
subscribe(info, listener) {
// ...
}

/**
* 發(fā)布
*
* @param {Object} info - 發(fā)布的信息,和上面 subscribe 的 info 類似
*/
publish(info) {
// ...
}

/**
* 獲取數(shù)據(jù) (invoke)
*
* @param {String} id - id
* @return {Object} result
*/
async getData(id) {
// ...
}
}

異常處理

  • Leader 如果“死掉”會觸發(fā)新一輪的端口爭奪,爭奪到端口的那個(gè)實(shí)例被推選為新的 Leader。
  • 為保證 Leader 和 Follower 之間的通道健康,需要引入定時(shí)心跳檢查機(jī)制,如果 Follower 在固定時(shí)間內(nèi)沒有發(fā)送心跳包,那么 Leader 會將 Follower 主動斷開,從而觸發(fā) Follower 的重新初始化。

協(xié)議和調(diào)用時(shí)序

Leader 和 Follower 通過下面的協(xié)議進(jìn)行數(shù)據(jù)交換:

0       1       2               4                                                              12
+-------+-------+---------------+---------------------------------------------------------------+
|version|req/res| reserved | request id |
+-------------------------------+-------------------------------+-------------------------------+
| timeout | connection object length | application object length |
+-------------------------------+---------------------------------------------------------------+
| conn object (JSON format) ... | app object |
+-----------------------------------------------------------+ |
| ... |
+-----------------------------------------------------------------------------------------------+
  1. 在通訊端口上 Leader 啟動一個(gè) Local Server,所有的 Leader/Follower 通訊都經(jīng)過 Local Server。
  2. Follower 連接上 Local Server 后,首先發(fā)送一個(gè) register channel 的 packet(引入 channel 的概念是為了區(qū)別不同類型的客戶端)。
  3. Local Server 會將 Follower 分配給指定的 Leader(根據(jù)客戶端類型進(jìn)行配對)。
  4. Follower 向 Leader 發(fā)送訂閱、發(fā)布請求。
  5. Leader 在訂閱數(shù)據(jù)變更時(shí)通過 subscribe result packet 通知 Follower。
  6. Follower 向 Leader 發(fā)送調(diào)用請求,Leader 收到后執(zhí)行相應(yīng)操作后返回結(jié)果。
+----------+             +---------------+          +---------+
| Follower | | Local Server | | Leader |
+----------+ +---------------+ +---------+
| register channel | assign to |
+ -----------------------> | --------------------> |
| | |
| subscribe |
+ ------------------------------------------------> |
| publish |
+ ------------------------------------------------> |
| |
| subscribe result |
| <------------------------------------------------ +
| |
| invoke |
+ ------------------------------------------------> |
| invoke result |
| <------------------------------------------------ +
| |

具體的使用方法

下面我用一個(gè)簡單的例子,介紹在框架里面如何讓一個(gè)客戶端支持 Leader/Follower 模式:

  • 第一步,我們的客戶端最好是符合上面提到過的接口約定,例如:
// registry_client.js
const URL = require('url');
const Base = require('sdk-base');

class RegistryClient extends Base {
constructor(options) {
super({
// 指定異步啟動的方法
initMethod: 'init',
});
this._options = options;
this._registered = new Map();
}

/**
* 啟動邏輯
*/
async init() {
this.ready(true);
}

/**
* 獲取配置
* @param {String} dataId - the dataId
* @return {Object} 配置
*/
async getConfig(dataId) {
return this._registered.get(dataId);
}

/**
* 訂閱
* @param {Object} reg
* - {String} dataId - the dataId
* @param {Function} listener - the listener
*/
subscribe(reg, listener) {
const key = reg.dataId;
this.on(key, listener);

const data = this._registered.get(key);
if (data) {
process.nextTick(() => listener(data));
}
}

/**
* 發(fā)布
* @param {Object} reg
* - {String} dataId - the dataId
* - {String} publishData - the publish data
*/
publish(reg) {
const key = reg.dataId;
let changed = false;

if (this._registered.has(key)) {
const arr = this._registered.get(key);
if (arr.indexOf(reg.publishData) === -1) {
changed = true;
arr.push(reg.publishData);
}
} else {
changed = true;
this._registered.set(key, [reg.publishData]);
}
if (changed) {
this.emit(key, this._registered.get(key).map(url => URL.parse(url, true)));
}
}
}

module.exports = RegistryClient;
  • 第二步,使用 agent.cluster 接口對 RegistryClient 進(jìn)行封裝:
// agent.js
const RegistryClient = require('registry_client');

module.exports = agent => {
// 對 RegistryClient 進(jìn)行封裝和實(shí)例化
agent.registryClient = agent.cluster(RegistryClient)
// create 方法的參數(shù)就是 RegistryClient 構(gòu)造函數(shù)的參數(shù)
.create({});

agent.beforeStart(async () => {
await agent.registryClient.ready();
agent.coreLogger.info('registry client is ready');
});
};
  • 第三步,使用 app.cluster 接口對 RegistryClient 進(jìn)行封裝:
// app.js
const RegistryClient = require('registry_client');

module.exports = app => {
app.registryClient = app.cluster(RegistryClient).create({});
app.beforeStart(async () => {
await app.registryClient.ready();
app.coreLogger.info('registry client is ready');

// 調(diào)用 subscribe 進(jìn)行訂閱
app.registryClient.subscribe({
dataId: 'demo.DemoService',
}, val => {
// ...
});

// 調(diào)用 publish 發(fā)布數(shù)據(jù)
app.registryClient.publish({
dataId: 'demo.DemoService',
publishData: 'xxx',
});

// 調(diào)用 getConfig 接口
const res = await app.registryClient.getConfig('demo.DemoService');
console.log(res);
});
};

是不是很簡單?

當(dāng)然,如果你的客戶端不是那么『標(biāo)準(zhǔn)』,那你可能需要用到其他一些 API,比如,你的訂閱函數(shù)不叫 subscribe 而是叫 sub:

class MockClient extends Base {
constructor(options) {
super({
initMethod: 'init',
});
this._options = options;
this._registered = new Map();
}

async init() {
this.ready(true);
}

sub(info, listener) {
const key = reg.dataId;
this.on(key, listener);

const data = this._registered.get(key);
if (data) {
process.nextTick(() => listener(data));
}
}

...
}

你需要通過 delegate(API代理)手動設(shè)置此委托:

// agent.js
module.exports = agent => {
agent.mockClient = agent.cluster(MockClient)
// 將 sub 代理到 subscribe 邏輯上
.delegate('sub', 'subscribe')
.create();

agent.beforeStart(async () => {
await agent.mockClient.ready();
});
};
// app.js
module.exports = app => {
app.mockClient = app.cluster(MockClient)
// 將 sub 代理到 subscribe 邏輯上
.delegate('sub', 'subscribe')
.create();

app.beforeStart(async () => {
await app.mockClient.ready();

app.sub({ id: 'test-id' }, val => {
// put your code here
});
});
};

我們已經(jīng)理解,通過 cluster-client 可以讓我們在不理解多進(jìn)程模型的情況下開發(fā)『純粹』的 RegistryClient,只負(fù)責(zé)和服務(wù)端進(jìn)行交互,然后使用 cluster-client 進(jìn)行簡單的封裝就可以得到一個(gè)支持多進(jìn)程模型的 ClusterClient。這里的 RegistryClient 實(shí)際上是一個(gè)專門負(fù)責(zé)和遠(yuǎn)程服務(wù)通信進(jìn)行數(shù)據(jù)通信的 DataClient。

大家可能已經(jīng)發(fā)現(xiàn),ClusterClient 同時(shí)帶來了一些約束,如果想在各進(jìn)程暴露同樣的方法,那么 RegistryClient 上只能支持 sub/pub 模式以及異步的 API 調(diào)用。因?yàn)樵诙噙M(jìn)程模型中所有的交互都必須經(jīng)過 socket 通信,勢必帶來了這一約束。

假設(shè)我們要實(shí)現(xiàn)一個(gè)同步的 get 方法,訂閱過的數(shù)據(jù)直接放入內(nèi)存,使用 get 方法時(shí)直接返回。要怎么實(shí)現(xiàn)呢?而真實(shí)情況可能比這更復(fù)雜。

在這里,我們引入一個(gè) APIClient 的最佳實(shí)踐。對于有讀取緩存數(shù)據(jù)等同步 API 需求的模塊,在 RegistryClient 基礎(chǔ)上再封裝一個(gè) APIClient 來實(shí)現(xiàn)這些與遠(yuǎn)程服務(wù)端交互無關(guān)的 API,暴露給用戶使用到的是這個(gè) APIClient 的實(shí)例。

在 APIClient 內(nèi)部實(shí)現(xiàn)上:

  • 異步數(shù)據(jù)獲取,通過調(diào)用基于 ClusterClient 的 RegistryClient 的 API 實(shí)現(xiàn)。
  • 同步調(diào)用等與服務(wù)端無關(guān)的接口在 APIClient 上實(shí)現(xiàn)。由于 ClusterClient 的 API 已經(jīng)抹平了多進(jìn)程差異,所以在開發(fā) APIClient 調(diào)用到 RegistryClient 時(shí)也無需關(guān)心多進(jìn)程模型。

例如在模塊的 APIClient 中增加帶緩存的 get 同步方法:

// some-client/index.js
const cluster = require('cluster-client');
const RegistryClient = require('./registry_client');

class APIClient extends Base {
constructor(options) {
super(options);

// options.cluster 用于給 Egg 的插件傳遞 app.cluster 進(jìn)來
this._client = (options.cluster || cluster)(RegistryClient).create(options);
this._client.ready(() => this.ready(true));

this._cache = {};

// subMap:
// {
// foo: reg1,
// bar: reg2,
// }
const subMap = options.subMap;

for (const key in subMap) {
this.subscribe(subMap[key], value => {
this._cache[key] = value;
});
}
}

subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}

publish(reg) {
this._client.publish(reg);
}

get(key) {
return this._cache[key];
}
}

// 最終模塊向外暴露這個(gè) APIClient
module.exports = APIClient;

那么我們就可以這么使用該模塊:

// app.js || agent.js
const APIClient = require('some-client'); // 上面那個(gè)模塊
module.exports = app => {
const config = app.config.apiClient;
app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster });
app.beforeStart(async () => {
await app.apiClient.ready();
});
};

// config.${env}.js
exports.apiClient = {
subMap: {
foo: {
id: '',
},
// bar...
}
};

為了方便你封裝 APIClient,在 cluster-client 模塊中提供了一個(gè) APIClientBase 基類,那么上面的 APIClient 可以改寫為:

const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');

class APIClient extends APIClientBase {
// 返回原始的客戶端類
get DataClient() {
return RegistryClient;
}

// 用于設(shè)置 cluster-client 相關(guān)參數(shù),等同于 cluster 方法的第二個(gè)參數(shù)
get clusterOptions() {
return {
responseTimeout: 120 * 1000,
};
}

subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}

publish(reg) {
this._client.publish(reg);
}

get(key) {
return this._cache[key];
}
}

總結(jié)一下:

+------------------------------------------------+
| APIClient |
| +----------------------------------------|
| | ClusterClient |
| | +---------------------------------|
| | | RegistryClient |
+------------------------------------------------+
  • RegistryClient - 負(fù)責(zé)和遠(yuǎn)端服務(wù)通訊,實(shí)現(xiàn)數(shù)據(jù)的存取,只支持異步 API,不關(guān)心多進(jìn)程模型。
  • ClusterClient - 通過 cluster-client 模塊進(jìn)行簡單 wrap 得到的 client 實(shí)例,負(fù)責(zé)自動抹平多進(jìn)程模型的差異。
  • APIClient - 內(nèi)部調(diào)用 ClusterClient 做數(shù)據(jù)同步,無需關(guān)心多進(jìn)程模型,用戶最終使用的模塊。API 都通過此處暴露,支持同步和異步。

有興趣的同學(xué)可以看一下增強(qiáng)多進(jìn)程研發(fā)模式 討論過程。

在框架里面 cluster-client 相關(guān)的配置項(xiàng)

/**
* @property {Number} responseTimeout - response timeout, default is 60000
* @property {Transcode} [transcode]
* - {Function} encode - custom serialize method
* - {Function} decode - custom deserialize method
*/
config.clusterClient = {
responseTimeout: 60000,
};
配置項(xiàng)類型默認(rèn)值描述
responseTimeoutnumber60000 (一分鐘)全局的進(jìn)程間通訊的超時(shí)時(shí)長,不能設(shè)置的太短,因?yàn)榇淼慕涌诒旧硪灿谐瑫r(shí)設(shè)置
transcodefunctionN/A進(jìn)程間通訊的序列化方式,默認(rèn)采用 serialize-json(建議不要自行設(shè)置)

上面是全局的配置方式。如果,你想對一個(gè)客戶端單獨(dú)做設(shè)置

  • 可以通過 app/agent.cluster(ClientClass, options) 的第二個(gè)參數(shù) options 進(jìn)行覆蓋
app.registryClient = app.cluster(RegistryClient, {
responseTimeout: 120 * 1000, // 這里傳入的是和 cluster-client 相關(guān)的參數(shù)
}).create({
// 這里傳入的是 RegistryClient 需要的參數(shù)
});
  • 也可以通過覆蓋 APIClientBase 的 clusterOptions 這個(gè) getter 屬性
const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');

class APIClient extends APIClientBase {
get DataClient() {
return RegistryClient;
}

get clusterOptions() {
return {
responseTimeout: 120 * 1000,
};
}

// ...
}

module.exports = APIClient;


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號