swoole深入學習 2. tcp Server和tcp Client
這節來學習Swoole最基礎的Server
和Client
。會通過創建一個tcp Server來講解。
server
<?php
class Server
{
private $serv;
public function __construct()
{
$this->serv = new Swoole\Server(‘127.0.0.1‘, 9501);
//當啟動一個Swoole應用時,一共會創建2 + n + m個進程,2為一個Master進程和一個Manager進程,其中n為Worker進程數。m為TaskWorker進程數。
//默認如果不設置,swoole底層會根據當前機器有多少CPU核數,啟動對應數量的Reactor線程和Worker進程。我機器為4核的。Worker為4。TaskWorker為0。
//下面我來設置worker_num = 10。看下啟動了多少個進程
$this->serv->set([
‘worker_num‘ => 10,
//‘task_worker_num‘ => 2,
‘deamonize‘ => true,
]);
//啟動10個work,總共12個進程。
/*
? Event git:(master) pstree |grep server.php
| \-+= 54172 yangyi php server.php #Master進程
| \-+- 54173 yangyi php server.php # Manager 進程
| |--- 54174 yangyi php server.php #Work 進程
| |--- 54175 yangyi php server.php
| |--- 54176 yangyi php server.php
| |--- 54177 yangyi php server.php
| |--- 54178 yangyi php server.php
| |--- 54179 yangyi php server.php
| |--- 54180 yangyi php server.php
| |--- 54181 yangyi php server.php
| |--- 54182 yangyi php server.php
| \--- 54183 yangyi php server.php
*
*/
//增加新的監控的ip:post:mode
$this->serv->addlistener("::1", 9500, SWOOLE_SOCK_TCP);
//監聽事件
/*
*
* - onStart
* - onShutdown
* - onWorkerStart
* - onWorkerStop
* - onTimer
* - onConnect
* - onReceive
* - onClose
* - onTask
* - onFinish
* - onPipeMessage
* - onWorkerError
* - onManagerStart
* - onManagerStop
*/
$this->serv->on(‘Start‘, array($this, ‘onStart‘));
$this->serv->on(‘Connect‘, array($this, ‘onConnect‘));
$this->serv->on(‘Receive‘, array($this, ‘onReceive‘));
$this->serv->on(‘Close‘, array($this, ‘onClose‘));
//master進程啟動後, fork出Manager進程, 然後觸發ManagerStart
$this->serv->on(‘ManagerStart‘, function (\swoole_server $server){
echo "On manager start.";
});
//manager進程啟動,啟動work進程的時候調用 workid表示第幾個id, 從0開始。
$this->serv->on(‘WorkerStart‘, function($serv, $workerId) {
echo $workerId . ‘---‘;
});
//當一個work進程死掉後,會觸發
$this->serv->on(‘WorkerStop‘, function() {
echo ‘--stop‘;
});
//啟動
$this->serv->start();
}
//啟動server時候會觸發。
public function onStart( $serv ) {
echo "Start\n";
}
//client連接成功後觸發。
public function onConnect( $serv, $fd, $from_id ) {
$a = $serv->send( $fd, "Hello {$fd}!" );
//var_dump($a); //成功返回true
}
//接收client發過來的請求
public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
echo "Get Message From Client {$fd}:{$data}\n";
//$serv->send($fd, $data);
//關閉該work進程
//$serv->stop();
//宕機
//$serv->shutdown();
//主動關閉 客戶端連接,也會觸發onClose事件
//$serv->close($fd);
$serv->send($fd, $data);
//$list = $serv->connection_list();
// foreach ($list as $fd) {
// $serv->send($fd, $data);
// }
}
}
//客戶端斷開觸發
public function onClose( $serv, $fd, $from_id ) {
echo "Client {$fd} close connection\n";
}
}
//輸出swoole的版本
echo swoole_version(); // 1.9.0
//輸出本機iP
var_dump(swoole_get_local_ip());
/**
array(1) {
‘en4‘ =>
string(13) "172.16.71.149"
}
*/
// 啟動服務器
$server = new Server();
我們啟動服務端server:
$ php server.php
0--start
2--start
Start
1--start
3--start
4--start
5--start
6--start
manager start.
7--start
9--start
8--start
我們來分析整個server 啟動的步驟:
- 啟動php server.php後,當前進程fork出Master進程,然後退出。
- Master進程啟動成功之後,fork出Manager進程,並觸發OnManagerStart事件。
- Manager進程啟動成功時候,fork出Worker進程,並觸發OnWorkerStart事件。
同步client
server端好了,那麽就會需要client端來連接,swoole裏面client分為同步和異步,先來一個同步clent客戶端。
<?php
// sync 同步客戶端
class client
{
private $client;
public function __construct()
{
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
$this->client->connect(‘127.0.0.1‘, 9501, 1);
}
public function connect()
{
//fwrite(STDOUT, "請輸入消息:");
//$msg = trim(fgets(STDIN));
$msg = rand(1,12);
//發送給消息到服務端
$this->client->send( $msg );
//接受服務端發來的信息
$message = $this->client->recv();
echo "Get Message From Server:{$message}\n";
//關閉客戶端
$this->client->close();
}
}
$client = new Client();
$client->connect();
同步client是同步阻塞的。一整套connect->send()->rev()->close()
是同步進行的。
所以,如果是大量的循環數據,就不適合同步client了:
比如下面:
<?php
// sync 同步客戶端
class client
{
private $client;
public function __construct()
{
var_dump(swoole_get_local_ip());
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
$this->client->connect(‘127.0.0.1‘, 9501, 1);
$i = 0;
while ($i < 100) {
$this->client->send($i."\n");
$message = $this->client->recv();
echo "Get Message From Server:{$message}\n";
$i++;
}
}
}
$client = new Client();
打印的結果就是順序執行。要是想要異步了。
異步client
<?php
//異步客戶端
$client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->on("connect", function($cli) {
var_dump($cli->isConnected()); // true
var_dump($cli->getsockname()); //[‘port‘ => 57305, ‘host‘=> ‘127.0.0.1‘]
var_dump($cli->sock); // 5
$i = 0;
while ($i < 100) {
$cli->send($i."\n");
$i++;
}
//關閉
//$cli->close();
});
$client->on("receive", function($cli, $data){
echo "Receive: $data";
});
$client->on("error", function(swoole_client $cli){
echo "error\n" . $cli->errCode;
});
$client->on("close", function(swoole_client $cli){
echo "Connection close\n";
});
$client->connect(‘127.0.0.1‘, 9501);
這樣就是一個異步的client了,處理更快,但是只支持php的cli模式
。
server與client交互
總結一下client與server的連接過程:
- Client主動Connect的時候,Client實際上是與Master進程中的某個Reactor線程發生了連接。
- 當TCP的三次握手成功了以後,由這個Reactor線程將連接成功的消息告訴Manager進程,再由Manager進程轉交給Worker進程。
- 在這個Worker進程中觸發了OnConnect的方法。
- 當Client向Server發送了一個數據包的時候,首先收到數據包的是Reactor線程,同時Reactor線程會完成組包,再將組好的包交給Manager進程,由Manager進程轉交給Worker。
- 此時Worker進程觸發OnReceive事件。
- 如果在Worker進程中做了什麽處理,然後再用Send方法將數據發回給客戶端時,數據則會沿著這個路徑逆流而上。
關於上面說到的幾個進程,解釋下:
Master進程是一個多線程進程,其中有一組非常重要的線程,叫做Reactor線程(組),每當一個客戶端連接上服務器的時候,都會由Master進程從已有的Reactor線程中,根據一定規則挑選一個,專門負責向這個客戶端提供維持鏈接、處理網絡IO與收發數據等服務。
而Manager進程,某種意義上可以看做一個代理層,它本身並不直接處理業務,其主要工作是將Master進程中收到的數據轉交給Worker進程,或者將Worker進程中希望發給客戶端的數據轉交給Master進程進行發送。另外,Manager進程還負責監控Worker進程,如果Worker進程因為某些意外掛了,Manager進程會重新拉起新的Worker進程,有點像Supervisor的工作。
Worker進程了,顧名思義,Worker進程其實就是處理各種業務工作的進程,Manager將數據包轉交給Worker進程,然後Worker進程進行具體的處理,並根據實際情況將結果反饋給客戶端。
task_worker
在swoole中work進程分為EventWorker和TaskWorker,對應的配置文件設置為:
$this->serv->set([
‘worker_num‘ => 10, #EventWorker
‘task_worker_num‘ => 2, #TaskWorker
‘deamonize‘ => true,
]);
worker是基於event觸發,而task則是manager直接生成的子進程。那麽他們有什麽區別呢?
共同點是:他們都是最底層負責處理業務的進程。
Swoole的業務邏輯部分是同步阻塞運行的,如果遇到一些耗時較大的操作,例如訪問數據庫、廣播消息等,就會影響服務器的響應速度。因此Swoole提供了Task功能,將這些耗時操作放到另外的進程去處理,當前woker進程繼續執行後面的邏輯。運行Task,需要在swoole服務中配置參數task_worker_num
,即可開啟task功能。此外,必須給swoole_server綁定兩個回調函數:onTask
和onFinish
。這兩個回調函數分別用於執行Task任務和處理Task任務的返回結果。
先來寫一個demo,來如何用 taskWoker來處理業務。
taskServer.php
<?php
/**
* Created by PhpStorm.
* User: yangyi
* Date: 2016/12/7
* Time: 16:16
*/
class taskServer
{
private $serv;
/**
* [__construct description]
* 構造方法中,初始化 $serv 服務
*/
public function __construct() {
$this->serv = new Swoole\Server(‘0.0.0.0‘, 9501);
//初始化swoole服務
$this->serv->set(array(
‘worker_num‘ => 8,
‘daemonize‘ => false, //是否作為守護進程,此配置一般配合log_file使用
‘max_request‘ => 1000,
‘log_file‘ => ‘./swoole.log‘,
‘task_worker_num‘ => 8
));
//設置監聽
$this->serv->on(‘Start‘, array($this, ‘onStart‘));
$this->serv->on(‘Connect‘, array($this, ‘onConnect‘));
$this->serv->on("Receive", array($this, ‘onReceive‘));
$this->serv->on("Close", array($this, ‘onClose‘));
$this->serv->on("Task", array($this, ‘onTask‘));
$this->serv->on("Finish", array($this, ‘onFinish‘));
//開啟
$this->serv->start();
}
public function onStart($serv) {
echo SWOOLE_VERSION . " onStart\n";
}
public function onConnect($serv, $fd) {
echo $fd."Client Connect.\n";
}
public function onReceive($serv, $fd, $from_id, $data) {
echo "Get Message From Client {$fd}:{$data}\n";
// send a task to task worker.
$param = array(
‘fd‘ => $fd
);
// start a task
$serv->task(json_encode($param));
echo "Continue Handle Worker\n";
}
public function onClose($serv, $fd) {
echo "Client Close.\n";
}
public function onTask($serv, $task_id, $from_id, $data) {
echo "This Task {$task_id} from Worker {$from_id}\n";
echo "Data: {$data}\n";
for($i = 0 ; $i < 200 ; $i ++ ) {
sleep(1);
echo "Task {$task_id} Handle {$i} times...\n";
}
$fd = json_decode($data, true);
$serv->send($fd[‘fd‘] , "Data in Task {$task_id}");
return "Task {$task_id}‘s result";
}
public function onFinish($serv,$task_id, $data) {
echo "Task {$task_id} finish\n";
echo "Result: {$data}\n";
}
}
$server = new taskServer();
taskClient.php 異步的客戶端
<?php
/**
* Created by PhpStorm.
* User: yangyi
* Date: 2016/12/7
* Time: 16:18
*/
class taskClient
{
private $client;
public function __construct() {
$this->client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->client->on(‘Connect‘, array($this, ‘onConnect‘));
$this->client->on(‘Receive‘, array($this, ‘onReceive‘));
$this->client->on(‘Close‘, array($this, ‘onClose‘));
$this->client->on(‘Error‘, array($this, ‘onError‘));
}
public function connect() {
if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {
echo "Error: {$fp->errMsg}[{$fp->errCode}]\n";
return;
}
}
//connect之後,會調用onConnect方法
public function onConnect($cli) {
fwrite(STDOUT, "Enter Msg:");
swoole_event_add(STDIN,function(){
fwrite(STDOUT, "Enter Msg:");
$msg = trim(fgets(STDIN));
$this->send($msg);
});
}
public function onClose($cli) {
echo "Client close connection\n";
}
public function onError() {
}
public function onReceive($cli, $data) {
echo "Received: ".$data."\n";
}
public function send($data) {
$this->client->send($data);
}
public function isConnected($cli) {
return $this->client->isConnected();
}
}
$client = new taskClient();
$client->connect();
運行一下:
php taskServer.php
php taskClient.php
服務端打印:
$ php task_server.php
1.9.0 onStart
1Client Connect.
Get Message From Client 1:12345
Continue Handle Worker
This Task 0 from Worker 3
Data: {"fd":1}
Task 0 Handle 0 times...
Task 0 Handle 1 times...
Task 0 finish
Result: Task 0‘s result
客戶端打印:
$ php task_client.php
Enter Msg:12345
Enter Msg:Received: Data in Task 0
這裏面有幾點需要註意:
1. 運行Task,必須要在swoole服務中配置參數task_worker_num
,此外,必須給swoole_server綁定兩個回調函數:onTask
和onFinish
。
2. onTash 要return 數據
3. onFinish 會接收到onTash的數據,標記成完成。
4. swoole_event_add 把輸入綁定成事件,這個後續將,這樣client就可以連續的多次輸入。
swoole的架構
上面說了這麽,圖表總結一下swoole結構:
swoole采用 多線程Reactor+多進程Worker
swoole的處理連接流程圖如下:
當請求到達時,swoole是這樣處理的:
請求到達 Main Reactor
|
|
Main Reactor根據Reactor的情況,將請求註冊給對應的Reactor
(每個Reactor都有epoll。用來監聽客戶端的變化)
|
|
客戶端有變化時,交給worker來處理
|
|
worker處理完畢,通過進程間通信(比如管道、共享內存、消息隊列)發給對應的reactor。
|
|
reactor將響應結果發給相應的連接
|
|
請求處理完成
因為reactor基於epoll,所以每個reactor可以處理無數個連接請求。 如此,swoole就輕松的處理了高並發。
參考資料:
http://rango.swoole.com/archives/305
https://github.com/szyhf/swoole_study/blob/master/Swoole%E7%9A%84%E8%BF%9B%E7%A8%8B%E6%A8%A1%E5%9E%8B.md
http://happyliu.blog.51cto.com/501986/1574923
https://segmentfault.com/a/1190000007614502
swoole深入學習 2. tcp Server和tcp Client