1. 程式人生 > >剖析下聊天室

剖析下聊天室

速度 keepal ima unset 復數 openca 一點 blog nta

由來

環境:PHP7、Swoole、linux

對聊天室有點感興趣,對於網絡協議有一點一知半解,所以決定借助swoole實現個簡單的聊天室,來簡單剖析下原理,知道原理以後就可以考慮用其他語言或者自己造輪子寫個,當然這是後話。

源碼我放置github( https://github.com/WalkingSun/SwooleServer ),有興趣可以借鑒借鑒。

系統設計

即時通訊的網絡通信基於長連接,通信方式有TCP、UDP、socket、websocket等,本次實現是websocket,系統建立常駐內存的websocket服務,客戶端即瀏覽器兩者建立連接通信。通信過程如下:

技術分享圖片

關於客戶端連接websocket服務,本文不做細述,websocket服務的建立借助swoole,需要在服務端的open、recieve、send、close建立回調處理,為了方便我將連接的客戶端信息放入swoole_table(一個基於共享內存和鎖實現的超高性能,並發數據結構)。

代碼僅供參考:

websocket 服務類:

<?php
/**
 * Created by PhpStorm.
 * User: WalkingSun
 * Date: 2018/10/28
 * Time: 15:54
 */

class WsServer
{
    const host = '0.0.0.0';
    const port = '9501';

    public $swoole;

    public $config = ['gcSessionInterval' => 60000];

    public $openCallback;       //open回調

    public $messageCallback;    //message回調

    public $runApp;    //request回調

    public $workStartCallback;       //work回調

    public $finishCallback;     //finish回調

    public $closeCallback;       //close回調

    public $taskCallback;       //task回調

    public function __construct( $host, $port, $mode, $socketType, $swooleConfig=[], $config=[])
    {
        $host = $host?:self::host;
        $port = $port?:self::port;
        $this->swoole = new Swoole_websocket_server($host,$port,$mode,$socketType);
        $this->webRoot = $swooleConfig['document_root'];
        if( !empty($this->config) ) $this->config = array_merge($this->config, $config);
        $this->swoole->set($swooleConfig);

        $this->swoole->on('open',[$this,'onOpen']);
        $this->swoole->on('message',[$this,'onMessage']);
        $this->swoole->on('request',[$this,'onRequest']);
        $this->swoole->on('WorkerStart',[$this,'onWorkerStart']);            //增加work進程
        $this->swoole->on('task',[$this,'onTask']);            //增加task任務進程
        $this->swoole->on('finish',[$this,'onFinish']);
        $this->swoole->on('close',[$this,'onClose']);
    }

    public function run(){
        $this->swoole->start();
    }

    /**
     * 當WebSocket客戶端與服務器建立連接並完成握手後會回調此函數
     * @param $serv swoole_websocket_server 服務對象
     * @param $request swoole_http_server 服務對象
     */
    public function onOpen( swoole_websocket_server $serv,  $request){
        call_user_func_array( $this->openCallback, [ $serv, $request ] );

        //定時器(異步執行)
//        if($request->fd == 1){
//            swoole_timer_tick(2000,function($timer_id){
//                echo time().PHP_EOL;
//            });
//        }
    }

    /**
     *當服務器收到來自客戶端的數據幀時會回調此函數。
     * @param $server  swoole_websocket_server 服務對象
     * @param $frame  swoole_websocket_frame對象,包含了客戶端發來的數據幀信息
     * $frame->fd,客戶端的socket id,使用$server->push推送數據時需要用到
    $frame->data,數據內容,可以是文本內容也可以是二進制數據,可以通過opcode的值來判斷
    $frame->opcode,WebSocket的OpCode類型,可以參考WebSocket協議標準文檔
    $frame->finish, 表示數據幀是否完整,一個WebSocket請求可能會分成多個數據幀進行發送(底層已經實現了自動合並數據幀,現在不用擔心接收到的數據幀不完整)
     */
    public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame ){

        call_user_func_array( $this->messageCallback, [ $serv, $frame ]);

    }

    /**
     * @param $serv  swoole_websocket_server 服務對象
     * @param $fd  連接的文件描述符
     * @param $reactorId  來自那個reactor線程
     * onClose回調函數如果發生了致命錯誤,會導致連接泄漏。通過netstat命令會看到大量CLOSE_WAIT狀態的TCP連接
     * 當服務器主動關閉連接時,底層會設置此參數為-1,可以通過判斷$reactorId < 0來分辨關閉是由服務器端還是客戶端發起的。
     */
    public function onClose( swoole_websocket_server $serv , $fd , $reactorId ){

        call_user_func_array( $this->closeCallback ,[ $serv , $fd , $reactorId ]);

    }

    /**
     * 在task_worker進程內被調用。worker進程可以使用swoole_server_task函數向task_worker進程投遞新的任務。當前的Task進程在調用onTask回調函數時會將進程狀態切換為忙碌,這時將不再接收新的Task,當onTask函數返回時會將進程狀態切換為空閑然後繼續接收新的Task。
     * @param $serv  swoole_websocket_server 服務對象
     * @param $task_id int 任務id,由swoole擴展內自動生成,用於區分不同的任務。$task_id和$src_worker_id組合起來才是全局唯一的,不同的worker進程投遞的任務ID可能會有相同
     * @param $src_worker_id int 來自於哪個worker進程
     * @param $data mixed 任務的內容
     */
    public function onTask(swoole_server $serv,  $task_id,  $src_worker_id,  $data){

        call_user_func_array( $this->taskCallback , [ $serv, $task_id, $src_worker_id, $data ]);

//        sleep(10);
//        onTask函數中 return字符串,表示將此內容返回給worker進程。worker進程中會觸發onFinish函數,表示投遞的task已完成。
//        return "task {$src_worker_id}-{$task_id} success";
    }

    /**
     * 當worker進程投遞的任務在task_worker中完成時,task進程會通過swoole_server->finish()方法將任務處理的結果發送給worker進程。
     * @param $serv  swoole_websocket_server 服務對象
     * @param $task_id int  任務id
     * @param $data string  task任務處理的結果內容
     * task進程的onTask事件中沒有調用finish方法或者return結果,worker進程不會觸發onFinish
    執行onFinish邏輯的worker進程與下發task任務的worker進程是同一個進程
     */
    public function onFinish(swoole_server $serv,  $task_id,  $data){

        call_user_func_array( $this->finishCallback ,[ $serv,$task_id,$data]);
//        echo $data;
//        return $data;
    }

    public function onRequest( $request, $response ){
        call_user_func_array( $this->runApp, [ $request, $response ]);
    }

    public function onWorkerStart( $server,  $worker_id ){
        call_user_func_array( $this->workStartCallback , [$server,  $worker_id]);
    }
}

起服務和回調設置:

   class SwooleController extends BasicController{

       public $host;

       public $port;

       public  $swoole_config=[];

       public static $table;


       public function actionStart(){

           $config = include __DIR__ . '/../config/console.php';

           if( isset($config['swoole']['log_file']) ) $this->swoole_config['log_file'] = $config['swoole']['log_file'];

           if( isset($config['swoole']['pid_file']) ) $this->swoole_config['pid_file'] = $config['swoole']['pid_file'];

           $this->swoole_config = array_merge(
               [
                   'document_root' => $config['swoole']['document_root'],
                   'enable_static_handler'     => true,
   //                'daemonize'=>1,
                   'worker_num'=>4,
                   'max_request'=>2000,
   //            'task_worker_num'=>100,
                   //檢查死鏈接  使用操作系統提供的keepalive機制來踢掉死鏈接
                   'open_tcp_keepalive'=>1,
                   'tcp_keepidle'=> 1*60,     //連接在n秒內沒有數據請求,將開始對此連接進行探測
                   'tcp_keepcount' => 3,       //探測的次數,超過次數後將close此連接
                   'tcp_keepinterval' => 0.5*60,     //探測的間隔時間,單位秒

                   //swoole實現的心跳機制,只要客戶端超過一定時間沒發送數據,不管這個連接是不是死鏈接,都會關閉這個連接
   //            'heartbeat_check_interval' => 10*60,        //每m秒偵測一次心跳
   //            'heartbeat_idle_time' => 30*60,            //一個TCP連接如果在n秒內未向服務器端發送數據,將會被切斷
               ],$this->swoole_config
           );

           $this->host = $config['swoole']['host'];
           $this->port = $config['swoole']['port'];
           $swooleServer = new WsServer(  $this->host,$this->port,$config['swoole']['mode'],$config['swoole']['socketType'],$this->swoole_config,$config);

           //連接信息保存到swoole_table
           self::$table = new \swoole_table(10);
           self::$table->column('username',\Swoole\Table::TYPE_STRING, 10);
           self::$table->column('avatar',\Swoole\Table::TYPE_STRING, 255);
           self::$table->column('msg',\Swoole\Table::TYPE_STRING, 255);
           self::$table->column('fd',\Swoole\Table::TYPE_INT, 6);
           self::$table->create();

           $swooleServer->openCallback = function( $server , $request ){
               echo "server handshake with fd={$request->fd}\n";
           };

           $swooleServer->runApp = function( $request , $response ) use($config,$swooleServer){

               //全局變量設置及app.log
               $this->globalParam( $request );
               $_SERVER['SERVER_SWOOLE'] = $swooleServer;

               //記錄日誌
               $apiData = $_SERVER;
               unset($apiData['SERVER_SWOOLE']);
               Common::addLog( $config['log'] , ($apiData) );

               //解析路由
               $r = $_GET['r'];
               $r = $r?:( isset($config['defaultRoute'])?$config['defaultRoute']:'index/index');
               $params = explode('/',$r);
               $controller = __DIR__.'/../controllers/'.ucfirst($params[0]).'Controller.php';

               $result = '';
               if( file_exists( $controller ) ){
                   require_once $controller;
                   $class = new ReflectionClass(ucfirst($params[0]).'Controller');
                   if( $class->hasMethod( 'action'.ucfirst($params[1]) ) ){
                       $instance  = $class->newInstanceArgs();
                       $method = $class->getmethod('action'.ucfirst($params[1])); // 獲取類中方法
                       ob_start();
                       $method->invoke($instance);    // 執行方法
                       $result = ob_get_contents();
                       ob_clean();
                   }else{
                       $result = 'NOT FOUND!';
                   }
               }else{
                   $result = "$controller not exist!";
               }

               $response->end( $result );
           };

           $swooleServer->workStartCallback = function( $server,  $worker_id ){

           };

           $swooleServer->taskCallback = function( $server , $request ){
               //發送通知或者短信、郵件等

           };

           $swooleServer->finishCallback = function( $serv,  $task_id,  $data ){

   //            return $data;
           };

           $swooleServer->messageCallback = function( $server,  $iframe  ){
               //記錄客戶端信息
               echo "Client connection fd {$iframe->fd} ".PHP_EOL;

               $data = json_decode( $iframe->data ,1 );

               if( !empty($data['token']) ){
                   if( $data['token']== 'simplechat_open' ){
                       if( !self::$table->exist($iframe->fd) ){
                           $user = array_merge($data,['fd'=>$iframe->fd]);
                           self::$table->set($iframe->fd,$user);

                           //發送連接用戶信息
                           foreach (self::$table as $v){
                               if($v['fd']!=$iframe->fd){
                                   $pushData = array_merge($user,['action'=>'connect']);
                                   $server->push($v['fd'],json_encode($pushData));
                               }
                           }
                       }

                   }

                   if( $data['token']=='simplechat' ){
                       //查詢所有連接用戶,分發消息

                       foreach (self::$table as $v){
                           if($v['fd']!=$iframe->fd){
                               $pushData = ['username'=>$data['username'],'avatar'=>$data['avatar'],'time'=>date('H:i'),'data'=>$data['data'],'action'=>'send'];
                               $server->push($v['fd'],json_encode($pushData));
                           }
                       }
                   }
               }

               //接受消息,對消息進行解析,發送給組內人其他人

           };

           $swooleServer->closeCallback = function(  $server,  $fd,  $reactorId ){

               if(  self::$table->exist($fd) ){
                   //退出房間處理
                   self::$table->del($fd);
                   foreach (self::$table as $v){
                       $pushData = ['fd'=>$fd,'username'=>'','avatar'=>'','time'=>date('H:i'),'data'=>'','action'=>'remove'];
                       $server->push($v['fd'],json_encode($pushData));
                   }
               }

               echo  "Client close fd {$fd}".PHP_EOL;
           };

           $this->stdout("server is running, listening {$this->host}:{$this->port}" . PHP_EOL);
           $swooleServer->run();
       }


       public function actionStop(){
           $r = $this->sendSignal( SIGTERM );
           if( $r ){
               $this->stdout("server is stopped, stop listening {$this->host}:{$this->port}" . PHP_EOL);
           }
       }

       public function actionRestart(){
           $this->sendSignal(SIGTERM);     //向主進程發送SIGTERM實現關閉服務器
           $this->actionStart();
       }

       public function actionReload(){
           $this->sendSignal(SIGUSR1);   //向主進程/管理進程發送SIGUSR1信號,將平穩地restart所有Worker進程
       }

   }

起了服務,客戶端就可以連接通信了。

心跳檢測

起了半天後服務常會斷掉,查看監聽端口進程狀態,服務器輸入:

$ netstat -anp |grep 9501

發現大量CLOSE_WAIT狀態,常用狀態有 ESTABLISHED 表示正在通信,TIME_WAIT 表示主動關閉,CLOSE_WAIT 表示被動關閉。

TIME_WAIT和CLOSE_WAIT兩種狀態如果一直被保持,意味著對應數目的通道就一直被占用,且“占著茅坑不使勁”,一旦句柄數達到上限,新的請求就無法處理。而且因為swoole是master-worker模式,
基本上http、tcp通信都是在worker進程,CLOSE_WAIT一直在,子進程將一直無法釋放,隨著時間的推移CLOSE_WAIT狀態的進程越來越多,阻礙新的連接進來,websocket服務不可用。

主動關閉 和 被動關閉
TCP關閉 四次揮手過程如下:

技術分享圖片

揮手流程:

1、 客戶端是調用函數close(),這時,客戶端會發送一個FIN給服務器。

2、 服務器收到FIN,關閉套接字讀通道,並將自己狀態設置為CLOSE_WAIT(表示被動關閉),
並返回一個ACK給客戶端。

3、 客戶端收到ACK,關閉套接字寫通道
接下來,服務器會調用close():

1、 服務器close(),發送一個FIN到客戶端。

2、 客戶端收到FIN,關閉讀通道,並將自己狀態設置成TIME_WAIT,發送一個ACK給服務器。

3、 服務器收到ACK,關閉寫通道,並將自己狀態設置為CLOSE。

4、 客戶端等待兩個最大數據傳輸時間,然後將自己狀態設置成CLOSED。

由此我們看到CLOSE-WAIT 狀態,TIME-WAIT 狀態 產生的過程,產生的原因是復雜的,比如說網絡通信中斷、用戶手機網絡切換wifi網絡、網絡通信丟包等,故此tcp揮手過程會出現中斷,繼而
產生這些關閉狀態。

為了解決這些占用連接數的異常連接,需要檢測連接是否是活動的,對於死連接我們需要釋放關閉它。

TIME_WAIT 主動關閉

主動關閉的一方在發送最後一個ACK包後,無論對方是否收到都會進入狀態,等待2MSL(Maximum Segment Lifetime數據包的最大生命周期,是一個數據包能在互聯網上生存的最長時間,若超過這個時間則該數據包將會消失在網絡中)
的時間,才會釋放網絡資源。

TIME_WAIT狀態的存在主要有兩個原因:

1)可靠地實現TCP全雙工連接的終止。在關TCP閉連接時,最後的ACK包是由主動關閉方發出的,如果這個ACK包丟失,則被動關閉方將重發FIN包,因此主動方必須維護狀態信息,以允許它重發這個
ACK包。如果不維持這個狀態信息,那麽主動方將回到CLOSED狀態,並對被動方重發的FIN包響應RST包,而被動關閉方將此包解釋成一個錯誤。因而,要實現TCP全雙工連接的正常終止,必須能夠處
理四次握手協議中任意一個包丟失的情況,主動關閉方必須維持狀態信息進入TIME_WAIT狀態。

2)確保迷路重復數據包在網絡中消失,防止上一次連接中的包迷路後重新出現,影響新連接。TCP數據包可能由於路由器異常而迷路,在迷路期間,數據包發送方可能因超時而重發這個包,迷路的
數據包在路由器恢復後也會被送到目的地,這個迷路的數據包就稱為Lost Duplicate。在關閉一個TCP連接後,如果馬上使用相同的IP地址和端口建立新的TCP連接,那麽有可能出現前一個連接的迷
路重復數據包在前一個連接關閉後再次出現,影響新建立的連接。為了避免這一情況,TCP協議不允許使用處於TIME_WAIT狀態的連接的IP和端口啟動一個新連接,只有經過2MSL的時間,確保上一次
連接中所有的迷路重復數據包都已消失在網絡中,才能安全地建立新連接。

如果Server主動關閉連接,同樣會有大量的連接在關閉後處於TIME_WAIT狀態,等待2MSL的時間後才能釋放網絡資源。對於並發連接,出現大量等待連接,新的連接進不來,會降低系統性能。

time_wait問題可以通過調整內核參數和適當的設置web服務器的keep-Alive值來解決。因為time_wait是自己可控的,要麽就是對方連接的異常,要麽就是自己沒有快速的回收資源,總之不是由於自己程序錯誤引起的。

解決方式:

  • 試圖讓Client主動關閉連接,由於每個Client的並發量都比較低,因而不會產生性能瓶頸
  • 優化Server的系統TCP參數,使其網絡資源的最大值、消耗速度和恢復速度達到平衡;

    修改/etc/sysctl.conf

    net.ipv4.tcp_tw_recycle = 1       #啟用TIME-WAIT狀態sockets的快速回收
    
    net.ipv4.tcp_tw_reuse = 1         #允許將TIME-WAIT sockets重新用於新的TCP連接,默認為0,表示關閉
    
    #緩存每個連接最新的時間戳,後續請求中如果時間戳小於緩存的時間戳,即視為無效,相應的數據包會被丟棄,啟用這種行為取決於tcp_timestamps和tcp_tw_recycle
    net.ipv4.tcp_timestamps = 1

CLOSE_WAIT 被動關閉

對方發送一個FIN後,程序自己這邊沒有進一步發送ACK以確認。換句話說就是在對方關閉連接後,程序裏沒有檢測到,或者程序裏本身就已經忘了這個時候需要關閉連接,於是這個資源就一直被程序占用著。

解決辦法:

  • 釋放關閉掉異常的連接;
  • 修復程序的bug,重新發布;

Keep-Alive

TCP中有一個Keep-Alive的機制可以檢測死連接,LINUX內核包含對keepalive的支持,其中使用了三個參數:tcp_keepalive_time(開啟keepalive的閑置時長)tcp_keepalive_intvl(keepalive探測包的發送
間隔)和tcp_keepalive_probes(如果對方不予應答,探測包的發送次數);如此服務端會隔斷時間發送個探測包給客戶端,可以是多次,如果在超出設置閑置時長,內核會關閉這個連接。

客戶端主動發心跳

通過程序設置最大連接時長,如果客戶端在這段時間內沒有發送過數據,則關閉釋放這個連接。

解決關閉問題

TIME_WAIT 倒是沒有出現過, CLOSE_WAIT狀態總會出現。

就看看文檔,swoole有這些設置,當前使用的是TCP的keep-alive檢測,只需改配置即可:

   ...
    //檢查死鏈接  使用操作系統提供的keepalive機制來踢掉死鏈接
                'open_tcp_keepalive'=>1,
                'tcp_keepidle'=> 1*60,     //連接在n秒內沒有數據請求,將開始對此連接進行探測
                'tcp_keepcount' => 3,       //探測的次數,超過次數後將close此連接
                'tcp_keepinterval' => 0.5*60,     //探測的間隔時間,單位秒
   ...

我設置的周期比較短,方便測試。

設置了這些看似穩定了,卻還是會出現CLOSE_WAIT,後來查了日誌,發生錯誤中斷了,大概意思,代碼中出現exit、die,顯然常駐內存的swoole不支持這些,會立馬中斷程序。所以改些這些代碼,
剛開始借助YII2.0寫的,框架源碼的問題,所以swoole這塊服務需要單獨出來,嗯。。。所以索性直接自己擼個。現在看來,服務跑起來穩定多了,一直沒掛呢。

貼下臨時地址:http://47.99.189.105:91/

系統監控及優化

系統很簡單,但是作為研究,應該更透徹點。

我們的系統如何監控?如果說系統崩潰怎麽辦?能支撐多大並發?高並發下如何保持系統穩定。。。 一個高性能的即時通訊是如何架構的?

額,留待以後再研究下補充。

參考資料:

https://juejin.im/post/5c3b21e4e51d455231347349

剖析下聊天室