Node.js的程序管理
眾所周知Node基於V8,而在V8中JavaScript是單執行緒執行的,這裡的單執行緒不是指Node啟動的時候就只有一個執行緒,而是說執行JavaScript程式碼是在單執行緒上,Node還有其他執行緒,比如進行非同步IO操作的IO執行緒。這種單執行緒模型帶來的好處就是系統排程過程中不會頻繁進行上下文切換,提升了單核CPU的利用率。
但是這種做法有個缺陷,就是我們無法利用伺服器CPU多核的效能,一個Node程序只能利用一個CPU。而且單執行緒模式下一旦程式碼崩潰就是整個程式崩潰。通常解決方案就是使用Node的cluster模組,通過master-worker
模式啟用多個程序例項。下面我們詳細講述下,Node如何使用多程序模型利用多核CPU,以及自帶的cluster模組具體的工作原理。
如何建立子程序
node提供了child_process
模組用來進行子程序的建立,該模組一共有四個方法用來建立子程序。
const { spawn, exec, execFile, fork } = require('child_process')
spawn(command[, args][, options])
exec(command[, options][, callback])
execFile(file[, args][, options][, callback])
fork(modulePath[, args][, options])
複製程式碼
spawn
首先認識一下spawn方法,下面是Node文件的官方例項。
const { spawn } = require('child_process');
const child = spawn('ls', ['-lh', '/home']);
child.on('close', (code) => {
console.log(`子程序退出碼:${code}`);
});
const { stdin, stdout, stderr } = child
stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});
stderr.on('data', (data) => {
console .log(`stderr: ${data}`);
});
複製程式碼
通過spawn建立的子程序,繼承自EventEmitter,所以可以在上面進行事件(discount
,error
,close
,message
)的監聽。同時子程序具有三個輸入輸出流:stdin、stdout、stderr,通過這三個流,可以實時獲取子程序的輸入輸出和錯誤資訊。
這個方法的最終實現基於libuv,這裡不再展開討論,感興趣可以檢視原始碼。
// 呼叫libuv的api,初始化一個程序
int err = uv_spawn(env->event_loop(), &wrap->process_, &options);
複製程式碼
exec/execFile
之所以把這兩個放到一起,是因為exec最後呼叫的就是execFile方法,原始碼在這裡。唯一的區別是,exec中呼叫的normalizeExecArgs
方法會將opts的shell屬性預設設定為true。
exports.exec = function exec(/* command , options, callback */) {
const opts = normalizeExecArgs.apply(null, arguments);
return exports.execFile(opts.file, opts.options, opts.callback);
};
function normalizeExecArgs(command, options, callback) {
options = { ...options };
options.shell = typeof options.shell === 'string' ? options.shell : true;
return { options };
}
複製程式碼
在execFile中,最終呼叫的是spawn
方法。
exports.execFile = function execFile(file /* , args, options, callback */) {
let args = [];
let callback;
let options;
var child = spawn(file, args, {
// ... some options
});
return child;
}
複製程式碼
exec會將spawn的輸入輸出流轉換成String,預設使用UTF-8的編碼,然後傳遞給回撥函式,使用回撥方式在node中較為熟悉,比流更容易操作,所以我們能使用exec方法執行一些shell
命令,然後在回撥中獲取返回值。有點需要注意,這裡的buffer是有最大快取區的,如果超出會直接被kill掉,可用通過maxBuffer屬性進行配置(預設: 200*1024)。
const { exec } = require('child_process');
exec('ls -lh /home', (error, stdout, stderr) => {
console.log(`stdout: ${stdout}`);
console.log(`stderr: ${stderr}`);
});
複製程式碼
fork
fork最後也是呼叫spawn來建立子程序,但是fork是spawn的一種特殊情況,用於衍生新的 Node.js 程序,會產生一個新的V8例項,所以執行fork方法時需要指定一個js檔案。
exports.fork = function fork(modulePath /* , args, options */) {
// ...
options.shell = false;
return spawn(options.execPath, args, options);
};
複製程式碼
通過fork建立子程序之後,父子程序直接會建立一個IPC(程序間通訊)通道,方便父子程序直接通訊,在js層使用 process.send(message)
和 process.on('message', msg => {})
進行通訊。而在底層,實現程序間通訊的方式有很多,Node的程序間通訊基於libuv實現,不同作業系統實現方式不一致。在*unix系統中採用Unix Domain Socket方式實現,Windows中使用命名管道的方式實現。
常見程序間通訊方式:訊息佇列、共享記憶體、pipe、訊號量、套接字
下面是一個父子程序通訊的例項。
parent.js
const path = require('path')
const { fork } = require('child_process')
const child = fork(path.join(__dirname, 'child.js'))
child.on('message', msg => {
console.log('message from child', msg)
});
child.send('hello child, I\'m master')
複製程式碼
child.js
process.on('message', msg => {
console.log('message from master:', msg)
});
let counter = 0
setInterval(() => {
process.send({
child: true,
counter: counter++
})
}, 1000);
複製程式碼
小結
其實可以看到,這些方法都是對spawn方法的複用,然後spawn方法底層呼叫了libuv進行程序的管理,具體可以看下圖。
利用fork實現master-worker
模型
首先來看看,如果我們在child.js
中啟動一個http服務會發生什麼情況。
// master.js
const { fork } = require('child_process')
for (let i = 0; i < 2; i++) {
const child = fork('./child.js')
}
// child.js
const http = require('http')
http.createServer((req, res) => {
res.end('Hello World\n');
}).listen(8000)
複製程式碼
+--------------+
| |
| master |
| |
+--------+--------------+- -- -- -
| |
| Error: listen EADDRINUSE
| |
|
+----v----+ +-----v---+
| | | |
| worker1 | | worker2 |
| | | |
+---------+ +---------+
:8000 :8000
複製程式碼
我們fork了兩個子程序,因為兩個子程序同時對一個埠進行監聽,Node會直接丟擲一個異常(Error: listen EADDRINUSE
),如上圖所示。那麼我們能不能使用代理模式,同時監聽多個埠,讓master程序監聽80埠收到請求時,再將請求分發給不同服務,而且master程序還能做適當的負載均衡。
+--------------+
| |
| master |
| :80 |
+--------+--------------+---------+
| |
| |
| |
| |
+----v----+ +-----v---+
| | | |
| worker1 | | worker2 |
| | | |
+---------+ +---------+
:8000 :8001
複製程式碼
但是這麼做又會帶來另一個問題,代理模式中十分消耗檔案描述符(linux系統預設的最大檔案描述符限制是1024),檔案描述符在windows系統中稱為控制代碼(handle),習慣性的我們也可以稱linux中的檔案描述符為控制代碼。當用戶進行訪問,首先連線到master程序,會消耗一個控制代碼,然後master程序再代理到worker程序又會消耗掉一個控制代碼,所以這種做法十分浪費系統資源。為了解決這個問題,Node的程序間通訊可以傳送控制代碼,節省系統資源。
控制代碼是一種特殊的智慧指標 。當一個應用程式要引用其他系統(如資料庫、作業系統)所管理的記憶體塊或物件時,就要使用控制代碼。
我們可以在master程序啟動一個tcp服務,然後通過IPC將服務的控制代碼傳送給子程序,子程序再對服務的連線事件進行監聽,具體程式碼如下:
// master.js
var { fork } = require('child_process')
var server = require('net').createServer()
server.on('connection', function(socket) {
socket.end('handled by master') // 響應來自master
})
server.listen(3000, function() {
console.log('master listening on: ', 3000)
})
for (var i = 0; i < 2; i++) {
var child = fork('./child.js')
child.send('server', server) // 傳送控制代碼給worker
console.log('worker create, pid is ', child.pid)
}
// child.js
process.on('message', function (msg, handler) {
if (msg !== 'server') {
return
}
// 獲取到控制代碼後,進行請求的監聽
handler.on('connection', function(socket) {
socket.end('handled by worker, pid is ' + process.pid)
})
})
複製程式碼
下面我們通過curl
連續請求 5 次服務。
for varible1 in {1..5}
do
curl "localhost:3000"
done
複製程式碼
可以看到,響應請求的可以是父程序,也可以是不同子程序,多個程序對同一個服務響應的連線事件監聽,誰先搶佔,就由誰進行響應。這裡就會出現一個Linux網路程式設計中很常見的事件,當多個程序同時監聽網路的連線事件,當這個有新的連線到達時,這些程序被同時喚醒,這被稱為“驚群”。這樣導致的情況就是,一旦事件到達,每個程序同時去響應這一個事件,而最終只有一個程序能處理事件成功,其他的程序在處理該事件失敗後重新休眠,造成了系統資源的浪費。
ps:在windows系統上,永遠都是最後定義的子程序搶佔到控制代碼,這可能和libuv的實現機制有關,具體原因往有大佬能夠指點。
出現這樣的問題肯定是大家都不願意的嘛,這個時候我們就想起了nginx
的好了,這裡有篇文章講解了nginx是如何解決“驚群”的,利用nginx的反向代理可以有效地解決這個問題,畢竟nginx本來就很擅長這種問題。
http {
upstream node {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
server 127.0.0.1:8002;
server 127.0.0.1:8003;
keepalive 64;
}
server {
listen 80;
server_name shenfq.com;
location / {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header X-Nginx-Proxy true;
proxy_set_header Connection "";
proxy_pass http://node; # 這裡要和最上面upstream後的應用名一致,可以自定義
}
}
}
複製程式碼
小結
如果我們自己用Node原生來實現一個多程序模型,存在這樣或者那樣的問題,雖然最終我們藉助了nginx達到了這個目的,但是使用nginx的話,我們需要另外維護一套nginx的配置,而且如果有一個Node服務掛了,nginx並不知道,還是會將請求轉發到那個埠。
cluster模組
除了用nginx做反向代理,node本身也提供了一個cluster
模組,用於多核CPU環境下多程序的負載均衡。cluster模組建立子程序本質上是通過child_procee.fork,利用該模組可以很容易的建立共享同一埠的子程序伺服器。
上手指南
有了這個模組,你會感覺實現Node的單機叢集是多麼容易的一件事情。下面看看官方例項,短短的十幾行程式碼就實現了一個多程序的Node服務,且自帶負載均衡。
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) { // 判斷是否為主程序
console.log(`主程序 ${process.pid} 正在執行`);
// 衍生工作程序。
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`工作程序 ${worker.process.pid} 已退出`);
});
} else { // 子程序進行伺服器建立
// 工作程序可以共享任何 TCP 連線。
// 在本例子中,共享的是一個 HTTP 伺服器。
http.createServer((req, res) => {
res.writeHead(200);
res.end('hello world\n');
}).listen(8000);
console.log(`工作程序 ${process.pid} 已啟動`);
}
複製程式碼
cluster模組原始碼分析
首先看程式碼,通過isMaster
來判斷是否為主程序,如果是主程序進行fork操作,子程序建立伺服器。這裡cluster進行fork操作時,執行的是當前檔案。cluster.fork
最終呼叫的child_process.fork
,且第一個引數為process.argv.slice(2)
,在fork子程序之後,會對其internalMessage事件進行監聽,這個後面會提到,具體程式碼如下:
const { fork } = require('child_process');
cluster.fork = function(env) {
cluster.setupMaster();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
const worker = new Worker({
id: id,
process: workerProcess
});
// 監聽子程序的訊息
worker.process.on('internalMessage', internal(worker, onmessage));
// ...
};
// 配置master程序
cluster.setupMaster = function(options) {
cluster.settings = {
args: process.argv.slice(2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false,
...cluster.settings,
...options
};
};
// 建立子程序
function createWorkerProcess(id, env) {
return fork(cluster.settings.exec, cluster.settings.args, {
// some options
});
}
複製程式碼
子程序埠監聽問題
這裡會有一個問題,子程序全部都在監聽同一個埠,我們之前已經試驗過,服務監聽同一個埠會出現端口占用的問題,那麼cluster模組如何保證埠不衝突的呢? 查閱原始碼發現,http模組的createServer繼承自net模組。
util.inherits(Server, net.Server);
複製程式碼
而在net模組中,listen方法會呼叫listenInCluster方法,listenInCluster判斷當前是否為master程序。
Server.prototype.listen = function(...args) {
// ...
if (typeof options.port === 'number' || typeof options.port === 'string') {
// 如果listen方法只傳入了埠號,最後會走到這裡
listenInCluster(this, null, options.port | 0, 4, backlog, undefined, options.exclusive);
return this;
}
// ...
};
function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
if (cluster === undefined) cluster = require('cluster');
if (cluster.isMaster) {
// 如果是主程序則啟動一個服務
// 但是主程序沒有呼叫過listen方法,所以沒有走這裡一步
server._listen2(address, port, addressType, backlog, fd, flags);
return;
}
const serverQuery = {
address: address,
port: port,
addressType: addressType,
fd: fd,
flags,
};
// 子程序獲取主程序服務的控制代碼
cluster._getServer(server, serverQuery, listenOnMasterHandle);
function listenOnMasterHandle(err, handle) {
server._handle = handle; // 重寫handle,對listen方法進行了hack
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
複製程式碼
看上面程式碼可以知道,真正啟動服務的方法為server._listen2
。在_listen2
方法中,最終呼叫的是_handle
下的listen方法。
function setupListenHandle(address, port, addressType, backlog, fd, flags) {
// ...
this._handle.onconnection = onconnection;
var err = this._handle.listen(backlog || 511);
// ...
}
Server.prototype._listen2 = setupListenHandle; // legacy alias
複製程式碼
那麼cluster._getServer
方法到底做了什麼呢?
搜尋它的原始碼,首先向master程序傳送了一個訊息,訊息型別為queryServer
。
// child.js
cluster._getServer = function(obj, options, cb) {
// ...
const message = {
act: 'queryServer',
index,
data: null,
...options
};
// 傳送訊息到master程序,訊息型別為 queryServer
send(message, (reply, handle) => {
rr(reply, indexesKey, cb); // Round-robin.
});
// ...
};
複製程式碼
這裡的rr方法,對前面提到的_handle.listen
進行了hack,所有子程序的listen其實是不起作用的。
function rr(message, indexesKey, cb) {
if (message.errno)
return cb(message.errno, null);
var key = message.key;
function listen(backlog) { // listen方法直接返回0,不再進行埠監聽
return 0;
}
function close() {
send({ act: 'close', key });
}
function getsockname(out) {
return 0;
}
const handle = { close, listen, ref: noop, unref: noop };
handles.set(key, handle); // 根據key將工作程序的 handle 進行快取
cb(0, handle);
}
// 這裡的cb回撥就是前面_getServer方法傳入的。 參考之前net模組的listen方法
function listenOnMasterHandle(err, handle) {
server._handle = handle; // 重寫handle,對listen方法進行了hack
// 該方法呼叫後,會對handle繫結一個 onconnection 方法,最後會進行呼叫
server._listen2(address, port, addressType, backlog, fd, flags);
}
複製程式碼
主程序與子程序通訊
那麼到底在哪裡對埠進行了監聽呢?
前面提到過,fork子程序的時候,對子程序進行了internalMessage事件的監聽。
worker.process.on('internalMessage', internal(worker, onmessage));
複製程式碼
子程序向master程序傳送訊息,一般使用process.send
方法,會被監聽的message
事件所接收。這裡是因為傳送的message指定了cmd: 'NODE_CLUSTER'
,只要cmd欄位以NODE_
開頭,這樣訊息就會認為是內部通訊,被internalMessage事件所接收。
// child.js
function send(message, cb) {
return sendHelper(process, message, null, cb);
}
// utils.js
function sendHelper(proc, message, handle, cb) {
if (!proc.connected)
return false;
// Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
message = { cmd: 'NODE_CLUSTER', ...message, seq };
if (typeof cb === 'function')
callbacks.set(seq, cb);
seq += 1;
return proc.send(message, handle);
}
複製程式碼
master程序接收到訊息後,根據act的型別開始執行不同的方法,這裡act為queryServer
。queryServer方法會構造一個key,如果這個key(規則主要為地址+埠+檔案描述符)之前不存在,則對RoundRobinHandle
建構函式進行了例項化,RoundRobinHandle建構函式中啟動了一個TCP服務,並對之前指定的埠進行了監聽。
// master.js
const handles = new Map();
function onmessage(message, handle) {
const worker = this;
if (message.act === 'online')
online(worker);
else if (message.act === 'queryServer')
queryServer(worker, message);
// other act logic
}
function queryServer(worker, message) {
// ...
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
var handle = handles.get(key);
// 如果之前沒有對該key進行例項化,則進行例項化
if (handle === undefined) {
let address = message.address;
// const RoundRobinHandle = require('internal/cluster/round_robin_handle');
var constructor = RoundRobinHandle;
handle = new constructor(key,
address,
message.port,
message.addressType,
message.fd,
message.flags);
handles.set(key, handle);
}
// ...
}
// internal/cluster/round_robin_handle
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
this.server = net.createServer(assert.fail);
// 這裡啟動一個TCP伺服器
this.server.listen({ port, host });
// TCP伺服器啟動時的事件
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
});
// ...
}
複製程式碼
可以看到TCP服務啟動後,立馬對connection
事件進行了監聽,會呼叫RoundRobinHandle的distribute方法。
// RoundRobinHandle
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
// distribute 對工作程序進行分發
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle); // 存入TCP服務的控制代碼
const worker = this.free.shift(); // 取出第一個工作程序
if (worker)
this.handoff(worker); // 切換到工作程序
};
RoundRobinHandle.prototype.handoff = function(worker) {
const handle = this.handles.shift(); // 獲取TCP服務控制代碼
if (handle === undefined) {
this.free.push(worker); // 將該工作程序重新放入佇列中
return;
}
const message = { act: 'newconn', key: this.key };
// 向工作程序傳送一個型別為 newconn 的訊息以及TCP服務的控制代碼
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // 工作程序不能正常執行,啟動下一個
this.handoff(worker);
});
};
複製程式碼
在子程序中也有對內部訊息進行監聽,在cluster/child.js
中,有個cluster._setupWorker
方法,該方法會對內部訊息監聽,該方法的在lib/internal/bootstrap/node.js
中呼叫,這個檔案是每次啟動node命令後,由C++模組呼叫的。
function startup() {
// ...
startExecution();
}
function startExecution() {
// ...
prepareUserCodeExecution();
}
function prepareUserCodeExecution() {
if (process.argv[1] && process.env.NODE_UNIQUE_ID) {
const cluster = NativeModule.require('cluster');
cluster._setupWorker();
delete process.env.NODE_UNIQUE_ID;
}
}
startup()
複製程式碼
下面看看_setupWorker方法做了什麼。
cluster._setupWorker = function() {
// ...
process.on('internalMessage', internal(worker, onmessage));
function onmessage(message, handle) {
// 如果act為 newconn 呼叫onconnection方法
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
}
};
function onconnection(message, handle) {
const key = message.key;
const server = handles.get(key);
const accepted = server !== undefined;
send({ ack: message.seq, accepted });
if (accepted)
server.onconnection(0, handle); // 呼叫net中的onconnection方法
}
複製程式碼
最後子程序獲取到客戶端控制代碼後,呼叫net模組的onconnection,對Socket進行例項化,後面就與其他http請求的邏輯一致了,不再細講。
至此,cluster模組的邏輯就走通了,關於Node.js的程序管理相關的知識點就介紹的這裡了。