1. 程式人生 > 實用技巧 >Java的協程Quasar

Java的協程Quasar

協程可以通過yield 來呼叫其它協程,接下來的每次協程被呼叫時,從協程上次yield返回的位置接著執行,通過yield方式轉移執行權的協程之間不是呼叫者與被呼叫者的關係,而是彼此對稱、平等的。

協程vs函式

函式可以呼叫其他函式,呼叫者等待被呼叫者結束後繼續執行,因此函式的生命期遵循後進先出,即最後一個被呼叫的函式最先結束返回。協程的生命期完全由對它們的使用需要來決定。
函式的起始處是惟一的入口點,每當函式被呼叫時,執行都從被呼叫函式的起始處開始。協程可以有多個入口點,協程的起始處是第一個入口點,每個yield返回出口點都是再次被呼叫執行時的入口點。
函式只在結束時一次性的返回全部結果值。協程可以在yield時不呼叫其他協程,而是每次返回一部分的結果值,這種協程常稱為生成器或迭代器。

現代的指令集架構通常提供對呼叫棧的指令支援,便於實現可遞迴呼叫的函式。在以Scheme為代表的提供續體的語言環境下,可用此控制狀態抽象表示來實現協程。

函式可以看作是特定狀況的協程,任何函式都可轉寫為不呼叫yield的協程

協程vs執行緒

對比使用多執行緒來解決IO阻塞任務,使用協程的好處是不用加鎖,訪問共享的資料不用進行同步操作。

使用協程之所以不需要加鎖不是因為所有的協程只在一個執行緒中執行,而是因為協程的非搶佔式的特點。也就是說,協程在沒主動交出CPU之前都是不會被突然切換到其它協程上。而執行緒是搶佔式的,使用多執行緒你不能確定執行緒什麼時候被作業系統排程,什麼時候被切換,因此需要用鎖到實現一種“原子操作”的語義。

協程vs非同步回撥

常見的做法是使用非阻塞的IO(比如是非同步IO,又或者是在syscall上自己實現的一套非同步IO,如asio)並且將處理操作寫在回撥函式中。這樣的做法一般沒什麼問題,但當回撥函式變多,一段連貫的業務程式碼就會被拆分到多個回撥函式之中,增加維護的成本。因此使用協程可以用同步的寫法寫出效果相當於是非同步的程式碼。

在Java中通過Quasar庫實現協程

通過一個channel, 將生成的資料推送給處理者, 這個流程是可以多級串聯的, 達到生成和處理交叉進行的效果.

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;

import java.util.concurrent.ExecutionException;

public class FiberExample {
    private static void printer(Channel<Integer> in) throws SuspendExecution,  InterruptedException {
        Integer v;
        while ((v = in.receive()) != null) {
            System.out.println("<< " + v);
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
        //定義兩個Channel
        Channel<Integer> naturals = Channels.newChannel(1024, Channels.OverflowPolicy.BLOCK, true, true);
        Channel<Integer> squares = Channels.newChannel(1024, Channels.OverflowPolicy.BLOCK, true, true);

        //執行兩個Fiber實現.
        new Fiber<>(() -> {
            for (int i = 0; i < 1000; i++) {
                System.out.println(">> " + i);
                naturals.send(i);
            }
            naturals.close();
        }).start();

        new Fiber<>(() -> {
            while (!naturals.isClosed()) {
                Integer v = naturals.receive();
                System.out.println("< " + v);
                squares.send(v * v);
            }
            System.out.println("Stopped receiving messages");
            squares.close();
        }).start();

        System.out.println("Reached printer");
        printer(squares);
    }
}