剖析下聊天室
由來
環境:PHP7、Swoole、linux
對聊天室有點感興趣,對於網絡協議有一點一知半解,所以決定借助swoole實現個簡單的聊天室,來簡單剖析下原理,知道原理以後就可以考慮用其他語言或者自己造輪子寫個,當然這是後話。
源碼我放置github( https://github.com/WalkingSun/SwooleServer ),有興趣可以借鑒借鑒。
系統設計
即時通訊的網絡通信基於長連接,通信方式有TCP、UDP、socket、websocket等,本次實現是websocket,系統建立常駐內存的websocket服務,客戶端即瀏覽器兩者建立連接通信。通信過程如下:
關於客戶端連接websocket服務,本文不做細述,websocket服務的建立借助swoole,需要在服務端的open、recieve、send、close建立回調處理,為了方便我將連接的客戶端信息放入swoole_table(一個基於共享內存和鎖實現的超高性能,並發數據結構)。
代碼僅供參考:
websocket 服務類:
<?php /** * Created by PhpStorm. * User: WalkingSun * Date: 2018/10/28 * Time: 15:54 */ class WsServer { const host = '0.0.0.0'; const port = '9501'; public $swoole; public $config = ['gcSessionInterval' => 60000]; public $openCallback; //open回調 public $messageCallback; //message回調 public $runApp; //request回調 public $workStartCallback; //work回調 public $finishCallback; //finish回調 public $closeCallback; //close回調 public $taskCallback; //task回調 public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[]) { $host = $host?:self::host; $port = $port?:self::port; $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType); $this->webRoot = $swooleConfig['document_root']; if( !empty($this->config) ) $this->config = array_merge($this->config, $config); $this->swoole->set($swooleConfig); $this->swoole->on('open',[$this,'onOpen']); $this->swoole->on('message',[$this,'onMessage']); $this->swoole->on('request',[$this,'onRequest']); $this->swoole->on('WorkerStart',[$this,'onWorkerStart']); //增加work進程 $this->swoole->on('task',[$this,'onTask']); //增加task任務進程 $this->swoole->on('finish',[$this,'onFinish']); $this->swoole->on('close',[$this,'onClose']); } public function run(){ $this->swoole->start(); } /** * 當WebSocket客戶端與服務器建立連接並完成握手後會回調此函數 * @param $serv swoole_websocket_server 服務對象 * @param $request swoole_http_server 服務對象 */ public function onOpen( swoole_websocket_server $serv, $request){ call_user_func_array( $this->openCallback, [ $serv, $request ] ); //定時器(異步執行) // if($request->fd == 1){ // swoole_timer_tick(2000,function($timer_id){ // echo time().PHP_EOL; // }); // } } /** *當服務器收到來自客戶端的數據幀時會回調此函數。 * @param $server swoole_websocket_server 服務對象 * @param $frame swoole_websocket_frame對象,包含了客戶端發來的數據幀信息 * $frame->fd,客戶端的socket id,使用$server->push推送數據時需要用到 $frame->data,數據內容,可以是文本內容也可以是二進制數據,可以通過opcode的值來判斷 $frame->opcode,WebSocket的OpCode類型,可以參考WebSocket協議標準文檔 $frame->finish, 表示數據幀是否完整,一個WebSocket請求可能會分成多個數據幀進行發送(底層已經實現了自動合並數據幀,現在不用擔心接收到的數據幀不完整) */ public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){ call_user_func_array( $this->messageCallback, [ $serv, $frame ]); } /** * @param $serv swoole_websocket_server 服務對象 * @param $fd 連接的文件描述符 * @param $reactorId 來自那個reactor線程 * onClose回調函數如果發生了致命錯誤,會導致連接泄漏。通過netstat命令會看到大量CLOSE_WAIT狀態的TCP連接 * 當服務器主動關閉連接時,底層會設置此參數為-1,可以通過判斷$reactorId < 0來分辨關閉是由服務器端還是客戶端發起的。 */ public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){ call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]); } /** * 在task_worker進程內被調用。worker進程可以使用swoole_server_task函數向task_worker進程投遞新的任務。當前的Task進程在調用onTask回調函數時會將進程狀態切換為忙碌,這時將不再接收新的Task,當onTask函數返回時會將進程狀態切換為空閑然後繼續接收新的Task。 * @param $serv swoole_websocket_server 服務對象 * @param $task_id int 任務id,由swoole擴展內自動生成,用於區分不同的任務。$task_id和$src_worker_id組合起來才是全局唯一的,不同的worker進程投遞的任務ID可能會有相同 * @param $src_worker_id int 來自於哪個worker進程 * @param $data mixed 任務的內容 */ public function onTask(swoole_server $serv, $task_id, $src_worker_id, $data){ call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]); // sleep(10); // onTask函數中 return字符串,表示將此內容返回給worker進程。worker進程中會觸發onFinish函數,表示投遞的task已完成。 // return "task {$src_worker_id}-{$task_id} success"; } /** * 當worker進程投遞的任務在task_worker中完成時,task進程會通過swoole_server->finish()方法將任務處理的結果發送給worker進程。 * @param $serv swoole_websocket_server 服務對象 * @param $task_id int 任務id * @param $data string task任務處理的結果內容 * task進程的onTask事件中沒有調用finish方法或者return結果,worker進程不會觸發onFinish 執行onFinish邏輯的worker進程與下發task任務的worker進程是同一個進程 */ public function onFinish(swoole_server $serv, $task_id, $data){ call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]); // echo $data; // return $data; } public function onRequest( $request, $response ){ call_user_func_array( $this->runApp, [ $request, $response ]); } public function onWorkerStart( $server, $worker_id ){ call_user_func_array( $this->workStartCallback , [$server, $worker_id]); } }
起服務和回調設置:
class SwooleController extends BasicController{ public $host; public $port; public $swoole_config=[]; public static $table; public function actionStart(){ $config = include __DIR__ . '/../config/console.php'; if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file']; if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file']; $this->swoole_config = array_merge( [ 'document_root' => $config['swoole']['document_root'], 'enable_static_handler' => true, // 'daemonize'=>1, 'worker_num'=>4, 'max_request'=>2000, // 'task_worker_num'=>100, //檢查死鏈接 使用操作系統提供的keepalive機制來踢掉死鏈接 'open_tcp_keepalive'=>1, 'tcp_keepidle'=> 1*60, //連接在n秒內沒有數據請求,將開始對此連接進行探測 'tcp_keepcount' => 3, //探測的次數,超過次數後將close此連接 'tcp_keepinterval' => 0.5*60, //探測的間隔時間,單位秒 //swoole實現的心跳機制,只要客戶端超過一定時間沒發送數據,不管這個連接是不是死鏈接,都會關閉這個連接 // 'heartbeat_check_interval' => 10*60, //每m秒偵測一次心跳 // 'heartbeat_idle_time' => 30*60, //一個TCP連接如果在n秒內未向服務器端發送數據,將會被切斷 ],$this->swoole_config ); $this->host = $config['swoole']['host']; $this->port = $config['swoole']['port']; $swooleServer = new WsServer( $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config); //連接信息保存到swoole_table self::$table = new \swoole_table(10); self::$table->column('username',\Swoole\Table::TYPE_STRING, 10); self::$table->column('avatar',\Swoole\Table::TYPE_STRING, 255); self::$table->column('msg',\Swoole\Table::TYPE_STRING, 255); self::$table->column('fd',\Swoole\Table::TYPE_INT, 6); self::$table->create(); $swooleServer->openCallback = function( $server , $request ){ echo "server handshake with fd={$request->fd}\n"; }; $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){ //全局變量設置及app.log $this->globalParam( $request ); $_SERVER['SERVER_SWOOLE'] = $swooleServer; //記錄日誌 $apiData = $_SERVER; unset($apiData['SERVER_SWOOLE']); Common::addLog( $config['log'] , ($apiData) ); //解析路由 $r = $_GET['r']; $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index'); $params = explode('/',$r); $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php'; $result = ''; if( file_exists( $controller ) ){ require_once $controller; $class = new ReflectionClass(ucfirst($params[0]).'Controller'); if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){ $instance = $class->newInstanceArgs(); $method = $class->getmethod('action'.ucfirst($params[1])); // 獲取類中方法 ob_start(); $method->invoke($instance); // 執行方法 $result = ob_get_contents(); ob_clean(); }else{ $result = 'NOT FOUND!'; } }else{ $result = "$controller not exist!"; } $response->end( $result ); }; $swooleServer->workStartCallback = function( $server, $worker_id ){ }; $swooleServer->taskCallback = function( $server , $request ){ //發送通知或者短信、郵件等 }; $swooleServer->finishCallback = function( $serv, $task_id, $data ){ // return $data; }; $swooleServer->messageCallback = function( $server, $iframe ){ //記錄客戶端信息 echo "Client connection fd {$iframe->fd} ".PHP_EOL; $data = json_decode( $iframe->data ,1 ); if( !empty($data['token']) ){ if( $data['token']== 'simplechat_open' ){ if( !self::$table->exist($iframe->fd) ){ $user = array_merge($data,['fd'=>$iframe->fd]); self::$table->set($iframe->fd,$user); //發送連接用戶信息 foreach (self::$table as $v){ if($v['fd']!=$iframe->fd){ $pushData = array_merge($user,['action'=>'connect']); $server->push($v['fd'],json_encode($pushData)); } } } } if( $data['token']=='simplechat' ){ //查詢所有連接用戶,分發消息 foreach (self::$table as $v){ if($v['fd']!=$iframe->fd){ $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send']; $server->push($v['fd'],json_encode($pushData)); } } } } //接受消息,對消息進行解析,發送給組內人其他人 }; $swooleServer->closeCallback = function( $server, $fd, $reactorId ){ if( self::$table->exist($fd) ){ //退出房間處理 self::$table->del($fd); foreach (self::$table as $v){ $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove']; $server->push($v['fd'],json_encode($pushData)); } } echo "Client close fd {$fd}".PHP_EOL; }; $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL); $swooleServer->run(); } public function actionStop(){ $r = $this->sendSignal( SIGTERM ); if( $r ){ $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL); } } public function actionRestart(){ $this->sendSignal(SIGTERM); //向主進程發送SIGTERM實現關閉服務器 $this->actionStart(); } public function actionReload(){ $this->sendSignal(SIGUSR1); //向主進程/管理進程發送SIGUSR1信號,將平穩地restart所有Worker進程 } }
起了服務,客戶端就可以連接通信了。
心跳檢測
起了半天後服務常會斷掉,查看監聽端口進程狀態,服務器輸入:
$ netstat -anp |grep 9501
發現大量CLOSE_WAIT狀態,常用狀態有 ESTABLISHED 表示正在通信,TIME_WAIT 表示主動關閉,CLOSE_WAIT 表示被動關閉。
TIME_WAIT和CLOSE_WAIT兩種狀態如果一直被保持,意味著對應數目的通道就一直被占用,且“占著茅坑不使勁”,一旦句柄數達到上限,新的請求就無法處理。而且因為swoole是master-worker模式,
基本上http、tcp通信都是在worker進程,CLOSE_WAIT一直在,子進程將一直無法釋放,隨著時間的推移CLOSE_WAIT狀態的進程越來越多,阻礙新的連接進來,websocket服務不可用。
主動關閉 和 被動關閉
TCP關閉 四次揮手過程如下:
揮手流程:
1、 客戶端是調用函數close(),這時,客戶端會發送一個FIN給服務器。
2、 服務器收到FIN,關閉套接字讀通道,並將自己狀態設置為CLOSE_WAIT(表示被動關閉),
並返回一個ACK給客戶端。
3、 客戶端收到ACK,關閉套接字寫通道
接下來,服務器會調用close():
1、 服務器close(),發送一個FIN到客戶端。
2、 客戶端收到FIN,關閉讀通道,並將自己狀態設置成TIME_WAIT,發送一個ACK給服務器。
3、 服務器收到ACK,關閉寫通道,並將自己狀態設置為CLOSE。
4、 客戶端等待兩個最大數據傳輸時間,然後將自己狀態設置成CLOSED。
由此我們看到CLOSE-WAIT 狀態,TIME-WAIT 狀態 產生的過程,產生的原因是復雜的,比如說網絡通信中斷、用戶手機網絡切換wifi網絡、網絡通信丟包等,故此tcp揮手過程會出現中斷,繼而
產生這些關閉狀態。
為了解決這些占用連接數的異常連接,需要檢測連接是否是活動的,對於死連接我們需要釋放關閉它。
TIME_WAIT 主動關閉
主動關閉的一方在發送最後一個ACK包後,無論對方是否收到都會進入狀態,等待2MSL(Maximum Segment Lifetime數據包的最大生命周期,是一個數據包能在互聯網上生存的最長時間,若超過這個時間則該數據包將會消失在網絡中)
的時間,才會釋放網絡資源。
TIME_WAIT狀態的存在主要有兩個原因:
1)可靠地實現TCP全雙工連接的終止。在關TCP閉連接時,最後的ACK包是由主動關閉方發出的,如果這個ACK包丟失,則被動關閉方將重發FIN包,因此主動方必須維護狀態信息,以允許它重發這個
ACK包。如果不維持這個狀態信息,那麽主動方將回到CLOSED狀態,並對被動方重發的FIN包響應RST包,而被動關閉方將此包解釋成一個錯誤。因而,要實現TCP全雙工連接的正常終止,必須能夠處
理四次握手協議中任意一個包丟失的情況,主動關閉方必須維持狀態信息進入TIME_WAIT狀態。
2)確保迷路重復數據包在網絡中消失,防止上一次連接中的包迷路後重新出現,影響新連接。TCP數據包可能由於路由器異常而迷路,在迷路期間,數據包發送方可能因超時而重發這個包,迷路的
數據包在路由器恢復後也會被送到目的地,這個迷路的數據包就稱為Lost Duplicate。在關閉一個TCP連接後,如果馬上使用相同的IP地址和端口建立新的TCP連接,那麽有可能出現前一個連接的迷
路重復數據包在前一個連接關閉後再次出現,影響新建立的連接。為了避免這一情況,TCP協議不允許使用處於TIME_WAIT狀態的連接的IP和端口啟動一個新連接,只有經過2MSL的時間,確保上一次
連接中所有的迷路重復數據包都已消失在網絡中,才能安全地建立新連接。
如果Server主動關閉連接,同樣會有大量的連接在關閉後處於TIME_WAIT狀態,等待2MSL的時間後才能釋放網絡資源。對於並發連接,出現大量等待連接,新的連接進不來,會降低系統性能。
time_wait問題可以通過調整內核參數和適當的設置web服務器的keep-Alive值來解決。因為time_wait是自己可控的,要麽就是對方連接的異常,要麽就是自己沒有快速的回收資源,總之不是由於自己程序錯誤引起的。
解決方式:
- 試圖讓Client主動關閉連接,由於每個Client的並發量都比較低,因而不會產生性能瓶頸
優化Server的系統TCP參數,使其網絡資源的最大值、消耗速度和恢復速度達到平衡;
修改/etc/sysctl.conf
net.ipv4.tcp_tw_recycle = 1 #啟用TIME-WAIT狀態sockets的快速回收 net.ipv4.tcp_tw_reuse = 1 #允許將TIME-WAIT sockets重新用於新的TCP連接,默認為0,表示關閉 #緩存每個連接最新的時間戳,後續請求中如果時間戳小於緩存的時間戳,即視為無效,相應的數據包會被丟棄,啟用這種行為取決於tcp_timestamps和tcp_tw_recycle net.ipv4.tcp_timestamps = 1
CLOSE_WAIT 被動關閉
對方發送一個FIN後,程序自己這邊沒有進一步發送ACK以確認。換句話說就是在對方關閉連接後,程序裏沒有檢測到,或者程序裏本身就已經忘了這個時候需要關閉連接,於是這個資源就一直被程序占用著。
解決辦法:
- 釋放關閉掉異常的連接;
- 修復程序的bug,重新發布;
Keep-Alive
TCP中有一個Keep-Alive的機制可以檢測死連接,LINUX內核包含對keepalive的支持,其中使用了三個參數:tcp_keepalive_time(開啟keepalive的閑置時長)tcp_keepalive_intvl(keepalive探測包的發送
間隔)和tcp_keepalive_probes(如果對方不予應答,探測包的發送次數);如此服務端會隔斷時間發送個探測包給客戶端,可以是多次,如果在超出設置閑置時長,內核會關閉這個連接。
客戶端主動發心跳
通過程序設置最大連接時長,如果客戶端在這段時間內沒有發送過數據,則關閉釋放這個連接。
解決關閉問題
TIME_WAIT 倒是沒有出現過, CLOSE_WAIT狀態總會出現。
就看看文檔,swoole有這些設置,當前使用的是TCP的keep-alive檢測,只需改配置即可:
...
//檢查死鏈接 使用操作系統提供的keepalive機制來踢掉死鏈接
'open_tcp_keepalive'=>1,
'tcp_keepidle'=> 1*60, //連接在n秒內沒有數據請求,將開始對此連接進行探測
'tcp_keepcount' => 3, //探測的次數,超過次數後將close此連接
'tcp_keepinterval' => 0.5*60, //探測的間隔時間,單位秒
...
我設置的周期比較短,方便測試。
設置了這些看似穩定了,卻還是會出現CLOSE_WAIT,後來查了日誌,發生錯誤中斷了,大概意思,代碼中出現exit、die,顯然常駐內存的swoole不支持這些,會立馬中斷程序。所以改些這些代碼,
剛開始借助YII2.0寫的,框架源碼的問題,所以swoole這塊服務需要單獨出來,嗯。。。所以索性直接自己擼個。現在看來,服務跑起來穩定多了,一直沒掛呢。
貼下臨時地址:http://47.99.189.105:91/
系統監控及優化
系統很簡單,但是作為研究,應該更透徹點。
我們的系統如何監控?如果說系統崩潰怎麽辦?能支撐多大並發?高並發下如何保持系統穩定。。。 一個高性能的即時通訊是如何架構的?
額,留待以後再研究下補充。
參考資料:
https://juejin.im/post/5c3b21e4e51d455231347349
剖析下聊天室