1. 程式人生 > >Java Reactive 非同步與併發程式設計

Java Reactive 非同步與併發程式設計

Java Reactive 非同步與併發程式設計

【摘要】Reactive 程式設計在多數人眼中是非同步、併發的“銀彈/神器”。本文分析了Reactive 執行原理,說明 Reactive 程式設計是資料驅動的,而不是“事件”驅動的。Reactive 程式設計分為資料來源準備、資料流建模、排程者分配三個基本設計步驟,才能實現非同步併發執行。最後,我們給出基於資料流圖的計算模型的設計與程式設計方案。

大資料和雲端計算(雲服務)讓 Reactive 程式設計成為新一代的程式設計神器。儘管 Reactive 程式設計模型大大簡化了非同步與併發程式設計的難度,但絕不是低門檻的。它首先需要你改變傳統順序處理的計算模式,建立面向資料流的計算模型;然後,需要有強大的執行緒、協程等併發知識,才能編寫出 safe 的應用;再者還需要一些函數語言程式設計的知識,如 Lambda、閉包等。本文努力描述響應式程式設計需要的最基礎的知識,並用一些案例,讓你體驗 Reactive 程式設計的神奇與優雅。

1、準備知識

Java Reactive 程式設計使用 RxJava 庫,雖然可以相容 Java 5 程式設計,但你會失去 Java 8 給你帶來的便利,如 Lambda 表示式,CompleteableFuture 等非同步特性的支援。關鍵是沒有 Lambda 函式,Reactive Java 程式幾乎無法閱讀!

1.1 程式設計環境配置

1. 專案檔案配置

maven 配置檔案 pom.xml 需要

    <build>
        <plugins>
            <plugin>
                <groupId
>
org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF8</encoding> </configuration> </plugin> </plugins> </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2. IDEA 設定

Intellij IDEA 要設定三處位置,且要求版本一致,否則就有出錯提示,例如:“source Release 8 Requires Target Release 1.8…”

  • File -> Project Structure -> Project
  • File -> Project Structure -> Modules
  • File -> Setting -> Build -> Compiler -> Java Compiler

設定如圖

1.2 Lambda 表示式

現在,不支援 Lambda 表示式的語言真不多。 在 Java 中,它主要作為單一方法的介面匿名實現。例如:

public class TestLambda {
    public static void main(String[] args) {

        System.out.println("=== RunnableTest ===");

        // Anonymous Runnable
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                System.out.println("Hello world one!");
            }
        };

        // Lambda Runnable
        Runnable r2 = () -> System.out.println("Hello world two!");

        // Run em!
        r1.run();
        r2.run();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Lambda 表示式 的語法,例如:

Argument List Arrow Token Body
(int x, int y) -> x + y

官方教程: Java SE 8: Lambda Quick Start

2、Future<V> 與多執行緒程式設計

Future<V> 是一個泛型介面,如果一個可執行的函式(實現 Callable 或 Runable 的類)在一個執行緒中執行,利用 Future<V> 可以用它的 get() 方法返回 V 型別的結果。 注意, get() 會阻塞當前執行緒。例如:

public class TestFuture {

    // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
    static ExecutorService executor = Executors.newCachedThreadPool();

    public void testTaskRunning(String name, Integer t) {
        System.out.println("Prepare for execution:" + name);
        long startTime = System.currentTimeMillis(); //獲取開始時間

        // using lambda may cause 10X time then Callable
//        Future<String> fa = executor.submit(
//                new Callable<String>() {
//                    @Override
//                    public String call() throws Exception {
//                        try {
//                            Thread.sleep(t);
//                        } catch (Exception e) {
//                            e.printStackTrace();
//                        }
//                        return String.format("service exec time: %d", t);
//                    }
//                }
//        );

        Future<String> fa =  executor.submit(
                () -> {
                        try {
                            Thread.sleep(t);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return String.format("service exec time: %d", t);
                });

        long endTime = System.currentTimeMillis();
        System.out.println("Start execute: " + (endTime - startTime) + "ms");

        try {
            String s = fa.get(); //Future to Blocked
            System.out.println(s);
        } catch (
                Exception e)
        {
            e.printStackTrace();
        }

        endTime = System.currentTimeMillis(); //
        System.out.println("End execute: " + (endTime - startTime) + "ms");

    }

    public void testAsyncTaskRunning() {
        System.out.println("Prepare for execution: composite task" );
        long startTime = System.currentTimeMillis(); //獲取開始時間

        Future<String> fa = executor.submit(new TimeConsumingService("fa",200,new String[]{}));
        Future<String> fb = executor.submit(new TimeConsumingService("fb",400,new String[]{}));

        System.out.println("Start execute: " + (System.currentTimeMillis() - startTime) + "ms");

        try {
            // What will happen when change line fc and fd ?
            Future<String> fc = executor.submit(new TimeConsumingService("fc",400,new String[]{fa.get()}));
            Future<String> fd = executor.submit(new TimeConsumingService("fd",200,new String[]{fb.get()}));
            Future<String> fe = executor.submit(new TimeConsumingService("fe",200,new String[]{fb.get()}));
            fc.get(); fd.get(); fe.get();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("End execute: " + (System.currentTimeMillis() - startTime) + "ms");
    }

    public static void main(String[] args) {
        TestFuture test = new TestFuture();
        test.testTaskRunning("fa", 300);
        //test.testAsyncTaskRunning();
        System.out.println("sssssssssssssssssss");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • static ExecutorService executor = Executors.newCachedThreadPool(); 
    • Executors 返回一個執行緒模型,用於發射執行緒; 
      • newCachedThreadPool() 建立快取執行緒池,常用於管理 IO 阻塞型執行緒
      • newFixedThreadPool(int nThreads) 建立管理計算需求的執行緒池,n=cpu*2;
      • newSingleThreadExecutor() 建立單個執行緒
      • … …
    • ExecutorService 管理髮射(啟動)、停止執行緒等操作 
      • submit 用於將實現 Callable 或 Runable 介面的物件,啟動執行緒;
      • 它返回 Future<V> 介面,方便管理執行緒和獲取結果
  • testTaskRunning() 
    • executor.submit 啟動 Callable 介面實現,或 Lambda 函式為一個執行緒 
      • Lambda 準備需要遠常於介面的時間
  • testAsyncTaskRunning() 
    • 線上程池中執行一群耗時的函式(實現 Callable的類)
    • TimeConsumingService 是模擬服務物件,它有名字、計算時間。程式碼見後面

問題

  1. get() 順序會影響出結果時間,關鍵 get 的阻塞;
  2. 如果能按這些執行緒出結果的時間序列,把資料結果交給後面的執行緒並行加工處理,CPU就不用阻塞在 get() 了。但程式設計無疑會很複雜。

3、Reactive(響應式)程式設計

大到大資料處理(如spark),小到 android介面事件流的處理,Reactive(響應式)程式設計以成為最流行的標配。然而它是如何工作的呢? 如何正確編寫 Reactive 程式?

3.1 響應式程式設計基本原理

響應式程式設計是一種按流水線模式組織處理資料流的協程計算模型(實在找不到合適的定義)。一個流水線有輸入源、工人(函式)、流水結構、排程員、輸出構成。假設一個簡單的線性(順序處理)流水線,如圖:

(s0--S1----s2-s3---|)  [alpha]  ...  [beta]  ...  [theta]  (o0---o2------o2o4-)
                      |                                   |
                      |                                   |
                      +------------scheduler--------------+
  • 1
  • 2
  • 3
  • 4

其中 :

*(d0–d1—–d2–|)表示帶終止標誌的時序資料流; 
* [worker] 表示一個操作函式,它只能從前道工序取資料,加工處理後交給下一個 worker; 
* … 表示兩個工人之間的存放資料的快取佇列(EndPoint); 
* scheduler 是管理工人幹活的排程員,一個排程員能僅能一次指揮一個工人,且工人一次只處理一個數據,要麼成功,要麼失敗(異常)。

例如:

    public static void hello(Integer... integers) {
        Observable<Integer> workflow = Observable.from(integers)
                .filter( i -> (i < 10) && (i > 0))
                .map( i -> i *2);
        workflow.subscribe(i -> System.out.print(i + "! ") );
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

這是典型的鏈式函式式處理計算模式。輸入與輸出分別是:

hello(-1,4,5,0,2,19,6);

8! 10! 4! 12! 
  • 1
  • 2
  • 3

關鍵不是結果,而是處理過程。第一條語句僅是搭建了一個框架,聲明瞭資料來源和兩個操作工人 filter,map,它們各自使用定義的函式幹活。最重要的工人(subscribe)出場了,它開動了流水線, scheduler 開始指揮工人幹活,注意工人輪流上場(不是並行),直到遇到結果符號且每個工人無活可幹,流水線關閉。

這個程式是單執行緒程式(一個 scheduler),但是 filter,map,scheduler 幹活的順序是不可預知的,如果他們取資料的 EndPoint有資料,則可以被 scheduler 排程的。一個工人被排程一次,則是獲得了CPU,直到函式操作完成,釋放 CPU。這是標準的協程(Coroutine)概念。

響應式程式設計意思就是根據 EndPoint 中的資料啟動對應資料處理函式,是函式之間非同步執行(鏈式反應)的過程,這對於減少程式阻塞、減低執行緒開銷,特別是不支援多執行緒的nodejs,javascript,python等具有特別的意義。

因此,Reactive 程式設計是資料驅動的程式設計,在資料驅動的模型上建立併發處理機制,需要引入多個排程者。

千萬不要認為響應式程式設計是併發的,建議你有時間時, 務必仔細閱讀 RxJava Threading Examples 。理論上,Reactive 是資料驅動的,而不是事件驅動的。

響應式程式設計不是真正意義上的併發,由於每個排程器是一個執行緒,它管理的操作函式之間一般都不需要對資料上鎖,這些函式是按資料驅動,“併發執行”的。

Reactive 要點

  1. Reactive 程式設計開始是流程建模過程,而不是資料處理過程
  2. 工作流由 subscribe 啟動,subscribe 多次就會導致多次啟動流程。這與普通語言按順序執行不一樣
  3. scheduler 是一個單執行緒的排程器,每個scheduler中管理的函式都是順序執行的,阻塞函式會阻塞流水線工作。
  4. 工作函式是響應式的,觀察者模式僅是實現資料驅動的技術概念,每個資料流,就稱為 “Observable” 的物件。所以,到了 2.x 就改稱 “Flowable” 了

3.2 RxJava 入門

RxJava 是 Reactive eXtend for Java VM 的縮寫。Reactive eXtend 支援許多語言,支援 Groovy, Clojure, JRuby, Kotlin 和 Scala 等現代語言。官方網站: 
https://github.com/ReactiveX/RxJava/wiki

1. Maven 依賴

        <dependency>
            <groupId>io.reactivex</groupId>
            <artifactId>rxjava</artifactId>
            <version>1.2.9</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

個人認為 RxJava 1.x 比較合適入門,2.x 抽象程度高,較難入門。

2. Hello world

    public static void hello(String... names) {
        Observable.from(names).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("Completed!");
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onNext(String strings) {
                System.out.println("same hello " + strings);
            }

        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

程式設計步驟

  1. 響應式程式設計第一步就是建立資料來源。簡單就是 Observable.from 從一個集合中取數,或者 Observable.just 直接取數,例如 Observable.just(1,2);
  2. 用操作函式搭建資料流處理流水線(workflow),即使用操作資料變換(transform) 定義工作流。官方有幾百個操作或函式???,學習難度有點大!!!
  3. 最後操作 subscribe,啟動流程,輸出資料。

subscribe支援3個函式,onNext 收到一個結果,onError 收到一個錯誤,onCompleted 流程結束。

3.3 排程器與執行緒

  • 排程器(Scheduler)物件:操作排程的執行緒。
  • 排程器工作的執行緒模型: 
    • Scheduler.io() 建立排程器用於 IO 阻塞的工作,使用快取執行緒池;
    • Scheduler.computation() 建立排程器用於耗時計算,使用固定執行緒池;
    • Scheduler.from(java.util.concurrent.Executor executor) 使用已有執行緒池;

官方案例是:

import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

這是一個2.x的例子。

  • Flowable.fromCallable 建立一個耗時的源。
  • subscribeOn(Schedulers.io()) 源開始使用執行緒池中的這個排程器
  • observeOn(Schedulers.single()) 下面的操作使用這個排程器

注:由於排程器在後臺,沒有最後一句,你將等不到任何輸出主執行緒就結束了。在實際應用中,如android介面主執行緒、web服務非同步執行緒一般不會結果,而是等你 subscribe 結果。

案例研究:假設你有100個URL,要從網站爬這些URL資料,這樣程式設計可以嗎

    Observable.from(Urls)
        .observeOn(Schedulers.io())
        .map(url -> readFileFromUrl(url))
        .observeOn(Schedulers.computation())
        .flatMap(doc -> process(doc))
        .observeOn(Schedulers.single())
        .subscribe(out -> output(out));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

問題

  • 這裡有幾個 Scheduler 執行緒呢?
  • 每個執行緒排程擁有那些計算函式呢?

參考:理解RxJava的執行緒模型

Reactive 併發程式設計要點

  1. 在資料流中,設計與管理 Scheduler 的數量與管理範圍,是 RxJava 併發的關鍵
  2. 在資料流中,有當前 任務排程(執行緒) 概念,儘管執行緒是隱式的
  3. subscribeOn(Scheduler) 定義最近資料來源的排程者,因此一個源頭一個就夠了,多了也沒有意義。
  4. observeOn(Schedule) 定義以後任務的排程者。
  5. 並行分為函式處理(單執行緒)級別的併發,和排程者(多執行緒)級別的併發

3.4. RxJava 併發程式設計設計

案例研究:非同步任務的依賴

假設我們的程式需要五個 micro-service 協作完成計算任務,這些 micro-services 之間存在資料依賴關係:

micro-services-deps

為了實驗方面,我們構造了實現 Callable 的類 TimeConsumingService:

public class TimeConsumingService implements Callable<String> {

    private String service_name;
    private int wait_ms;

    public TimeConsumingService(String name, Integer waiting, String[] depandencies) {
        this.service_name = name;
        this.wait_ms = waiting;
    }

    @Override
    public String call() throws Exception {
                Thread.sleep(wait_ms);
                return String.format("service %s exec time is: %d", service_name,wait_ms);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

為了確保這些函式能併發執行,要點就是要構造足夠執行緒,讓沒有依賴關係的服務在不同執行緒中執行。這裡我們採用

join 設計方法

  • 畫出資料流圖;
  • 選擇流程圖上的流程歸併節點;
  • 為每條歸併點的一條執行路徑設計一個排程者(執行緒);
  • 在歸併點 merge 這些路徑的流。

程式碼如下:

    public void testAsyncCompositeJoin() {
        System.out.println("Prepare for execution:Async Composite Join");
        long startTime = System.currentTimeMillis(); //獲取開始時間

        // Tasks oa -> oc,  both in the same thread 1.
        Observable<String> oa = Observable.just("oa").observeOn(Schedulers.io()).flatMap(
                soa -> Observable.fromCallable(new TimeConsumingService("fa", 1000, new String[]{}))
        );
        Observable<String> oc = oa.flatMap(
                (String res) -> {
                    System.out.println(res);
                    System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                    return Observable.fromCallable(
                            new TimeConsumingService("fc", 2000, new String[]{res}));
                });

        // tasks ob -> (od,oe),  ob, od, oe have special thread 2,3,4.
        Observable<String> ob = Observable.just("ob").observeOn(Schedulers.io()).flatMap(
                sob -> Observable.fromCallable(new TimeConsumingService("fb", 2000, new String[]{}))
        );
        Observable<String> od_oe = ob.flatMap(
                (String res) -> {
                    System.out.println(res);
                    System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                    Observable<String> od = Observable.just("od").observeOn(Schedulers.io()).flatMap(
                            sod -> Observable.fromCallable(new TimeConsumingService("fd", 1000, new String[]{res}))
                    );
                    Observable<String> oe = Observable.just("oe").observeOn(Schedulers.io()).flatMap(
                            sod -> Observable.fromCallable(new TimeConsumingService("fe", 1000, new String[]{res}))
                    );
                    return Observable.merge(od, oe);
                });

        System.out.println("Observable build: " + (System.currentTimeMillis() - startTime) + "ms");

        // tasks join oc,(od_oe) and subscribe
        Observable.merge(oc, od_oe).toBlocking().subscribe(
                (res) -> {
                    System.out.println(res);
                    System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                });

        System.out.println("End executed: " + (System.currentTimeMillis() - startTime) + "ms");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

注意:上述程式中既有 lambda 函式,也有 lambda 表示式。區別是前者需要 return,而後者不需要。

啟動程式: