1. 程式人生 > >Reactive Stream 響應式流

Reactive Stream 響應式流

rac tac 動態切換 速度 限制 class 延遲 auth 靜態

初識Reactive Stream

Reactive Stream (響應式流/反應流) 是JDK9引入的一套標準,是一套基於發布/訂閱模式的數據處理規範。響應式流從2013年開始,作為提供非阻塞背壓的異步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從發布者傳遞到訂閱者,而不需要發布者阻塞,或訂閱者有無限制的緩沖區或丟棄。更確切地說,Reactive流目的是“找到最小的一組接口,方法和協議,用來描述必要的操作和實體以實現這樣的目標:以非阻塞背壓方式實現數據的異步流”。

“背壓(反壓)back pressure”概念很關鍵。首先異步消費者會向生產者訂閱接收消息,然後當有新的信息可用時,消費者會通過之前訂閱時提供的回調函數被再次激活調用。如果生產者發出的信息比消費者能夠處理消息最大量還要多,消費者可能會被迫一直在抓消息,耗費越來越多的資源,埋下潛在的崩潰風險。為了防止這一點,需要有一種機制使消費者可以通知生產者,降低消息的生成速度。生產者可以采用多種策略來實現這一要求,這種機制稱為背壓。

響應式流模型非常簡單——訂閱者向發布者發送多個元素的異步請求,發布者向訂閱者異步發送多個或稍少的元素。響應式流會在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。

簡單來說,在響應式流下訂閱者可以與發布者溝通,如果使用JMS就應該知道,訂閱者只能被動接收發布者所產生的消息數據。這就好比沒有水龍頭的水管一樣,我只能被動接收水管裏流過來的水,無法關閉也無法減少。而響應式流就相當於給水管加了個水龍頭,在消費者這邊可以控制水流的增加、減少及關閉。

響應式流模型圖:
技術分享圖片

發布者(Publisher)是潛在的無限數量的有序元素的生產者。發布者可能有多個來自訂閱者的待處理請求。

  • 根據收到的要求向當前訂閱者發布(或發送)元素。

訂閱者(Subscriber)從發布者那裏訂閱並接收元素。訂閱者可以請求更多的元素。

  • 發布者向訂閱者發送訂閱令牌(Subscription)。
  • 使用訂閱令牌,訂閱者從發布者那裏請求多個元素。
  • 當元素準備就緒時,發布者向訂閱者發送多個或更少的元素。

Reactive Stream主要接口

JDK9 通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流。在JDK9裏Reactive Stream的主要接口聲明在Flow類裏,Flow 類中定義了四個嵌套的靜態接口,用於建立流量控制的組件,發布者在其中生成一個或多個供訂閱者使用的數據項:

  • Publisher:數據項發布者、生產者
  • Subscriber:數據項訂閱者、消費者
  • Subscription:發布者與訂閱者之間的關系紐帶,訂閱令牌
  • Processor:數據處理器

Flow類結構如下:
技術分享圖片

Publisher是能夠發出元素的發布者,Subscriber是接收元素並做出響應的訂閱者。當執行Publisher裏的subscribe方法時,發布者會回調訂閱者的onSubscribe方法,這個方法中,通常訂閱者會借助傳入的Subscription向發布者請求n個數據。然後發布者通過不斷調用訂閱者的onNext方法向訂閱者發出最多n個數據。如果數據全部發完,則會調用onComplete告知訂閱者流已經發完;如果有錯誤發生,則通過onError發出錯誤數據,同樣也會終止流。

其中,Subscription相當於是連接PublisherSubscriber的“紐帶”。因為當發布者調用subscribe方法註冊訂閱者時,會通過訂閱者的回調方法onSubscribe傳入Subscription對象,之後訂閱者就可以使用這個Subscription對象的request方法向發布者“要”數據了。背壓機制正是基於此來實現的。

如下圖:
技術分享圖片

Processor則是集PublisherSubscriber於一身,相當於是發布者與訂閱者之間的一個”中間人“,可以通過Processor進行一些中間操作:

/**
 * A component that acts as both a Subscriber and Publisher.
 *
 * @param <T> the subscribed item type
 * @param <R> the published item type
 */
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

如下圖:
技術分享圖片

參考:

https://blog.csdn.net/rickiyeat/article/details/78175962


響應流使用示例

1.以下代碼簡單演示了SubmissionPublisher 和這套發布-訂閱框架的基本使用方式:

package com.example.demo;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * @program: demo
 * @description: Flow Demo
 * @author: 01
 * @create: 2018-10-04 13:25
 **/
public class FlowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 定義發布者, 發布的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

        // 2. 定義訂閱者
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // 保存訂閱關系, 需要用它來給發布者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者已經達到了目標, 可以調用cancel告訴發布者不再接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發布者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數據處理完了(發布者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 3. 發布者和訂閱者 建立訂閱關系
        publiser.subscribe(subscriber);

        // 4. 生產數據, 並發布
        // 這裏忽略數據生產過程
        for (int i = 0; i < 3; i++) {
            System.out.println("生成數據:" + i);
            // submit是個block方法
            publiser.submit(i);
        }

        // 5. 結束後 關閉發布者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主線程延遲停止, 否則數據沒有消費就會退出
        Thread.currentThread().join(1000);
        // debug的時候, 下面這行需要有斷點
        // 否則主線程結束無法debug
        System.out.println();
    }
}

運行結果如下:
技術分享圖片

上文中提到過可以調節發布者的數據產出速度,那麽這個速度是如何調節的呢?關鍵就在於submit方法,該方法是一個阻塞方法。需要先說明的是SubmissionPublisher裏有一個數據緩沖區,用於緩沖發布者產生的數據,而這個緩沖區是利用一個Object數組實現的,緩沖區最大長度為256。我們可以在onSubscribe方法裏打上斷點,查看到這個緩沖區:
技術分享圖片

當這個緩沖區的數據滿了之後,submit方法就會進入阻塞狀態,發布者數據的產生速度就會變慢,以此實現調節發布者的數據產出速度。


2.第二個例子演示了結合Processor的使用方式,代碼如下:

package com.example.demo;

import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

/**
 * Processor, 需要繼承SubmissionPublisher並實現Processor接口
 *
 * 輸入源數據 integer, 過濾掉小於0的, 然後轉換成字符串發布出去
 */
class MyProcessor extends SubmissionPublisher<String>
        implements Processor<Integer, String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        // 保存訂閱關系, 需要用它來給發布者響應
        this.subscription = subscription;

        // 請求一個數據
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        // 接受到一個數據, 處理
        System.out.println("處理器接受到數據: " + item);

        // 過濾掉小於0的, 然後發布出去
        if (item > 0) {
            this.submit("轉換後的數據:" + item);
        }

        // 處理完調用request再請求一個數據
        this.subscription.request(1);

        // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
        // this.subscription.cancel();
    }

    @Override
    public void onError(Throwable throwable) {
        // 出現了異常(例如處理數據的時候產生了異常)
        throwable.printStackTrace();

        // 我們可以告訴發布者, 後面不接受數據了
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        // 全部數據處理完了(發布者關閉了)
        System.out.println("處理器處理完了!");
        // 關閉發布者
        this.close();
    }

}

/**
 * 帶 process 的 flow demo
 * @author 01
 */
public class FlowDemo2 {

    public static void main(String[] args) throws Exception {
        // 1. 定義發布者, 發布的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();

        // 2. 定義處理器, 對數據進行過濾, 並轉換為String類型
        MyProcessor processor = new MyProcessor();

        // 3. 發布者 和 處理器 建立訂閱關系
        publiser.subscribe(processor);

        // 4. 定義最終訂閱者, 消費 String 類型數據
        Subscriber<String> subscriber = new Subscriber<>() {

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關系, 需要用它來給發布者響應
                this.subscription = subscription;

                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);

                // 處理完調用request再請求一個數據
                this.subscription.request(1);

                // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();

                // 我們可以告訴發布者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數據處理完了(發布者關閉了)
                System.out.println("處理完了!");
            }

        };

        // 5. 處理器 和 最終訂閱者 建立訂閱關系
        processor.subscribe(subscriber);

        // 6. 生產數據, 並發布
        // 這裏忽略數據生產過程
        publiser.submit(-111);
        publiser.submit(111);

        // 7. 結束後 關閉發布者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主線程延遲停止, 否則數據沒有消費就退出
        Thread.currentThread().join(1000);
    }
}

運行結果如下:
技術分享圖片

Reactive Stream 響應式流