1. 程式人生 > >Reactive(2) 響應式流與制奶廠業務

Reactive(2) 響應式流與制奶廠業務

目錄

  • 再談響應式
    • 為什麼Web後端開發的,對 Reactive 沒有感覺
  • Java 9 支援的 Reactive Stream
  • 範例
  • 小結
  • 擴充套件閱讀

再談響應式

在前一篇文章從Reactive程式設計到“好萊塢”中,談到了響應式的一些概念,講的有些發散。 但僅僅還是停留在概念的層面,對於實戰性的東西並沒有涉及。
所以大家看了後,或許還是有些不痛不癢。

響應式程式設計強調的是非同步化、面向流的處理方式,這兩者也並非憑空生出,而是從大量的技術實踐中總結提煉出來的概念,就比如:

  • 我們談非同步化,容易聯想到 Java 非同步IO(Asynchronized IO),而且習慣於將其和 BIO、NIO等概念來做對比。 殊不知,老早出現的 Swing 框架(Java UI)就已經將非同步化思維玩的很溜了,不信的可以看看其內部 Observer模式(觀察者)的實現。

  • 我們談流式處理,容易聯想到 時下當紅的 Flink框架。 但幾乎所有的大資料分析、批處理應用都是基於流式進行處理的,比如 ETL,甚至是一個最簡單的 Map Reduce 作業。

為什麼Web後端開發的,對 Reactive 沒有感覺

除了前端,Reactive 概念在大資料領域的應用其實非常的廣泛了。 但是對於大多數做 Web 後端開發的人來說或許普及程度並不高,以筆者自身的感受是,碼了這麼些年頭,除了做好程式碼分層之外,似乎也沒有見到 Reactive可以發揮重大作用的地方。 原因就在於,在Web 後端開發領域基本是依託 HTTP協議機制實現的,這是一個相當簡單的 請求 -> 應答 互動模式,客戶端在傳送請求後,會一直等待結果返回,也就是結果的通知是由客戶端主動獲取而非非同步通知的,因此並不是 Reactive 的風格。 但這已經是符合使用者一貫的使用方式了,絕大多數情況下並不需要做什麼樣的變化,此時我們對響應式的感知並不深刻。

更符合Reactive 的另外一個場景是 富客戶端(Rich Application),假設在需要大量複雜的前端互動的場景下,我們可以選擇將一些邏輯放在前端程式碼中實現。 此時的 Web 互動就不再是整個頁面的重新整理,而是演變為客戶端與服務端的"實時"雙向通訊,這類應用也比較普遍了,比如基於 WebSocket 實現的 聊天應用、小遊戲等等。

淺顯的從趨勢上看, Reactive 的前景還是很明朗的,這裡並不是說因為現在多數流行的程式語言中都有它的影子(比如提供了Rx風格的框架)。
而是未來的大資料處理、實時流計算會成為主流,這是環境決定的。 而這時 Reactive 這種"面向流"的程式設計模式無疑是很合適的。

Java 9 支援的 Reactive Stream

Java 平臺直到 JDK 9 才提供了對於 Reactive 的完整支援,而在此之前的JDK版本中,也以及存在一些有關聯性的API,比如:

  • Future 和 CompletableFuture介面,用於實現非同步計算。 後者較前者則是完善了非同步結果通知、任務序列等特性。
  • Stream 介面,可以將傳統的集合轉換為"流"的方式進行處理,比如迭代、對映轉換。

這些關聯性API 並不是完整的 Reactive,Java 9所支援的 Reactive Stream API 來自於2013年的響應式流規範(Reactive Stream Specification)。

https://www.reactive-streams.org/

基於這個規範中主要定義了下面幾個介面:

Java的響應式流介面統一定義在 java.util.concurrent.Flow介面

  • Publisher
    即資料的釋出者。 Publisher 介面定義了一個subscribe方法,用於新增訂閱者:

  • Subscriber
    指資料的訂閱者。 Subscriber 介面定義了4個方法,用於針對不同的事件作出響應。

首先,在subscribe方法呼叫成功後,Subscriber的 onSubscribe(Subscription s) 方法會被觸發(Subscription 表示當前的訂閱關係)。
此後,正常可以繼續呼叫 Subscription 的 request(long n) 方法來向釋出者請求資料,n是指最大的資料條目數。

釋出者會產生3種不同的訊息,分別對應到 Subscriber 的3個回撥方法:

資料訊息:對應 onNext 方法,表示釋出者產生的資料。
錯誤訊息:對應 onError 方法,表示釋出者產生了錯誤。
結束訊息:對應 onComplete 方法,表示釋出者已經完成了所有資料的釋出。

在上面的3種通知中,錯誤、結束訊息都表示當前的流已經到達了終點,後面不再會有訊息產生。

  • Subscription
    Subscription 表示的是一個訂閱關係。 可以通過該物件請求資料(request方法),或者取消訂閱(cancel方法)。

  • Processor
    Processor 表示的一種特殊的物件,既是生產者,又是訂閱者。

負壓的支援

負壓是響應式流定義的一種重要的能力,在上述的介面中,實質上已經提供了負壓的支援。
Publisher 只有在收到請求之後,才會產生資料。 這就保證了 Subscriber 可以根據自己的處理能力,確定要向 Publisher 請求的資料量,以此保證自身不會被沖垮。

範例

下面,以一個簡單的程式碼示例來演示 Reactive Stream API 是如何使用的。

以某一個制奶廠為例,為了提高營收,工廠推出了一個廠家直銷的業務。 顧客可以直接向廠方訂購一定天數的奶製品,每天則是由工廠的服務人員送奶上門。
為了模擬這個場景,我們實現的程式碼如下:

  1. 制奶廠,一個Publisher實現:
public class MilkFactory extends SubmissionPublisher<String> {

    private final ScheduledFuture<?> periodicTask;
    private final ScheduledExecutorService scheduler;

    private static final List<String> milks = Arrays.asList("益力多", "酸牛奶", "原味奶", "低脂蛋奶", "羊奶", "甜牛奶");

    public MilkFactory() {
        super();
        //初始化定時器
        scheduler = new ScheduledThreadPoolExecutor(1);

        //每一天生產完牛奶並推送給消費者
        periodicTask = scheduler.scheduleAtFixedRate(
                () -> submit(produceMilk()), 0, 1, TimeUnit.SECONDS);
    }

    //隨機生產牛奶
    private String produceMilk() {
        return milks.get((int) (Math.random() * milks.size()));
    }

    //關閉流
    public void close() {
        periodicTask.cancel(false);
        scheduler.shutdown();
        super.close();
    }
}

MilkFactory 整合自SubmissionPublisher(一個提供緩衝的Publisher實現),其內部會啟動一個定時器,用於模擬每天給使用者發放生產的牛奶。
通過submit()方法可以將資料推送給使用者。

  1. 顧客,一個Subscriber實現:
public class MilkCustomer implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;
    private AtomicInteger available = new AtomicInteger(0);
    private int dayCount;

    public MilkCustomer(int dayCount) {
         this.dayCount = dayCount;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        //設定總量
        available.set(dayCount);

        //第一天
        subscription.request(1);
    }

    @Override
    public void onNext(String milk) {
        System.out.println("今天的牛奶到了: " + milk);

        //如果還有存量,繼續請求
        if(available.decrementAndGet() > 0){
            subscription.request(1);
        }else{
            System.out.println("牛奶套餐已經派完,歡迎繼續訂購");
            this.subscription.cancel();
        }
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("closed.");
    }
}

MilkCustomer 接受一個dayCount入參,即表示訂購的數量,在首次訂閱時會請求第一天的奶品,此後則每次收到到奶品後再請求下一天的,直到將總量消費完。

  1. 測試程式

執行下面的程式碼:

MilkFactory factory = new MilkFactory();

//訂閱1周
MilkCustomer customer = new MilkCustomer(7);

factory.subscribe(customer);

輸出:

今天的牛奶到了: 酸牛奶
今天的牛奶到了: 羊奶
今天的牛奶到了: 原味奶
牛奶套餐已經派完,歡迎繼續訂購

小結

在上例中,我們使用 Java 提供的 Reactive Stream API 實現了一個"送奶上門" 的業務流。
整個過程相對是比較簡單的,最關鍵的地方就在於對流式處理以及訂閱關係的理解。 然而目前的 Reactive 實現還沒有完全的統一,比如 Spring WebFlux(SpringBoot 2支援) 仍然是基於 Reactor 私有API而不是 Reactive Stream API 來構建的,後面有機會再做下介紹。

擴充套件閱讀

關於Future和CompletableFuture的區別
https://juejin.im/post/5adbf8226fb9a07aac240a67