1. 程式人生 > 實用技巧 >Easyswoole的WaitGroup和Csp元件的分析和使用

Easyswoole的WaitGroup和Csp元件的分析和使用

Easyswoole的WaitGroup和Csp元件的分析和使用

easyswoole可真是個好名字,只是提供了恰到好處的封裝,即使是原始碼也保持了這樣的風格。這種風格不論好壞可能都需要各位適應下,哈哈。下面一起來感受下es中的實現吧。

-waitgroup在easyswoole中的實現和使用

-csp在easyswoole中的實現和使用

waitgroup

分析
<?php

namespace EasySwoole\Component;

use Swoole\Coroutine\Channel;

class WaitGroup
{
    private $count = 0;
    /** @var Channel  */
    private $channel;
    private $success = 0;
    private $size;

    public function __construct(int $size = 128)
    {
        $this->size = $size;
        // 主要的初始化功能在reset中 提供了一個傳遞任務執行情況的通道
        // reset方法 還允許在當前協程多次使用同一個waitgroup變數
        $this->reset();
    }

    public function add()
    {   
        // wg中子任務的計數器
        $this->count++;
    }

    function successNum(): int
    {
        return $this->success;
    }

    public function done()
    {   
        // 當子協程任務執行完畢時 向管道中push了一個int 1
        $this->channel->push(1);
    }

    // 最主要的邏輯在這裡
    // 其中的while迴圈 相當於在主協程中阻塞 直到全部的任務執行完成或者超時
    public function wait(?float $timeout = 15)
    {   
        if ($timeout <= 0) {
            $timeout = PHP_INT_MAX;
        }
        $this->success = 0;
        $left = $timeout;
        // $this->count是還要等待完成的子協程數量
        // $left是超時時間 即主協程繼續向下執行還需要等待的時間(其實並不存在主子協程的概念 這裡只是為了好區分)
        // 當add的任務全部執行完成 或者 超過了設定的timeout 此wait函式執行完畢 程式繼續向下執行
        while (($this->count > 0) && ($left > 0)) {
            $start = round(microtime(true), 3);
            if ($this->channel->pop($left) === 1) {
                // 每從通道pop一次就給還要執行的任務數量-1
                $this->count--;
                // 每從通道pop一次就給成功數量+1
                $this->success++;
            }
            // 每迴圈一次更新一次還要wait的時間
            $left = $left - (round(microtime(true), 3) - $start);
        }
    }

    function reset()
    {
        $this->close();
        $this->count = 0;
        $this->success = 0;
        $this->channel = new Channel($this->size);
    }

    function close()
    {
        if ($this->channel) {
            $this->channel->close();
            $this->channel = null;
        }
    }

    function __destruct()
    {
        $this->close();
    }
}

使用
# 確保開起了swoole的協程hook
$wg = new WaitGroup();
$wg->add();
go(function () use ($wg) {
    defer(function () use ($wg) {
        $wg->done();
    });
    sleep(1); # 子協程使用sleep模擬阻塞
    var_dump("task1 done");
});
$wg->add();
go(function () use ($wg) {
    defer(function () use ($wg) {
        $wg->done();
    });
    sleep(5); # 子協程使用sleep模擬阻塞
    var_dump('task2 done');
});
# 呼叫了wait方法 主協程就阻塞在wait方法中的while迴圈上 直到全部的子協程執行完畢或者超時才執行後面的程式碼
$wg->wait(); # 可以嘗試在wait中傳遞較小的超時時間 檢查效果
var_dump('main task done');

# 給出swoole協程切換的條件
1 協程中的程式碼遇到阻塞 如上的sleep
2 協程主動yield讓出執行權
3 cpu計算超時 主動讓出執行權(詳細配置請檢視swoole官方文件)
其他的切換條件請大家補充

csp

分析
<?php

namespace EasySwoole\Component;

use Swoole\Coroutine\Channel;
use Swoole\Coroutine;

class Csp
{
    private $chan;
    private $count = 0;
    private $success = 0;
    private $task = [];

    // 初始化channel
    function __construct(int $size = 8)
    {
        $this->chan = new Channel($size);
    }

    // 新增要併發執行的任務
    function add($itemName, callable $call): Csp
    {
        $this->count = 0;
        $this->success = 0;
        $this->task[$itemName] = $call;
        return $this;
    }

    function successNum(): int
    {
        return $this->success;
    }

    // 阻塞執行新增的任務 直到全部執行完畢 或者 超時
    function exec(?float $timeout = 5)
    {
        if ($timeout <= 0) {
            $timeout = PHP_INT_MAX;
        }
        $this->count = count($this->task);
        // 建立協程併發執行add進來的任務
        foreach ($this->task as $key => $call) {
            Coroutine::create(function () use ($key, $call) {
                // 呼叫add進來的閉包
                $data = call_user_func($call);
                // 將執行結果送入通道
                $this->chan->push([
                    'key' => $key,
                    'result' => $data
                ]);
            });
        }
        $result = [];
        $start = microtime(true);
        // 同樣是通過while迴圈 將程式碼阻塞在exec方法上
        while ($this->count > 0) {
            // 超時返回false
            $temp = $this->chan->pop(1);
            if (is_array($temp)) {
                $key = $temp['key'];
                $result[$key] = $temp['result'];
                $this->count--;
                $this->success++;
            }
            // 超時就結束迴圈 程式碼向下執行
            if (microtime(true) - $start > $timeout) {
                break;
            }
        }
        return $result;
    }
}
使用
# 執行發現並沒有先列印主協程中的 main task done,這是因為Csp的exec方法會阻塞當前協程

$csp = new Csp();
$csp->add('task1', function () {
    return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
});
$csp->add('task2', function () {
    return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/waitGroup.html");
});
$res = $csp->exec();
var_dump($res);
go(function () {
    var_dump("main task done");
});


# 如此使用觸發了協程排程 導致先輸出了主協程的main task done
    
go(function () {
    $csp = new Csp();
    $csp->add('task1', function () {
        return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
    });
    $csp->add('task2', function () {
        return file_get_contents("https://wiki.swoole.com/#/start/start_tcp_servers");
    });
    $res = $csp->exec();
    var_dump($res);
});
var_dump('main task done');


# 有時候可能需要將csp放到自行建立的協程容器中執行 可以嘗試下面配合WaitGroup的方式
$wg = new WaitGroup();
$wg->add();
go(function () use ($wg) {
    defer(function () use ($wg) {
        $wg->done();
    });
    $csp = new Csp();
    $csp->add('task1', function () {
        return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/csp.html");
    });
    $csp->add('task2', function () {
        return file_get_contents("https://www.easyswoole.com/Cn/Components/Component/waitGroup.html");
    });
    $res = $csp->exec();
    var_dump($res);
});
$wg->wait();
// go(function(){
//     var_dump('main task done');
// });
var_dump('main task done');   
請勿將以上的程式碼搬運到go中,因為golang在語言層面完全支援了協程排程,而swoole的協程不僅需要在協程容器中使用,並且實現協程的切換也需要相應的條件。

今天就說到這吧,發現錯誤歡迎指正,感謝!!!