1. 程式人生 > >php socket select IO複用

php socket select IO複用

此篇部落格是接著上篇php socekt阻塞模型PHP程式碼(php socket IO阻塞方式的Server/Client)的進階,IO阻塞模型只能是同一個時刻只能由一個客戶端進行訪問,除非利用多程序或多執行緒才能達到多個使用者併發訪問的,因涉及到多程序和多執行緒,暫時跳過,

此片為linux的IO操作的5大模型第三種模型:IO複用,而IO複用又有多種方式實現,常見的如select、poll、epoll函式。這幾個函式也會使程序阻塞,但是和阻塞I/O所不同的的,這些函式可以同時阻塞多個I/O操作。而且可以同時對多個讀操作,多個寫操作的I/O函式進行檢測,直到有資料可讀或可寫時,才真正呼叫I/O操作函式,這些定義網上資料都很多,我這就不一一描述,如有需要可參考:

socket阻塞與非阻塞,同步與非同步

下面是socket IO複用 select 模型程式碼PHP 的程式碼描述,講述如何使用PHP程式碼實現select模型。其中也對socket_select的作用,進行自我總結

select_server.php

<?php
/**
 * server.php.
 * User: lvfk
 * Date: 2017/12/1 0001
 * Time: 16:47
 * Desc:
 */
set_time_limit(0);
class SelectSocketServer
{
    private static $socket;
    private static $timeout = 60;
    private static $maxconns = 1024;
    private static $connections = array();
    function __construct($port)
    {
        global $errno, $errstr;
        if ($port < 1024) {
            die("Port must be a number which bigger than 1024\n");
        }

        $socket = socket_create_listen($port);
        if (!$socket) die("Listen $port failed");

        socket_set_nonblock($socket); // 非阻塞
        while (true)
        {
            $readfds = array_merge(self::$connections, array($socket));
            $writefds = array();
            // 選擇一個連線,獲取讀、寫連線通道
            $e = NULL;

            /*
             * socket_select是阻塞,有資料請求才處理,否則一直阻塞
             * 此處$readfds會讀取到當前活動的連線
             * 比如執行socket_select前的資料如下(描述socket的資源ID):
             * $socket = Resource id #4
             * $readfds = Array
             *       (
             *           [0] => Resource id #5 //客戶端1
             *           [1] => Resource id #4 //server繫結的埠的socket資源
             *       )
             * 呼叫socket_select之後,此時有兩種情況:
             * 情況一:如果是新客戶端2連線,那麼 $readfds = array([1] => Resource id #4),此時用於接收新客戶端2連線
             * 情況二:如果是客戶端1(Resource id #5)傳送訊息,那麼$readfds = array([1] => Resource id #5),使用者接收客戶端1的資料
             *
             * 通過以上的描述可以看出,socket_select有兩個作用,這也是實現了IO複用
             * 1、新客戶端來了,通過 Resource id #4 介紹新連線,如情況一
             * 2、已有連線傳送資料,那麼實時切換到當前連線,接收資料,如情況二
            */
            if (socket_select($readfds, $writefds, $e, self::$timeout))
            {
                // 如果是當前服務端的監聽連線
                if (in_array($socket, $readfds)) {
                    echo "socket_accept\n";
                    // 接受客戶端連線
                    $newconn = socket_accept($socket);
                    $i = (int) $newconn;
                    $reject = '';
                    if (count(self::$connections) >= self::$maxconns) {
                        $reject = "Server full, Try again later.\n";
                    }
                    // 將當前客戶端連線放入 socket_select 選擇
                    self::$connections[$i] = $newconn;
                    // 輸入的連線資源快取容器
                    $writefds[$i] = $newconn;
                    // 連線不正常
                    if ($reject) {
                        socket_write($writefds[$i], $reject);
                        unset($writefds[$i]);
                        self::close($i);
                    } else {
                        echo "Client $i come.\n";
                    }
                    // remove the listening socket from the clients-with-data array
                    $key = array_search($socket, $readfds);
                    unset($readfds[$key]);
                }
                // 輪循讀通道
                foreach ($readfds as $rfd) {
                    // 客戶端連線
                    $i = (int) $rfd;
                    // 從通道讀取
                    $line = @socket_read($rfd, 2048, PHP_NORMAL_READ);
                    if ($line === false) {
                        // 讀取不到內容,結束連線
                        echo "Connection closed on socket $i.\n";
                        self::close($i);
                        continue;
                    }
                    $tmp = substr($line, -1);
                    if ($tmp != "\r" && $tmp != "\n") {
                        // 等待更多資料
                        continue;
                    }
                    // 處理邏輯
                    $line = trim($line);
                    if ($line == "quit") {
                        echo "Client $i quit.\n";
                        self::close($i);
                        break;
                    }
                    if ($line) {
                        echo "Client $i >>" . $line . "\n";
                        //傳送客戶端
                        socket_write($rfd,  "$i=>$line\n");
                    }
                }

                // 輪循寫通道
                foreach ($writefds as $wfd) {
                    $i = (int) $wfd;
                    socket_write($wfd, "Welcome Client $i!\n");
                }
            }
        }
    }

    function close ($i)
    {
        socket_shutdown(self::$connections[$i]);
        socket_close(self::$connections[$i]);
        unset(self::$connections[$i]);
    }
}
new SelectSocketServer(3000);

select_client.php
<?php
/**
 * client.php.
 * User: lvfk
 * Date: 2017/12/1 0001
 * Time: 17:05
 * Desc:
 */
function debug ($msg)
{
    error_log($msg, 3, '/tmp/socket.log');
}
if ($argv[1]) {

    $socket_client = stream_socket_client('tcp://127.0.0.1:3000', $errno, $errstr, 30);

//	stream_set_timeout($socket_client, 0, 100000);

    if (!$socket_client) {
        die("$errstr ($errno)");
    } else {
        $msg = trim($argv[1]);
        for ($i = 0; $i < 5; $i++) {
            $res = fwrite($socket_client, "$msg($i)\n");
            usleep(100000);
			debug(fread($socket_client, 1024)); // 將產生死鎖,因為 fread 在阻塞模式下未讀到資料時將等待
        }
        fwrite($socket_client, "quit\n"); // add end token
        debug(fread($socket_client, 1024));
        fclose($socket_client);
    }
}
else {

    $phArr = array();
    for ($i = 0; $i < 5; $i++) {
        $phArr[$i] = popen("php ".__FILE__." '{$i}:test'", 'r');
    }
    foreach ($phArr as $ph) {
        pclose($ph);
    }
}