接著前兩篇關於 egg-core 源碼分析的文章 egg-core 源碼分析一 和 egg-core 源碼分析二,今天來看一下 egg-cluster 的源碼實現邏輯。

NodeJs 中 javascript 的執行是單線程的,所以一個進程只能使用一個 CPU,為了最大可能的使用伺服器資源,一般我們可以使用下面三種方式實現:

  • 同一台機器上部署多個 Node 服務,使用不同的埠,然後用 Nginx 做負載均衡將請求轉發到不同的 Node 實例;
  • 使用 PM2 進程管理工具,多個進程共用一個埠,PM2 負責進程重啟工作;
  • 利用 Node 自帶 child_process 和 cluster 模塊可以方便實現多進程之間的通信;

egg-cluster 是什麼

egg-cluster 是 Egg 的多進程模型的啟動模式,在使用 cluster 模式啟動 Egg 應用時,我們只需要配置相關啟動參數, egg-cluster 會自動創建相關進程,並管理各個進程之間的通信以及異常處理等問題。主進程 Master 通過 child_process 模塊的 fork 函數創建 Agent 子進程,並通過 cluster 模塊創建 Worker 子進程。Master/Agent/Worker 三者各司其職,共同保證 Egg 應用的正常運行:

  • Master:

Master 進程只有一個且第一個啟動,主要負責進程管理的工作,包括 Worker、Agent 進程的初始化和重啟以及進程之間的通信工作。Master 不運行任何業務代碼,它的穩定性特別重要,一旦掛掉整個 Node 服務就掛掉了;

  • Agent

Agent 進程也只有一個,一般在業務開發時我們不太會用到 Agent 進程,它的用處主要有兩方面:(1)如果想讓你的代碼只在一個進程上運行(2)Agent 進程可以將某個消息同時廣播到所有 Worker 進程進行處理;

  • Worker

Worker 進程根據用戶自己的設定可以有多個,主要負責處理業務邏輯和用戶的請求。當 Worker 進程異常退出時,Master 進程會重啟一個新的 Worker 進程;

Agent 子進程是 Egg.Agent 類的實例,Worker 子進程是 Egg.Application 的實例,而 Egg.Agent 和 Egg.Application 都是 EggApplication 的子類,而 EggApplication 類又是 EggCore 的子類,關於 EggCore 的源碼實現可以看一下我前面的文章 Egg 源碼分析之 egg-core,所以類與實例之間的關係圖如下:

+--------------+ 實例 +--------------+
| Agent 子進程 | --------> | Agent 類 |
+--------------+ +---------------+
/
child_process.fork / 繼承
/
+---------------+ +-------------------+ 繼承 +------------+
| Master 主進程 | | EggApplication 類 | ------> | EggCore 類 |
+-------------- + +------------------ + +------------+
/
/ 繼承
cluster.fork /
+---------------+ 實例 +----------------+
| Worker 子進程 | -------> | Application 類 |
+---------------+ +-----------------+

egg-cluster 源碼分析

egg-cluster 整個模塊的入口是 master.js,它的初始化流程如下:

  1. workerManager 實例和 messenger 實例的初始化
  2. 自動探測及獲取可用的 clusterPort,使用 cluster-client 讓 Agent 和 Worker 直接通信變為可能
  3. 啟動 Agent 子進程
  4. 啟動 Worker 子進程
  5. 啟動完畢,實時監測各個進程服務狀態

// egg-cluster 源碼 -> 啟動流程(為了更容易看清楚初始化流程,constructor函數中有些代碼先後順序做了調整)
// Master 繼承了 EventEmitter模塊,通過事件的監聽和訂閱方式,非常方便的進行事務處理
class Master extends EventEmitter {
constructor(options) {
super();
//步驟1
this.workerManager = new Manager(); // workManager 是一個進程管理的工具,記錄當前各個進程的狀態信息
this.messenger = new Messenger(this); // messenger 主要負責進程之間的通信工作

//步驟2:自動探測及獲取可用的 clusterPort
detectPort((err, port) => {
this.options.clusterPort = port;
//步驟3. 啟動 Agent 子進程
this.forkAgentWorker();
});

//步驟4. 啟動 Worker 子進程
this.once(agent-start, this.forkAppWorkers.bind(this));

//步驟5:通知啟動完畢,實時監測子進程服務狀態
this.ready(() => {
this.isStarted = true;
const action = egg-ready;
//通過 messenger 的 send 函數通知服務已經啟動就緒
this.messenger.send({ action, to: parent, data: { port: this[REALPORT], address: this[APP_ADDRESS] } });
this.messenger.send({ action, to: app, data: this.options });
this.messenger.send({ action, to: agent, data: this.options });
//使用 workerManager.startCheck 函數定時監控進程狀態
if (this.isProduction) {
this.workerManager.startCheck();
}
});
}
}

步驟1:子進程的管理和進程間的通信(manager 和 messenger)

  • manager

manager 實現比較簡單,主要通過兩個屬性 workers 和 agent 來維護進程的狀態信息,提供了多個函數用於獲取,刪除,設置相關進程。這裡主要看一下監聽進程存活狀態的 startCheck 函數的實現:

// egg-cluster 源碼 -> startCheck實現
class Manager extends EventEmitter {
startCheck() {
this.exception = 0;
// 每隔 10s 鍾監聽 Worker 和 Agent 的數量
this.timer = setInterval(() => {
const count = this.count();
// Agent 只有一個且必須處於存活狀態,Worker 至少要有一個處於存活狀態服務才可用
if (count.agent && count.worker) {
this.exception = 0;
return;
}
this.exception++;
// 如果連續三次檢查發現服務都不可用就觸發 exception 事件,master.workerManager 監聽到該事件後會退出服務
if (this.exception >= 3) {
this.emit(exception, count);
clearInterval(this.timer);
}
}, 10000);
}
}

  • messenger

Worker 子進程與 Agent 子進程都可以通過 IPC 與 Master 進程進行通信,而 Worker 子進程與 Agent 子進程之間是無法直接通信的,必須通過 Master 進程作為中間橋樑進行通信。每個 Master/Agent/Worker 進程都有一個 messenger 實例,該 messenger 實例用於管理與其它進程的通信工作。

這裡需要注意的是 Master.messenger 實例對應的 Messenger 類的定義是在 egg-cluster 源碼 中,而 Agent.messenger 和 Worker.messenger 實例對應的 Messenger 類的定義是在 egg 源碼 中。前者定義了主進程如何給子進程發送消息,而後者定義了子進程如何給父進程發送消息,而兩者都基於了另外一個模塊 sendmessage 的基礎上抽象實現的。這裡有點奇怪的既然 sendmessage 函數已經兼容了 Master/Agent/Worker 三者之間的通信方式的區別,為什麼沒有把兩個 Messenger 類放在一個模塊里實現,有些許重複的代碼。下面看一下 sendmessage 函數的實現部分:

// egg-cluster 源碼 -> sendmessage 函數實現
module.exports = function send(child, message) {
// 如果沒有send函數,說明 child 是一個 Master 進程,直接 emit 一個 message 事件給 child 進程本身
if (typeof child.send !== function) {
return setImmediate(child.emit.bind(child, message, message));
}

// Agent 子進程通過 child_process.fork() 函數創建出來的
// Worker 子進程通過 cluster.fork() 函數創建出來的,而 cluster.fork() 函數又會調用 child_process.fork(),將其返回對象綁定在 worker.process 上
// 所以 Worker 子進程對應的是 child.process.connected,而 Agent 子進程對應的是 child.connected
var connected = child.process ? child.process.connected : child.connected;
// 通過子進程的 send 函數向父進程發送消息
if (connected) {
return child.send(message);
}
};

當 Agent 收到一個消息後,雖然一定是 Master 進程發過來的,但是我們無法確定這個消息的傳輸過程是 Master -> Agent 還是 Worker -> Master -> Agent,所以為了標誌消息從哪裡來到哪裡去,一個消息體里主要包含了以下幾個欄位:

  • from: 消息從哪裡來
  • to: 消息去哪裡
  • action: 消息是幹什麼的
  • data: 消息的具體內容

Messenger 中的函數實現都比較簡單,這裡主要列一下它們分別提供了哪些函數。

Master.messenger 提供的函數:

  1. sendToAgentWorker:發送給 Agent 進程
  2. sendToAppWorker: 發送給 Worker 進程
  3. sendToMaster:發送給 Master 進程自己
  4. send:可以動態指定 from/to/action 進行發送

Agent.messenger 和 Worker.messenger 提供的函數:

  1. broadcast: 發送消息給所有的 Agent 和 Worker 包括自己
  2. sendToApp: 發送給所有的 Worker
  3. sendToAgent:發送給 Agent,Agent 發送給 Agent 自己本身
  4. sendRandom:Agent 隨機發送給一個 Worker
  5. sendTo:可以指定發送給誰

步驟2:cluster-client 增強 Agent 和 Worker 進程間通信

從前面的分析我們知道,如果進程間只採用 IPC 進行通信,那麼 Agent 和 Worker 之間通信必須通過 Master 作中轉,尤其是在客戶端與服務端有長連接的情況下,如果我們通過 Agent 進程與客戶端建立長連接,然後 Agent 再與 Worker 建立長連接,那麼比起客戶端直接與 Worker 建立長連接,連接個數可以減少 N(Worker 個數)倍。為此 Egg 提供了 Agent 與 Worker 之間直接進行長連接的渠道,採用 Leader/Follower 模式,Agent(Leader)負責與遠程客戶端維持長連接,而 Worker 與 Agent 之間的長連接通信通過」訂閱/發布「的模式使開發非常簡單,具體的實現步驟如下:

+-------+
| start |
+---+---+
|
+--------+---------+
__| port competition |__
win / +------------------+ lose
/
+---------------+ tcp conn +-------------------+
| Leader(Agent) |<---------------->| Follower(Worker1) |
+---------------+ +-------------------+
| tcp conn
|
+--------+ +-------------------+
| Client | | Follower(Worker2) |
+--------+ +-------------------+

  • 服務啟動時,系統自動探測一個可用 clusterPort 作為建立 Agent 和 Worker 長連接的埠號,源碼如下:

// egg-cluster 源碼 -> detectPort函數
module.exports = (port, callback) => {
let maxPort = port + 10;
if (typeof callback === function) {
// tryListen 用於探測 port 到 maxPort 之間的可用的埠號
return tryListen(port, maxPort, callback);
}
};
// listen 函數 使用 net 模塊建立連接來測試埠可用性,
// net.Server().listen(port) 函數在 port=0 可以隨便分配一個可用的埠
function listen(port, hostname, callback) {
const server = new net.Server();
server.on(error, err => {
server.close();
// 忽略 "ENOTFOUND" 報錯,表示埠仍然可用
if (err.code === ENOTFOUND) {
return callback(null, port);
}
return callback(err);
});
server.listen(port, hostname, () => {
port = server.address().port;
server.close();
return callback(null, port);
});
}

// tryListen 函數會調用 listen 函數,
//拿到可用埠後檢查該埠對應 hostname 為 0.0.0.0,localhost,ip 時是否都可以使用,否則調用 handleError 繼續進行探測
function tryListen(port, maxPort, callback) {
function handleError() {
tryListen(port, maxPort, callback);
}

listen(port, null, (err, realPort) => {
listen(port, 0.0.0.0, err => {
if (err) {
return handleError(err);
}
listen(port, localhost, err => {
if (err && err.code !== EADDRNOTAVAIL) {
return handleError(err);
}
listen(port, address.ip(), (err, realPort) => {
if (err) {
return handleError(err);
}
callback(null, realPort);
});
});
});
});
}

  • 將 clusterPort 分配給 Agent 進程(Leader),Agent 進程通過 cluster-client 模塊初始化 cluster 屬性
  • Worker 進程也分別根據 clusterPort 通過 cluster-client 模塊初始化 cluster 屬性(Follower)

上述兩步的代碼不是寫在 egg-cluster 模塊里的,而是在 EggApplication 類的定義中實現的,這裡和上面講到的 Messenger 類的實現一樣有點繞,因為同樣的邏輯寫在了兩個地方,源碼如下:

// egg-core源碼 -> Agent/Worker 的 cluster 屬性初始化
const cluster = require(cluster-client);
class EggApplication extends EggCore {
constructor(options) {
this.cluster = (clientClass, options) => {
options = Object.assign({}, this.config.clusterClient, options, {
port: this.options.clusterPort, //將在上一步 detectPort 中獲取到的 clusterPort 傳入
isLeader: this.type === agent //指定 Agent 進程為 leader, Worker 進程為 follower
});
const client = cluster(clientClass, options);
this._patchClusterClient(client);
return client;
};
}
}

  • 現在 Agent 和 Worker 實例上都有了 cluster 屬性,然後業務開發者就可以按照一定的約定規範實現 Agent 和 Worker 之間的長連接通信了,關於 cluster-client 如何實現 Leader/Follower 長連接模式以及如何在業務開發中使用這裡就不詳細介紹了,可以看相關文檔:cluster-client 源碼 和 Egg 官方文檔-多進程研發模式增強。

步驟3:啟動 Agent 子進程

在拿到 clusterPort 後,egg-cluster 立即調用 forkAgentWorker 函數啟動 Agent 子進程:

// egg-cluster 源碼 -> forkAgentWorker
forkAgentWorker() {
// Agent 進程參數初始化
const args = [ JSON.stringify(this.options) ];
//通過 childprocess 模塊的 fork 函數創建 Agent 子進程
const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
this.workerManager.setAgent(agentWorker); //註冊到 workerManager 中
//監聽 Agent 子進程消息,並通過 this.messenger.send 進行轉發
agentWorker.on(message, msg => {
if (typeof msg === string) msg = { action: msg, data: msg };
msg.from = agent;
this.messenger.send(msg);
});
//前面我們講過服務中的 Agent 進程必須處於存活狀態,所以當Agent 監聽到異常消息進行日誌記錄,但不退出
agentWorker.on(error, err => {
err.name = AgentWorkerError;
err.id = agentWorker.id;
err.pid = agentWorker.pid;
this.logger.error(err);
});
//異常情況下退出時,通知 Master 進程
agentWorker.once(exit, (code, signal) => {
this.messenger.send({
action: agent-exit,
data: { code, signal },
to: master,
from: agent,
});
});
}

forkAgentWorker 函數只是在 Master 進程中 fork 出子進程,而子進程真正的運行及初始化工作是在 agent_worker.js 中實現的,這裡使用了 graceful-process 這個模塊優雅的退出進程,需要等待所有已有連接的關閉後再退出進程,但不會接收新的連接,同樣 app_worker.js 中也會使用該函數。新建子進程代碼如下:

// egg-cluster -> agent_worker.js 新建 agent 子進程
const gracefulExit = require(graceful-process);
const options = JSON.parse(process.argv[2]);
//根據 Egg 框架 export 出的 Agent 類,新建 Agent 實例
const Agent = require(options.framework).Agent;
const agent = new Agent(options);

agent.ready(err => {
if (err) return;
agent.removeListener(error, startErrorHandler);
//告訴主進程 agent 進程啟動完畢,主進程收到消息後,緊接著步驟4:啟動 Worker 子進程
process.send({ action: agent-start, to: master });
});
//設置優雅的退出進程方式
gracefulExit({
logger: consoleLogger,
label: agent_worker,
beforeExit: () => agent.close(),
});

步驟4:啟動 Worker 子進程

Worker 的啟動基本與 Agent 的啟動流程一致,主要有以下兩點區別: - 啟動方式不一樣:Agent 使用 child_process.fork 啟動,而 Worker 使用 cluster.fork 啟動 - Worker 需要對外提供 http 服務,而 Agent 不需要

需要注意的是在創建子進程的時候使用 cfork 這個模塊,它是對 cluster.fork 的一個封裝,用於管理創建多個子進程以及退出時重新創建子進程的工作,forkAppWorkers源碼如下:

// egg-cluser 源碼 -> forkAppWorkers 函數
const cfork = require(cfork);
forkAppWorkers() {
const args = [ JSON.stringify(this.options) ];
//通過 cfork 函數創建多個 cluster
cfork({
exec: this.getAppWorkerFile(), // app_worker.js
args,
count: this.options.workers, //子進程個數
refork: this.isProduction, //退出時是否 refork
});
//監聽 fork 事件
cluster.on(fork, worker => {
this.workerManager.setWorker(worker); //將子進程註冊到 workerManager 中
worker.on(message, msg => { //通知 Master 主進程某個子進程已經創建好了
if (typeof msg === string) msg = { action: msg, data: msg };
msg.from = app;
this.messenger.send(msg);
});
});
}

接下來我們看一下 app_worker.js 中是如何初始化 worker 實例以及啟動 http 服務的,這裡需要區別 sticky 模式和非 sticky 模式:

  • sticky 模式:Master 負責統一監聽對外埠,然後根據用戶 ip 轉發到固定的 Worker 子進程上,每個 Worker 自己啟動了一個新的本地服務
  • 非 sticky 模式:每個Worker 都統一啟動服務監聽外部埠

// egg-cluster 源碼 -> app_worker.js 新建 worker 子進程
//新建 Egg.Application 實例
const options = JSON.parse(process.argv[2]);
const Application = require(options.framework).Application;
const app = new Application(options);

//獲取服務對外埠,並告知 Master
process.send({ to: master, action: realport, data: port });
//等待一切 ready 好以後啟動服務
app.ready(startServer);

// 啟動 http 服務
function startServer(err) {
let server;
// 針對 https 和 http 的不同進行處理
if (options.https) {
server = require(https).createServer(httpsOptions, app.callback());
} else {
server = require(http).createServer(app.callback());
}

if (options.sticky) {
// sticky 模式下,每個 Worker 都會隨機選擇一個埠啟動服務,這個服務用於接收 Master 服務轉發過來的 connection 並進行處理
server.listen(0, 127.0.0.1);
// Master 服務只會發送sticky-session:connection消息
process.on(message, (message, connection) => {
if (message !== sticky-session:connection) {
return;
}
server.emit(connection, connection);
connection.resume();
});
} else {
//非 sticky 情況下監聽對外埠,啟動服務
if (listenConfig.path) {
server.listen(listenConfig.path);
} else {
const args = [ port ];
if (listenConfig.hostname) args.push(listenConfig.hostname);
server.listen(...args);
}
}
}
//設置優雅的退出進程方式
gracefulExit({
logger: consoleLogger,
label: app_worker,
beforeExit: () => app.close(),
});

在 Worker 啟動後會發送 app-start 事件給 Master ,此時 Master 會執行 onAppStart 函數,這個函數除了對啟動後的服務做一些初始化工作以外,最主要的一件事情就是在 sticky 模式下啟動 MasterSocketServer 用於轉發請求到各個 Worker,源碼如下:

// egg-cluster 源碼 -> onAppStart 函數和 startMasterSocketServer 函數
onAppStart(data) {
//告訴 Agent 目前啟動的所有 WorkerIds
this.messenger.send({
action: egg-pids,
to: agent,
data: this.workerManager.getListeningWorkerIds(),
});
//這裡如果是 sticky 模式那麼會先創建一個 MasterSocketServer,然後才正式完成 Master 進程的啟動,
if (this.options.sticky) {
this.startMasterSocketServer(err => {
if (err) return this.ready(err);
this.ready(true);
});
} else {
this.ready(true);
}
}

startMasterSocketServer(cb) {
// Master 進程創建一個 TCP 服務並監聽真正的對外埠
//這裡的 pauseOnConnect=true 表示不會消費和使用傳入的 connection,因為這個 connection 只有真正的傳遞給某個 Worker 時才會被解析使用
require(net).createServer({ pauseOnConnect: true }, connection => {
//開發者必須在 Nginx 配置了 remoteAddress 相關信息才能根據用戶 ip 實現 sticky 模式.
if (!connection.remoteAddress) {
connection.close();
} else {
//根據用戶 ip 解析到指定的 Worker 實例,並將這個 connection 轉發給這個 Worker 子進程,Worker 負責真正處理請求
const worker = this.stickyWorker(connection.remoteAddress);
worker.send(sticky-session:connection, connection);
}
}).listen(this[REALPORT], cb);
}

步驟5:啟動完畢,實時監測各個進程服務狀態

在啟動完畢後,為了保證服務正常穩定運行,Master 主進程需要監聽各種異常事件:

  • agent-exit 事件:

Agent 子進程的退出事件,對應的處理函數是onAgentExit,Agent 子進程必須存活,Agent 子進程一旦退出,意味著整個服務就需要退出或者需要重啟 Agent 子進程,退出時需要 removeAllListeners 防止內存泄漏。

  • app-exit 事件:

Worker 子進程的退出事件,對應的處理函數是onAppExit,清理 Master.manager 中對於該進程的註冊消息,並 removeAllListeners,並根據不同的啟動環境以及參數判斷是否退出服務,生產環境會繼續 fork 出新的 Worker 子進程而不退出服務

  • SIGINT/SIGQUIT/SIGTERM 事件

當 Master 進程收到 Ctrl-C 或者 process.exit() 退出信號時會觸發這些事件,對應的處理函數是 onSignal 函數, onSignal 函數會調用 close 函數,close 函數又會調用 _doClose 函數,真正的退出操作都是在 _doClose 中實現的:

// egg-cluster 源碼 -> _doClose 實現
_doClose() {
//退出所有的 Worker 子進程
try {
yield this.killAppWorkers(appTimeout);
} catch (e) {
this.logger.error([master] app workers exit error: , e);
}
// 退出 Agent 子進程
try {
yield this.killAgentWorker(agentTimeout);
} catch (e) {
this.logger.error([master] agent worker exit error: , e);
}
}

而 killAppWorkers 函數和 killAgentWorker 函數在退出各個進程時,都會調用 terminate 函數。我們知道當 kill 掉一個進程後,依賴該進程創建的子進程將成為孤兒進程,而 terminate 函數就是負責包括進程本身以及子進程的退出工作:

// egg-cluster 源碼 -> terminate 函數實現
module.exports = function* (subProcess, timeout) {
const pid = subProcess.process ? subProcess.process.pid : subProcess.pid;
const childPids = yield getChildPids(pid); //獲取所有子進程的 pid
yield [
killProcess(subProcess, timeout), // kill 當前進程
killChildren(childPids, timeout), // kill 所有子進程
];
};

//如果 SIGTERM 信號不能工作,則使用 SIGKILL 信號進行 kill
function* killProcess(subProcess, timeout) {
subProcess.kill(SIGTERM);
yield Promise.race([
awaitEvent(subProcess, exit),
sleep(timeout),
]);
if (subProcess.killed) return;
(subProcess.process || subProcess).kill(SIGKILL);
}

//刪除所有對應的子進程
function* killChildren(children, timeout) {
if (!children.length) return;
kill(children, SIGTERM);
const start = Date.now();
const checkInterval = 400;
let unterminated = [];
while (Date.now() - start < timeout - checkInterval) {
yield sleep(checkInterval);
unterminated = getUnterminatedProcesses(children);
if (!unterminated.length) return;
}
kill(unterminated, SIGKILL);
}

參考文章

  • Egg 多進程模型和進程間通訊
  • Egg 源碼分析之 egg-core
  • Egg 源碼分析之 egg-cluster
  • egg-cluster 源碼

推薦閱讀:

查看原文 >>
相关文章