1. 程式人生 > >swoole深入學習 2. tcp Server和tcp Client

swoole深入學習 2. tcp Server和tcp Client

握手 star 操作 rand 表示 ews bar pan 個數

這節來學習Swoole最基礎的ServerClient。會通過創建一個tcp Server來講解。

server

<?php
class Server
{
    private $serv;

    public function __construct()
    {
        $this->serv = new Swoole\Server(‘127.0.0.1‘, 9501);

        //當啟動一個Swoole應用時,一共會創建2 + n + m個進程,2為一個Master進程和一個Manager進程,其中n為Worker進程數。m為TaskWorker進程數。

        //默認如果不設置,swoole底層會根據當前機器有多少CPU核數,啟動對應數量的Reactor線程和Worker進程。我機器為4核的。Worker為4。TaskWorker為0。 

        //下面我來設置worker_num = 10。看下啟動了多少個進程

        $this->serv->set([
            ‘worker_num‘ => 10,
            //‘task_worker_num‘ => 2,
            ‘deamonize‘ => true,
        ]);

        //啟動10個work,總共12個進程。
        /*
        ?  Event git:(master) pstree |grep server.php
    |   \-+= 54172 yangyi php server.php  #Master進程
    |     \-+- 54173 yangyi php server.php  # Manager 進程
    |       |--- 54174 yangyi php server.php  #Work 進程
    |       |--- 54175 yangyi php server.php
    |       |--- 54176 yangyi php server.php
    |       |--- 54177 yangyi php server.php
    |       |--- 54178 yangyi php server.php
    |       |--- 54179 yangyi php server.php
    |       |--- 54180 yangyi php server.php
    |       |--- 54181 yangyi php server.php
    |       |--- 54182 yangyi php server.php
    |       \--- 54183 yangyi php server.php
         *
         */

        //增加新的監控的ip:post:mode
        $this->serv->addlistener("::1", 9500, SWOOLE_SOCK_TCP);

        //監聽事件
        /*
         *
         * - onStart
         * - onShutdown
         * - onWorkerStart
         * - onWorkerStop
         * - onTimer
         * - onConnect
         * - onReceive
         * - onClose
         * - onTask
         * - onFinish
         * - onPipeMessage
         * - onWorkerError
         * - onManagerStart
         * - onManagerStop
         */

        $this->serv->on(‘Start‘, array($this, ‘onStart‘));
        $this->serv->on(‘Connect‘, array($this, ‘onConnect‘));
        $this->serv->on(‘Receive‘, array($this, ‘onReceive‘));
        $this->serv->on(‘Close‘, array($this, ‘onClose‘));

        //master進程啟動後, fork出Manager進程, 然後觸發ManagerStart
        $this->serv->on(‘ManagerStart‘, function (\swoole_server $server){
            echo "On manager start.";
        });

        //manager進程啟動,啟動work進程的時候調用 workid表示第幾個id, 從0開始。
        $this->serv->on(‘WorkerStart‘, function($serv, $workerId) {
            echo $workerId . ‘---‘;
        });

        //當一個work進程死掉後,會觸發
        $this->serv->on(‘WorkerStop‘, function() {
            echo ‘--stop‘;
        });

        //啟動
        $this->serv->start();
    }

    //啟動server時候會觸發。
    public function onStart( $serv ) {
        echo "Start\n";
    }

    //client連接成功後觸發。
    public function onConnect( $serv, $fd, $from_id ) {
        $a = $serv->send( $fd, "Hello {$fd}!" );
        //var_dump($a); //成功返回true
    }

    //接收client發過來的請求
    public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
        echo "Get Message From Client {$fd}:{$data}\n";
        //$serv->send($fd, $data);
        //關閉該work進程
        //$serv->stop();
        //宕機
        //$serv->shutdown();

        //主動關閉 客戶端連接,也會觸發onClose事件
        //$serv->close($fd);

            $serv->send($fd, $data);

            //$list = $serv->connection_list();
//           foreach ($list as $fd) {
//               $serv->send($fd, $data);
//           }
        }
    }

    //客戶端斷開觸發
    public function onClose( $serv, $fd, $from_id ) {
        echo "Client {$fd} close connection\n";
    }

}

//輸出swoole的版本
echo swoole_version(); // 1.9.0

//輸出本機iP
var_dump(swoole_get_local_ip()); 

/**
    array(1) {
      ‘en4‘ =>
      string(13) "172.16.71.149"
}
*/


// 啟動服務器
$server = new Server();

我們啟動服務端server:

$ php server.php
0--start
2--start
Start
1--start
3--start
4--start
5--start
6--start
manager start.
7--start
9--start
8--start

我們來分析整個server 啟動的步驟:

  1. 啟動php server.php後,當前進程fork出Master進程,然後退出。
  2. Master進程啟動成功之後,fork出Manager進程,並觸發OnManagerStart事件。
  3. Manager進程啟動成功時候,fork出Worker進程,並觸發OnWorkerStart事件。

同步client

server端好了,那麽就會需要client端來連接,swoole裏面client分為同步和異步,先來一個同步clent客戶端。

<?php

// sync 同步客戶端
class client
{
    private $client;

    public function __construct()
    {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
        $this->client->connect(‘127.0.0.1‘, 9501, 1);
    }

    public function connect()
    {
        //fwrite(STDOUT, "請輸入消息:");
        //$msg = trim(fgets(STDIN));
        $msg = rand(1,12);

        //發送給消息到服務端
        $this->client->send( $msg );

        //接受服務端發來的信息
        $message = $this->client->recv();
        echo "Get Message From Server:{$message}\n";

        //關閉客戶端
        $this->client->close();

    }
}
$client = new Client();
$client->connect();

同步client是同步阻塞的。一整套connect->send()->rev()->close()是同步進行的。

所以,如果是大量的循環數據,就不適合同步client了:

比如下面:

<?php


// sync 同步客戶端
class client
{
    private $client;

    public function __construct()
    {
        var_dump(swoole_get_local_ip());
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
        $this->client->connect(‘127.0.0.1‘, 9501, 1);

        $i = 0;
        while ($i < 100) {
            $this->client->send($i."\n");
            $message = $this->client->recv();
            echo "Get Message From Server:{$message}\n";
            $i++;
        }
    }
}
$client = new Client();

打印的結果就是順序執行。要是想要異步了。

異步client

<?php

//異步客戶端

$client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->on("connect", function($cli) {

    var_dump($cli->isConnected()); // true
    var_dump($cli->getsockname()); //[‘port‘ => 57305, ‘host‘=> ‘127.0.0.1‘]
    var_dump($cli->sock); // 5

    $i = 0;
    while ($i < 100) {
        $cli->send($i."\n");
        $i++;
    }
    //關閉
    //$cli->close();
});

$client->on("receive", function($cli, $data){
    echo "Receive: $data";
});

$client->on("error", function(swoole_client $cli){
    echo "error\n" . $cli->errCode;
});

$client->on("close", function(swoole_client $cli){
    echo "Connection close\n";
});

$client->connect(‘127.0.0.1‘, 9501);

這樣就是一個異步的client了,處理更快,但是只支持php的cli模式

server與client交互

總結一下client與server的連接過程:

  1. Client主動Connect的時候,Client實際上是與Master進程中的某個Reactor線程發生了連接。
  2. 當TCP的三次握手成功了以後,由這個Reactor線程將連接成功的消息告訴Manager進程,再由Manager進程轉交給Worker進程。
  3. 在這個Worker進程中觸發了OnConnect的方法。
  4. 當Client向Server發送了一個數據包的時候,首先收到數據包的是Reactor線程,同時Reactor線程會完成組包,再將組好的包交給Manager進程,由Manager進程轉交給Worker。
  5. 此時Worker進程觸發OnReceive事件。
  6. 如果在Worker進程中做了什麽處理,然後再用Send方法將數據發回給客戶端時,數據則會沿著這個路徑逆流而上。

關於上面說到的幾個進程,解釋下:

Master進程是一個多線程進程,其中有一組非常重要的線程,叫做Reactor線程(組),每當一個客戶端連接上服務器的時候,都會由Master進程從已有的Reactor線程中,根據一定規則挑選一個,專門負責向這個客戶端提供維持鏈接、處理網絡IO與收發數據等服務。

而Manager進程,某種意義上可以看做一個代理層,它本身並不直接處理業務,其主要工作是將Master進程中收到的數據轉交給Worker進程,或者將Worker進程中希望發給客戶端的數據轉交給Master進程進行發送。另外,Manager進程還負責監控Worker進程,如果Worker進程因為某些意外掛了,Manager進程會重新拉起新的Worker進程,有點像Supervisor的工作。

Worker進程了,顧名思義,Worker進程其實就是處理各種業務工作的進程,Manager將數據包轉交給Worker進程,然後Worker進程進行具體的處理,並根據實際情況將結果反饋給客戶端。

task_worker

在swoole中work進程分為EventWorker和TaskWorker,對應的配置文件設置為:

$this->serv->set([
    ‘worker_num‘ => 10,  #EventWorker
    ‘task_worker_num‘ => 2,  #TaskWorker
    ‘deamonize‘ => true,
]);

worker是基於event觸發,而task則是manager直接生成的子進程。那麽他們有什麽區別呢?

共同點是:他們都是最底層負責處理業務的進程。

Swoole的業務邏輯部分是同步阻塞運行的,如果遇到一些耗時較大的操作,例如訪問數據庫、廣播消息等,就會影響服務器的響應速度。因此Swoole提供了Task功能,將這些耗時操作放到另外的進程去處理,當前woker進程繼續執行後面的邏輯。運行Task,需要在swoole服務中配置參數task_worker_num,即可開啟task功能。此外,必須給swoole_server綁定兩個回調函數:onTaskonFinish。這兩個回調函數分別用於執行Task任務和處理Task任務的返回結果。

先來寫一個demo,來如何用 taskWoker來處理業務。

taskServer.php

<?php
/**
 * Created by PhpStorm.
 * User: yangyi
 * Date: 2016/12/7
 * Time: 16:16
 */

class taskServer
{
    private $serv;

    /**
     * [__construct description]
     * 構造方法中,初始化 $serv 服務
     */
    public function __construct() {
        $this->serv = new Swoole\Server(‘0.0.0.0‘, 9501);
        //初始化swoole服務
        $this->serv->set(array(
            ‘worker_num‘  => 8,
            ‘daemonize‘   => false, //是否作為守護進程,此配置一般配合log_file使用
            ‘max_request‘ => 1000,
            ‘log_file‘    => ‘./swoole.log‘,
            ‘task_worker_num‘ => 8
        ));

        //設置監聽
        $this->serv->on(‘Start‘, array($this, ‘onStart‘));
        $this->serv->on(‘Connect‘, array($this, ‘onConnect‘));
        $this->serv->on("Receive", array($this, ‘onReceive‘));
        $this->serv->on("Close", array($this, ‘onClose‘));
        $this->serv->on("Task", array($this, ‘onTask‘));
        $this->serv->on("Finish", array($this, ‘onFinish‘));

        //開啟
        $this->serv->start();
    }

    public function onStart($serv) {
        echo SWOOLE_VERSION . " onStart\n";
    }

    public function onConnect($serv, $fd) {
        echo $fd."Client Connect.\n";
    }

    public function onReceive($serv, $fd, $from_id, $data) {
        echo "Get Message From Client {$fd}:{$data}\n";
        // send a task to task worker.
        $param = array(
            ‘fd‘ => $fd
        );
        // start a task
        $serv->task(json_encode($param));

        echo "Continue Handle Worker\n";
    }

    public function onClose($serv, $fd) {
        echo "Client Close.\n";
    }

    public function onTask($serv, $task_id, $from_id, $data) {
        echo "This Task {$task_id} from Worker {$from_id}\n";
        echo "Data: {$data}\n";
        for($i = 0 ; $i < 200 ; $i ++ ) {
            sleep(1);
            echo "Task {$task_id} Handle {$i} times...\n";
        }
        $fd = json_decode($data, true);
        $serv->send($fd[‘fd‘] , "Data in Task {$task_id}");
        return "Task {$task_id}‘s result";
    }

    public function onFinish($serv,$task_id, $data) {
        echo "Task {$task_id} finish\n";
        echo "Result: {$data}\n";
    }
}

$server = new taskServer();

taskClient.php 異步的客戶端

<?php
/**
 * Created by PhpStorm.
 * User: yangyi
 * Date: 2016/12/7
 * Time: 16:18
 */
class taskClient
{
    private $client;

    public function __construct() {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->client->on(‘Connect‘, array($this, ‘onConnect‘));
        $this->client->on(‘Receive‘, array($this, ‘onReceive‘));
        $this->client->on(‘Close‘, array($this, ‘onClose‘));
        $this->client->on(‘Error‘, array($this, ‘onError‘));
    }

    public function connect() {
        if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {
            echo "Error: {$fp->errMsg}[{$fp->errCode}]\n";
            return;
        }
    }

    //connect之後,會調用onConnect方法
    public function onConnect($cli) {
        fwrite(STDOUT, "Enter Msg:");
        swoole_event_add(STDIN,function(){
            fwrite(STDOUT, "Enter Msg:");
            $msg = trim(fgets(STDIN));
            $this->send($msg);
        });
    }

    public function onClose($cli) {
        echo "Client close connection\n";
    }

    public function onError() {

    }

    public function onReceive($cli, $data) {
        echo "Received: ".$data."\n";
    }

    public function send($data) {
        $this->client->send($data);
    }

    public function isConnected($cli) {
        return $this->client->isConnected();
    }

}

$client = new taskClient();
$client->connect();

運行一下:

php taskServer.php
php taskClient.php

服務端打印:

$ php task_server.php
1.9.0 onStart
1Client Connect.
Get Message From Client 1:12345
Continue Handle Worker
This Task 0 from Worker 3
Data: {"fd":1}
Task 0 Handle 0 times...
Task 0 Handle 1 times...
Task 0 finish
Result: Task 0‘s result

客戶端打印:

$ php task_client.php
Enter Msg:12345
Enter Msg:Received: Data in Task 0

這裏面有幾點需要註意:

1. 運行Task,必須要在swoole服務中配置參數task_worker_num,此外,必須給swoole_server綁定兩個回調函數:onTaskonFinish
2. onTash 要return 數據
3. onFinish 會接收到onTash的數據,標記成完成。
4. swoole_event_add 把輸入綁定成事件,這個後續將,這樣client就可以連續的多次輸入。

swoole的架構

上面說了這麽,圖表總結一下swoole結構:

swoole采用 多線程Reactor+多進程Worker

技術分享圖片

swoole的處理連接流程圖如下:

技術分享圖片

當請求到達時,swoole是這樣處理的:

請求到達 Main Reactor
|
|
Main Reactor根據Reactor的情況,將請求註冊給對應的Reactor
(每個Reactor都有epoll。用來監聽客戶端的變化)
|
|
客戶端有變化時,交給worker來處理
|
|
worker處理完畢,通過進程間通信(比如管道、共享內存、消息隊列)發給對應的reactor。
|
|
reactor將響應結果發給相應的連接
|
|
請求處理完成

因為reactor基於epoll,所以每個reactor可以處理無數個連接請求。 如此,swoole就輕松的處理了高並發。

參考資料:

http://rango.swoole.com/archives/305

https://github.com/szyhf/swoole_study/blob/master/Swoole%E7%9A%84%E8%BF%9B%E7%A8%8B%E6%A8%A1%E5%9E%8B.md

http://happyliu.blog.51cto.com/501986/1574923

https://segmentfault.com/a/1190000007614502

swoole深入學習 2. tcp Server和tcp Client