1. 程式人生 > >NodeJs: 使用cluster建立nodejs單機多核叢集(多程序)

NodeJs: 使用cluster建立nodejs單機多核叢集(多程序)

前言:

nodejs提供了cluster叢集(支援埠共享的多程序),cluster基於child_process,process二次封裝,方便我們使用該功能實現單機nodejs的web叢集。

1、cluster的處理機制

都知道單執行緒的nodejs遇到cpu密集型操作會很容易出現CPU滿載,服務阻塞的問題;通過類似nginx的master-worker多程序負載處理方式進一步壓榨硬體效能,提升nodejs單機服務處理效能。

     m a s t e r(主程序,分發請求)

        |           |            |            |

   worker  worker  worker  worker(子程序,處理請求)

2、官方cluster實現

nodejs官方文件中cluster的實現demo:

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}

3、實現自己的單機nodejs叢集,實現多程序埠共享

3.1、程式碼實現

[javascript] view plain copy  print?
  1. //開啟叢集服務
  2. var startClusterSever = function(port, numCPUs) {  
  3.     if (cluster.isMaster) {  
  4.         for (var i = 0; i < numCPUs; i++) {  
  5.             const work = cluster.fork();  
  6.             console.log(work.process.pid);  
  7.             workers[i] = work;  
  8.         }  
  9.         cluster.on('exit', (worker, code, signal) => {  
  10.             console.log(`worker ${worker.process.pid} died`);  
  11.         });  
  12.     } else {  
  13.         console.log(cluster.worker.id);  
  14.         http.createServer((req, res) => {  
  15.             console.log("子程序:" + cluster.worker.id + "正在處理請求...");  
  16.             routeHandler(req, res);  
  17.         }).listen(port);  
  18.     }  
  19. }  

3.2、基於eguidRoute路由實現

注:在上一章的eguidRoute基礎上增加開啟單機叢集功能

使用:

eguid.start(8081, 8);//監聽8081埠,多執行緒數量8

[javascript] view plain copy  print?
  1. const http = require('http');  
  2. const url = require('url');  
  3. const path = require('path');  
  4. const fs = require('fs');  
  5. const cluster = require('cluster');  
  6. //路由表
  7. var routeArr = {};  
  8. //程序列表
  9. var workers = {};  
  10. //程序數量
  11. var clusterNum;  
  12. //解析請求地址
  13. var getPathName = function(reqUrl) {  
  14.     var urlParse = getUrlParse(reqUrl);  
  15.     return urlParse.pathname;  
  16. };  
  17. //獲取url解析
  18. var getUrlParse = function(reqUrl) {  
  19.     return url.parse(reqUrl);  
  20. };  
  21. //是否是一個請求
  22. var isFunc = function(pathName) {  
  23.     returntypeof routeArr[pathName] === 'function';  
  24. };  
  25. /**靜態資源處理 param(req:請求,res:響應,pathName:路徑) */
  26. var resStatic = function(req, res, pathName) {  
  27.     fs.readFile(pathName.substr(1), function(err, data) {  
  28.         err ? endErrorReq(res, 501) : endStaticReq(res, pathName, data);  
  29.         res.end();  
  30.     });  
  31. };  
  32. //響應靜態資源
  33. var endStaticReq = function(res, pathName, data) {  
  34.     var suffix = path.extname(pathName);  
  35.     res.writeHead(200, { 'Content-Type': suffix === ".css" ? 'text/css' : 'text/html;' + 'charset=utf-8' });  
  36.     res.write(data);  
  37. };  
  38. //結束錯誤請求
  39. var endErrorReq = function(res, err) {  
  40.     res.writeHead(err);  
  41.     res.end();  
  42. };  
  43. /** 路由分發處理器 */
  44. var routeHandler = function(req, res) {  
  45.     var pathName = getPathName(req.url);  
  46.     isFunc(pathName) ? routeArr[pathName](req, res, pathName) : resStatic(req, res, pathName);  
  47.     console.log("處理了一個請求:" + pathName);  
  48. };  
  49. /** 新增動態路由解析   
  50.  * param{ 
  51.  * reqUrl:請求地址,  
  52.  * service:function(request:請求,response:響應,pathName:請求名)} 
  53.  */
  54. var addDynamicRoute = function(reqUrl, service) {  
  55.     console.log("新增的服務名:" + reqUrl);  
  56.     routeArr[reqUrl] = service;  
  57. };  
  58. /**  開啟伺服器並監聽埠  param{port:埠號}*/
  59. var startServer = function(port, num) {  
  60.     clusterNum = num;  
  61.     if (num) {  
  62.         startClusterSever(port, num);  
  63.     } else {  
  64.         //建立伺服器  
  65.         http.createServer(function(req, res) {  
  66.             routeHandler(req, res);  
  67.         }).listen(port); //注意這裡的埠改成了變數
  68.         //開啟後在控制檯顯示該服務正在執行  
  69.         console.log('Server running at http://127.0.0.1:' + port);  
  70.     }  
  71. };  
  72. /** 設定靜態頁面請求別名 param(newUrl:新的請求路徑, oldUrl:原始路徑) */
  73. var setIndex = function(newUrl, oldUrl) {  
  74.     addDynamicRoute(newUrl, function(req, res) {  
  75.         resStatic(req, res, oldUrl);  
  76.     });  
  77. };  
  78. /**自定義靜態頁面處理方式 staticHandlerService=function(req,res,pathName)*/
  79. var setresStaticFunc = function(staticHandlerService) {  
  80.     resStatic = staticHandlerService;  
  81. };  
  82. //開啟叢集服務
  83. var startClusterSever = function(port, numCPUs) {  
  84.     if (cluster.isMaster) {  
  85.         for (var i = 0; i < numCPUs; i++) {  
  86.             const work = cluster.fork();  
  87.             console.log(work.process.pid);  
  88.             workers[i] = work;  
  89.         }  
  90.         cluster.on('exit', (worker, code, signal) => {  
  91.             console.log(`worker ${worker.process.pid} died`);  
  92.         });  
  93.     } else {  
  94.         console.log(cluster.worker.id);  
  95.         http.createServer((req, res) => {  
  96.             console.log("子程序:" + cluster.worker.id + "正在處理請求...");  
  97.             routeHandler(req, res);  
  98.         }).listen(port);  
  99.     }  
  100. }  
  101. exports.route = routeHandler;  
  102. exports.add = addDynamicRoute;  
  103. exports.start = startServer;  
  104. exports.index = setIndex;  
  105. exports.modStatic = setresStaticFunc;  
  106. /** 
  107.  * eguidRouter快速靈活的路由 
  108.  * 功能實現: 
  109.  * 1、自動靜態路由解析 
  110.  * 2、支援手動設定靜態路由別名 
  111.  * 3、支援建立新的靜態路由實現(方便載入模板) 
  112.  * 4、動態路由解析 
  113.  * 5、自動錯誤響應 
  114.  * 6、使用原生API,無第三方框架 
  115.  * 7、支援cluster單機叢集(機器效能壓榨機) 
  116.  */