Easyswoole的WaitGroup和Csp元件的分析和使用
阿新 • • 發佈:2020-10-29
Easyswoole的WaitGroup和Csp元件的分析和使用
easyswoole可真是個好名字,只是提供了恰到好處的封裝,即使是原始碼也保持了這樣的風格。這種風格不論好壞可能都需要各位適應下,哈哈。下面一起來感受下es中的實現吧。
分析
<?php namespace EasySwoole\Component; use Swoole\Coroutine\Channel; class WaitGroup { private $count = 0; /** @var Channel */ private $channel; private $success = 0; private $size; public function __construct(int $size = 128) { $this->size = $size; // 主要的初始化功能在reset中 提供了一個傳遞任務執行情況的通道 // reset方法 還允許在當前協程多次使用同一個waitgroup變數 $this->reset(); } public function add() { // wg中子任務的計數器 $this->count++; } function successNum(): int { return $this->success; } public function done() { // 當子協程任務執行完畢時 向管道中push了一個int 1 $this->channel->push(1); } // 最主要的邏輯在這裡 // 其中的while迴圈 相當於在主協程中阻塞 直到全部的任務執行完成或者超時 public function wait(?float $timeout = 15) { if ($timeout <= 0) { $timeout = PHP_INT_MAX; } $this->success = 0; $left = $timeout; // $this->count是還要等待完成的子協程數量 // $left是超時時間 即主協程繼續向下執行還需要等待的時間(其實並不存在主子協程的概念 這裡只是為了好區分) // 當add的任務全部執行完成 或者 超過了設定的timeout 此wait函式執行完畢 程式繼續向下執行 while (($this->count > 0) && ($left > 0)) { $start = round(microtime(true), 3); if ($this->channel->pop($left) === 1) { // 每從通道pop一次就給還要執行的任務數量-1 $this->count--; // 每從通道pop一次就給成功數量+1 $this->success++; } // 每迴圈一次更新一次還要wait的時間 $left = $left - (round(microtime(true), 3) - $start); } } function reset() { $this->close(); $this->count = 0; $this->success = 0; $this->channel = new Channel($this->size); } function close() { if ($this->channel) { $this->channel->close(); $this->channel = null; } } function __destruct() { $this->close(); } }
使用
# 確保開起了swoole的協程hook $wg = new WaitGroup(); $wg->add(); go(function () use ($wg) { defer(function () use ($wg) { $wg->done(); }); sleep(1); # 子協程使用sleep模擬阻塞 var_dump("task1 done"); }); $wg->add(); go(function () use ($wg) { defer(function () use ($wg) { $wg->done(); }); sleep(5); # 子協程使用sleep模擬阻塞 var_dump('task2 done'); }); # 呼叫了wait方法 主協程就阻塞在wait方法中的while迴圈上 直到全部的子協程執行完畢或者超時才執行後面的程式碼 $wg->wait(); # 可以嘗試在wait中傳遞較小的超時時間 檢查效果 var_dump('main task done'); # 給出swoole協程切換的條件 1 協程中的程式碼遇到阻塞 如上的sleep 2 協程主動yield讓出執行權 3 cpu計算超時 主動讓出執行權(詳細配置請檢視swoole官方文件) 其他的切換條件請大家補充
分析
<?php namespace EasySwoole\Component; use Swoole\Coroutine\Channel; use Swoole\Coroutine; class Csp { private $chan; private $count = 0; private $success = 0; private $task = []; // 初始化channel function __construct(int $size = 8) { $this->chan = new Channel($size); } // 新增要併發執行的任務 function add($itemName, callable $call): Csp { $this->count = 0; $this->success = 0; $this->task[$itemName] = $call; return $this; } function successNum(): int { return $this->success; } // 阻塞執行新增的任務 直到全部執行完畢 或者 超時 function exec(?float $timeout = 5) { if ($timeout <= 0) { $timeout = PHP_INT_MAX; } $this->count = count($this->task); // 建立協程併發執行add進來的任務 foreach ($this->task as $key => $call) { Coroutine::create(function () use ($key, $call) { // 呼叫add進來的閉包 $data = call_user_func($call); // 將執行結果送入通道 $this->chan->push([ 'key' => $key, 'result' => $data ]); }); } $result = []; $start = microtime(true); // 同樣是通過while迴圈 將程式碼阻塞在exec方法上 while ($this->count > 0) { // 超時返回false $temp = $this->chan->pop(1); if (is_array($temp)) { $key = $temp['key']; $result[$key] = $temp['result']; $this->count--; $this->success++; } // 超時就結束迴圈 程式碼向下執行 if (microtime(true) - $start > $timeout) { break; } } return $result; } }
使用
# 執行發現並沒有先列印主協程中的 main task done,這是因為Csp的exec方法會阻塞當前協程
$csp = new Csp();
$csp->add('task1', function () {
return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/waitGroup.html");
});
$res = $csp->exec();
var_dump($res);
go(function () {
var_dump("main task done");
});
# 如此使用觸發了協程排程 導致先輸出了主協程的main task done
go(function () {
$csp = new Csp();
$csp->add('task1', function () {
return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
return file_get_contents("https://wiki.swoole.com/#/start/start_tcp_servers");
});
$res = $csp->exec();
var_dump($res);
});
var_dump('main task done');
# 有時候可能需要將csp放到自行建立的協程容器中執行 可以嘗試下面配合WaitGroup的方式
$wg = new WaitGroup();
$wg->add();
go(function () use ($wg) {
defer(function () use ($wg) {
$wg->done();
});
$csp = new Csp();
$csp->add('task1', function () {
return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/waitGroup.html");
});
$res = $csp->exec();
var_dump($res);
});
$wg->wait();
// go(function(){
// var_dump('main task done');
// });
var_dump('main task done');
請勿將以上的程式碼搬運到go中,因為golang在語言層面完全支援了協程排程,而swoole的協程不僅需要在協程容器中使用,並且實現協程的切換也需要相應的條件。
今天就說到這吧,發現錯誤歡迎指正,感謝!!!