1. 程式人生 > >在PHP中使用協程實現多工排程

在PHP中使用協程實現多工排程

PHP5.5一個比較好的新功能是加入了對迭代生成器和協程的支援.對於生成器,PHP的文件和各種其他的部落格文章已經有了非常詳細的講解.協程相對受到的關注就少了,因為協程雖然有很強大的功能但相對比較複雜, 也比較難被理解,解釋起來也比較困難.

這篇文章將嘗試通過介紹如何使用協程來實施任務排程, 來解釋在PHP中的協程.

我將在前三節做一個簡單的背景介紹.如果你已經有了比較好的基礎,可以直接跳到“協同多工處理”一節.

迭代生成器

(迭代)生成器也是一個函式,不同的是這個函式的返回值是依次返回,而不是隻返回一個單獨的值.或者,換句話說,生成器使你能更方便的實現了迭代器介面.下面通過實現一個xrange函式來簡單說明:

<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i;
    }
}

foreach (xrange(1, 1000000) as $num) {
    echo $num, "\n";
}

上面這個xrange()函式提供了和PHP的內建函式range()一樣的功能.但是不同的是range()函式返回的是一個包含值從1到100萬0的陣列(注:請檢視手冊). 而xrange()函式返回的是依次輸出這些值的一個迭代器, 而不會真正以陣列形式返回.

這種方法的優點是顯而易見的.它可以讓你在處理大資料集合的時候不用一次性的載入到記憶體中.甚至你可以處理無限大的資料流.

當然,也可以不同通過生成器來實現這個功能,而是可以通過繼承Iterator介面實現.但通過使用生成器實現起來會更方便,不用再去實現iterator介面中的5個方法了.

生成器為可中斷的函式

要從生成器認識協程, 理解它內部是如何工作是非常重要的: 生成器是一種可中斷的函式, 在它裡面的yield構成了中斷點.

還是看上面的例子, 呼叫xrange(1,1000000)的時候, xrange()函式裡程式碼其實並沒有真正地執行. 它只是返回了一個迭代器:

<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true)
?>

這也解釋了為什麼xrange叫做迭代生成器, 因為它返回一個迭代器, 而這個迭代器實現了Iterator介面.

呼叫迭代器的方法一次, 其中的程式碼執行一次.例如, 如果你呼叫$range->rewind(), 那麼xrange()裡的程式碼就會執行到控制流第一次出現yield的地方. 而函式內傳遞給yield語句的返回值可以通過$range->current()獲取.

為了繼續執行生成器中yield後的程式碼, 你就需要呼叫$range->next()方法. 這將再次啟動生成器, 直到下一次yield語句出現. 因此,連續呼叫next()和current()方法, 你就能從生成器裡獲得所有的值, 直到再沒有yield語句出現.

對xrange()來說, 這種情形出現在$i超過$end時. 在這中情況下, 控制流將到達函式的終點,因此將不執行任何程式碼.一旦這種情況發生,vaild()方法將返回假, 這時迭代結束.

協程

協程的支援是在迭代生成器的基礎上, 增加了可以回送資料給生成器的功能(呼叫者傳送資料給被呼叫的生成器函式). 這就把生成器到呼叫者的單向通訊轉變為兩者之間的雙向通訊.

傳遞資料的功能是通過迭代器的send()方法實現的. 下面的logger()協程是這種通訊如何執行的例子:

<?php
function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield . "\n");
    }
}

$logger = logger(__DIR__ . '/log');
$logger->send('Foo');
$logger->send('Bar')
?>

正如你能看到,這兒yield沒有作為一個語句來使用, 而是用作一個表示式, 即它能被演化成一個值. 這個值就是呼叫者傳遞給send()方法的值. 在這個例子裡, yield表示式將首先被”Foo”替代寫入Log, 然後被”Bar”替代寫入Log.

上面的例子裡演示了yield作為接受者, 接下來我們看如何同時進行接收和傳送的例子:

<?php
function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current());    // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1"   (the first var_dump in gen)
                              // string(6) "yield2" (the var_dump of the ->send() return value)
var_dump($gen->send('ret2')); // string(4) "ret2"   (again from within gen)
                              // NULL               (the return value of ->send())
?>

要很快的理解輸出的精確順序可能稍微有點困難, 但你確定要搞清楚為什按照這種方式輸出. 以便後續繼續閱讀.

另外, 我要特別指出的有兩點:

第一點,yield表示式兩邊的括號在PHP7以前不是可選的, 也就是說在PHP5.5和PHP5.6中圓括號是必須的.

第二點,你可能已經注意到呼叫current()之前沒有呼叫rewind().這是因為生成迭代物件的時候已經隱含地執行了rewind操作.

多工協作

如果閱讀了上面的logger()例子, 你也許會疑惑“為了雙向通訊我為什麼要使用協程呢?我完全可以使用其他非協程方法實現同樣的功能啊?”, 是的, 你是對的, 但上面的例子只是為了演示了基本用法, 這個例子其實並沒有真正的展示出使用協程的優點.

正如上面介紹裡提到的,協程是非常強大的概念,不過卻應用的很稀少而且常常十分複雜.要給出一些簡單而真實的例子很難.

在這篇文章裡,我決定去做的是使用協程實現多工協作.我們要解決的問題是你想併發地執行多工(或者“程式”).不過我們都知道CPU在一個時刻只能執行一個任務(不考慮多核的情況).因此處理器需要在不同的任務之間進行切換,而且總是讓每個任務執行 “一小會兒”.

多工協作這個術語中的“協作”很好的說明了如何進行這種切換的:它要求當前正在執行的任務自動把控制傳回給排程器,這樣就可以執行其他任務了. 這與“搶佔”多工相反, 搶佔多工是這樣的:排程器可以中斷運行了一段時間的任務, 不管它喜歡還是不喜歡. 協作多工在Windows的早期版本(windows95)和Mac OS中有使用, 不過它們後來都切換到使用搶先多工了. 理由相當明確:如果你依靠程式自動交出控制的話, 那麼一些惡意的程式將很容易佔用整個CPU, 不與其他任務共享.

現在你應當明白協程和任務排程之間的關係:yield指令提供了任務中斷自身的一種方法, 然後把控制交回給任務排程器. 因此協程可以執行多個其他任務. 更進一步來說, yield還可以用來在任務和排程器之間進行通訊.

為了實現我們的多工排程, 首先實現“任務” — 一個用輕量級的包裝的協程函式:

<?php
class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

如程式碼, 一個任務就是用任務ID標記的一個協程(函式). 使用setSendValue()方法, 你可以指定哪些值將被髮送到下次的恢復(在之後你會了解到我們需要這個), run()函式確實沒有做什麼, 除了呼叫send()方法的協同程式, 要理解為什麼添加了一個 beforeFirstYieldflag變數, 需要考慮下面的程式碼片段:

<?php
function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// 如之前提到的在send之前, 當$gen迭代器被建立的時候一個renwind()方法已經被隱式呼叫
// 所以實際上發生的應該類似:
//$gen->rewind();
//var_dump($gen->send('something'));

//這樣renwind的執行將會導致第一個yield被執行, 並且忽略了他的返回值.
//真正當我們呼叫yield的時候, 我們得到的是第二個yield的值! 導致第一個yield的值被忽略.
//string(3) "bar"

通過新增 beforeFirstYieldcondition 我們可以確定第一個yield的值能被正確返回.

排程器現在不得不比多工迴圈要做稍微多點了, 然後才執行多工:

<?php
class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = []; // taskId => task
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid = ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}
?>

newTask()方法(使用下一個空閒的任務id)建立一個新任務,然後把這個任務放入任務map數組裡. 接著它通過把任務放入任務佇列裡來實現對任務的排程. 接著run()方法掃描任務佇列, 執行任務.如果一個任務結束了, 那麼它將從佇列裡刪除, 否則它將在佇列的末尾再次被排程.

讓我們看看下面具有兩個簡單(沒有什麼意義)任務的排程器:

<?php
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task1());
$scheduler->newTask(task2());

$scheduler->run();

兩個任務都僅僅回顯一條資訊,然後使用yield把控制回傳給排程器.輸出結果如下:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

輸出確實如我們所期望的:對前五個迭代來說,兩個任務是交替執行的, 而在第二個任務結束後, 只有第一個任務繼續執行.

與排程器之間通訊

既然排程器已經運行了, 那麼我們來看下一個問題:任務和排程器之間的通訊.

我們將使用程序用來和作業系統會話的同樣的方式來通訊:系統呼叫.

我們需要系統呼叫的理由是作業系統與程序相比它處在不同的許可權級別上. 因此為了執行特權級別的操作(如殺死另一個程序), 就不得不以某種方式把控制傳回給核心, 這樣核心就可以執行所說的操作了. 再說一遍, 這種行為在內部是通過使用中斷指令來實現的. 過去使用的是通用的int指令, 如今使用的是更特殊並且更快速的syscall/sysenter指令.

我們的任務排程系統將反映這種設計:不是簡單地把排程器傳遞給任務(這樣就允許它做它想做的任何事), 我們將通過給yield表示式傳遞資訊來與系統呼叫通訊. 這兒yield即是中斷, 也是傳遞資訊給排程器(和從排程器傳遞出資訊)的方法.

為了說明系統呼叫, 我們對可呼叫的系統呼叫做一個小小的封裝:

<?php
class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback;
        return $callback($task, $scheduler);
    }
}

它和其他任何可呼叫的物件(使用_invoke)一樣的執行, 不過它要求排程器把正在呼叫的任務和自身傳遞給這個函式.

為了解決這個問題我們不得不微微的修改排程器的run方法:

<?php
public function run() {
    while (!$this->taskQueue->isEmpty()) {
        $task = $this->taskQueue->dequeue();
        $retval = $task->run();

        if ($retval instanceof SystemCall) {
            $retval($task, $this);
            continue;
        }

        if ($task->isFinished()) {
            unset($this->taskMap[$task->getTaskId()]);
        } else {
            $this->schedule($task);
        }
    }
}

第一個系統呼叫除了返回任務ID外什麼都沒有做:

<?php
function getTaskId() {
    return new SystemCall(function(Task $task, Scheduler $scheduler) {
        $task->setSendValue($task->getTaskId());
        $scheduler->schedule($task);
    });
}

這個函式設定任務id為下一次傳送的值, 並再次排程了這個任務 .由於使用了系統呼叫, 所以排程器不能自動呼叫任務, 我們需要手工排程任務(稍後你將明白為什麼這麼做). 要使用這個新的系統呼叫的話, 我們要重新編寫以前的例子:

<?php
function task($max) {
    $tid = (yield getTaskId()); // <-- here's the syscall!
    for ($i = 1; $i <= $max; ++$i) {
        echo "This is task $tid iteration $i.\n";
        yield;
    }
}

$scheduler = new Scheduler;

$scheduler->newTask(task(10));
$scheduler->newTask(task(5));

$scheduler->run();
?>

這段程式碼將給出與前一個例子相同的輸出. 請注意系統呼叫如何同其他任何呼叫一樣正常地執行, 只不過預先增加了yield.

要建立新的任務, 然後再殺死它們的話, 需要兩個以上的系統呼叫:

<?php
function newTask(Generator $coroutine) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($coroutine) {
            $task->setSendValue($scheduler->newTask($coroutine));
            $scheduler->schedule($task);
        }
    );
}

function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            $task->setSendValue($scheduler->killTask($tid));
            $scheduler->schedule($task);
        }
    );
}

killTask函式需要在排程器裡增加一個方法:

<?php
public function killTask($tid) {
    if (!isset($this->taskMap[$tid])) {
        return false;
    }

    unset($this->taskMap[$tid]);

    // This is a bit ugly and could be optimized so it does not have to walk the queue,
    // but assuming that killing tasks is rather rare I won't bother with it now
    foreach ($this->taskQueue as $i => $task) {
        if ($task->getTaskId() === $tid) {
            unset($this->taskQueue[$i]);
            break;
        }
    }

    return true;
}

用來測試新功能的微指令碼:

<?php
function childTask() {
    $tid = (yield getTaskId());
    while (true) {
        echo "Child task $tid still alive!\n";
        yield;
    }
}

function task() {
    $tid = (yield getTaskId());
    $childTid = (yield newTask(childTask()));

    for ($i = 1; $i <= 6; ++$i) {
        echo "Parent task $tid iteration $i.\n";
        yield;

        if ($i == 3) yield killTask($childTid);
    }
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();
?>

這段程式碼將列印以下資訊:

Parent task 1 iteration 1.
Child task 2 still alive!
Parent task 1 iteration 2.
Child task 2 still alive!
Parent task 1 iteration 3.
Child task 2 still alive!
Parent task 1 iteration 4.
Parent task 1 iteration 5.
Parent task 1 iteration 6.

經過三次迭代以後子任務將被殺死, 因此這就是”Child is still alive”訊息結束的時候. 不過你要明白這還不是真正的父子關係. 因為在父任務結束後子任務仍然可以執行, 子任務甚至可以殺死父任務. 可以修改排程器使它具有更層級化的任務結構, 不過這個不是我們這個文章要繼續討論的範圍了.

現在你可以實現許多程序管理呼叫. 例如 wait(它一直等待到任務結束執行時), exec(它替代當前任務)和fork(它建立一個當前任務的克隆). fork非常酷,而 且你可以使用PHP的協程真正地實現它, 因為它們都支援克隆.

讓我們把這些留給有興趣的讀者吧,我們來看下一個議題.

非阻塞IO

很明顯, 我們的任務管理系統的真正很酷的應用應該是web伺服器. 它有一個任務是在套接字上偵聽是否有新連線, 當有新連線要建立的時候, 它建立一個新任務來處理新連線.

Web伺服器最難的部分通常是像讀資料這樣的套接字操作是阻塞的. 例如PHP將等待到客戶端完成傳送為止. 對一個Web伺服器來說, 這有點不太高效. 因為伺服器在一個時間點上只能處理一個連線.

解決方案是確保在真正對套接字讀寫之前該套接字已經“準備就緒”. 為了查詢哪個套接字已經準備好讀或者寫了, 可以使用 流選擇函式.

首先,讓我們新增兩個新的 syscall, 它們將等待直到指定socket 準備好:

<?php
function waitForRead($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForRead($socket, $task);
        }
    );
}

function waitForWrite($socket) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($socket) {
            $scheduler->waitForWrite($socket, $task);
        }
    );
}

這些 syscall 只是在排程器中代理其各自的方法:

<?php

// resourceID => [socket, tasks]
protected $waitingForRead = [];
protected $waitingForWrite = [];

public function waitForRead($socket, Task $task) {
    if (isset($this->waitingForRead[(int) $socket])) {
        $this->waitingForRead[(int) $socket][1][] = $task;
    } else {
        $this->waitingForRead[(int) $socket] = [$socket, [$task]];
    }
}

public function waitForWrite($socket, Task $task) {
    if (isset($this->waitingForWrite[(int) $socket])) {
        $this->waitingForWrite[(int) $socket][1][] = $task;
    } else {
        $this->waitingForWrite[(int) $socket] = [$socket, [$task]];
    }
}

waitingForRead 及 waitingForWrite 屬性是兩個承載等待的socket 及等待它們的任務的陣列. 有趣的部分在於下面的方法,它將檢查 socket 是否可用, 並重新安排各自任務:

<?php

protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }

    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }

    $eSocks = []; // dummy

    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }

    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }

    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);

        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}

stream_select 函式接受承載讀取、寫入以及待檢查的socket的陣列(我們無需考慮最後一類). 陣列將按引用傳遞, 函式只會保留那些狀態改變了的陣列元素. 我們可以遍歷這些陣列, 並重新安排與之相關的任務.

為了正常地執行上面的輪詢動作, 我們將在排程器裡增加一個特殊的任務:

<?php
protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}
?>

需要在某個地方註冊這個任務, 例如, 你可以在run()方法的開始增加$this->newTask($this->ioPollTask()). 然後就像其他任務一樣每執行完整任務迴圈一次就執行輪詢操作一次(這麼做一定不是最好的方法), ioPollTask將使用0秒的超時來呼叫ioPoll, 也就是stream_select將立即返回(而不是等待).

只有任務佇列為空時,我們才使用null超時,這意味著它一直等到某個套介面準備就緒.如果我們沒有這麼做,那麼輪詢任務將一而再, 再而三的迴圈執行, 直到有新的連線建立. 這將導致100%的CPU利用率. 相反, 讓作業系統做這種等待會更有效.

現在編寫伺服器就相對容易了:

<?php

function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}

function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    $response = <<<res
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield waitForWrite($socket);
    fwrite($socket, $response);

    fclose($socket);
}

$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();

這段程式碼實現了接收localhost:8000上的連線, 然後返回傳送來的內容作為HTTP響應. 當然它還能處理真正的複雜HTTP請求, 上面的程式碼片段只是演示了一般性的概念.

你可以使用類似於ab -n 10000 -c 100 localhost:8000/這樣命令來測試伺服器. 這條命令將向伺服器傳送10000個請求, 並且其中100個請求將同時到達. 使用這樣的數目, 我得到了處於中間的10毫秒的響應時間. 不過還有一個問題:有少數幾個請求真正處理的很慢(如5秒), 這就是為什麼總吞吐量只有2000請求/秒(如果是10毫秒的響應時間的話, 總的吞吐量應該更像是10000請求/秒)

協程堆疊

如果你試圖用我們的排程系統建立更大的系統的話, 你將很快遇到問題:我們習慣了把程式碼分解為更小的函式, 然後呼叫它們. 然而, 如果使用了協程的話, 就不能這麼做了. 例如,看下面程式碼:

<?php
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}

function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}

$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

這段程式碼試圖把重複迴圈“輸出n次“的程式碼嵌入到一個獨立的協程裡,然後從主任務裡呼叫它. 然而它無法執行. 正如在這篇文章的開始所提到的, 呼叫生成器(或者協程)將沒有真正地做任何事情, 它僅僅返回一個物件.這 也出現在上面的例子裡:echoTimes呼叫除了放回一個(無用的)協程物件外不做任何事情.

為了仍然允許這麼做,我們需要在這個裸協程上寫一個小小的封裝.我們將呼叫它:“協程堆疊”. 因為它將管理巢狀的協程呼叫堆疊. 這將是通過生成協程來呼叫子協程成為可能:

$retval = (yield someCoroutine($foo, $bar));

使用yield,子協程也能再次返回值:

yield retval("I'm a return value!");

retval函式除了返回一個值的封裝外沒有做任何其他事情.這個封裝將表示它是一個返回值.

<?php

class CoroutineReturnValue {
    protected $value;

    public function __construct($value) {
        $this->value = $value;
    }

    public function getValue() {
        return $this->value;
    }
}

function retval($value) {
    return new CoroutineReturnValue($value);
}

為了把協程轉變為協程堆疊(它支援子呼叫),我們將不得不編寫另外一個函式(很明顯,它是另一個協程):

<?php

function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;

    for (;;) {
        $value = $gen->current();

        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }

        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }

            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }

        $gen->send(yield $gen->key() => $value);
    }
}

這個函式在呼叫者和當前正在執行的子協程之間扮演著簡單代理的角色.在$gen->send(yield $gen->key()=>$value);這行完成了代理功能.另外它檢查返回值是否是生成器,萬一是生成器的話,它將開始執行這個生成器,並把前一個協程壓入堆疊裡.一旦它獲得了CoroutineReturnValue的話,它將再次請求堆疊彈出,然後繼續執行前一個協程.

為了使協程堆疊在任務裡可用,任務構造器裡的$this-coroutine =$coroutine;這行需要替代為$this->coroutine = StackedCoroutine($coroutine);.

現在我們可以稍微改進上面web伺服器例子:把wait+read(和wait+write和warit+accept)這樣的動作分組為函式.為了分組相關的 功能,我將使用下面類:

<?php

class CoSocket {
    protected $socket;

    public function __construct($socket) {
        $this->socket = $socket;
    }

    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }

    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }

    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }

    public function close() {
        @fclose($this->socket);
    }
}

現在伺服器可以編寫的稍微簡潔點了:

<?php

function server($port) {
    echo "Starting server at port $port...\n";

    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);

    stream_set_blocking($socket, 0);

    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}

function handleClient($socket) {
    $data = (yield $socket->read(8192));

    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);

    $response = <<<res
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;

    yield $socket->write($response);
    yield $socket->close();
}

錯誤處理

作為一個優秀的程式設計師, 相信你已經察覺到上面的例子缺少錯誤處理. 幾乎所有的 socket 都是易出錯的. 我沒有這樣做的原因一方面固然是因為錯誤處理的乏味(特別是 socket), 另一方面也在於它很容易使程式碼體積膨脹.

不過, 我仍然想講下常見的協程錯誤處理:協程允許使用 throw() 方法在其內部丟擲一個錯誤.

throw() 方法接受一個 Exception, 並將其丟擲到協程的當前懸掛點, 看看下面程式碼:

<?php
function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}

$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"

這非常好, 有沒有? 因為我們現在可以使用系統呼叫以及子協程呼叫異常丟擲了.

不過我們要對系統呼叫Scheduler::run() 方法做一些小調整:

<?php
if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}

Task 類也要新增 throw 呼叫處理:

<?php
class Task {
    // ...
    protected $exception = null;

    public function setException($exception) {
        $this->exception = $exception;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    // ...
}

現在, 我們已經可以在系統呼叫中使用異常丟擲了!例如,要呼叫 killTask,讓我們在傳遞 ID 不可用時丟擲一個異常:

<?php
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}

試試看:

<?php
function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}

這些程式碼現在尚不能正常運作,因為 stackedCoroutine 函式無法正確處理異常.要修復需要做些調整:

<?php
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;

    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }

            $value = $gen->current();

            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }

            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }

                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }

            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }

            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }

            $gen = $stack->pop();
            $exception = $e;
        }
    }
}

結束語

在這篇文章裡,我使用多工協作構建了一個任務排程器, 其中包括執行“系統呼叫”, 做非阻塞操作和處理錯誤. 所有這些裡真正很酷的事情是任務的結果程式碼看起來完全同步, 甚至任務正在執行大量的非同步操作的時候也是這樣.

如果你打算從套介面讀取資料的話, 你將不需要傳遞某個回撥函式或者註冊一個事件偵聽器. 相反, 你只要書寫yield $socket->read(). 這兒大部分都是你常常也要編寫的,只 在它的前面增加yield.

當我第一次聽到協程的時候, 我發現這個概念完全令人折服, 正是因為這個激勵我在PHP中實現了它. 同時我發現協程真正非常的令人驚歎:在令人敬畏的程式碼和一大堆亂程式碼之間只有一線之隔, 我認為協程恰好處在這條線上, 不多不少. 不過, 要說使用上面所述的方法書寫非同步程式碼是否真的有益, 這個就見仁見智了.

但, 不管咋樣, 我認為這是一個有趣的話題, 而且我希望你也能找到它的樂趣. 歡迎評論:)