1. 程式人生 > >(3)Reactive stream 響應式流——Webflux響應式編程利器

(3)Reactive stream 響應式流——Webflux響應式編程利器

版本 miss new ace -s ble throwable exceptio 使用

Reactive stream 響應式流

  • Reactive stream是jdk9新特性,提供了一套API,就是一種訂閱發布者模式
  • 被壓,背壓是指在異步場景中,發布者發送事件速度遠快於訂閱者的處理速度的情況下,一種告訴上遊的發布者降低發送速度的策略,簡而言之,背壓就是一種流速控制的策略。
    舉個例子:假設以前是沒有水龍頭的,只能自來水廠主動的往用戶輸送水,但是不知道用戶需要多少水,有了Reactive stream,就相當於有了水龍頭,用戶可以主動的請求用水,而自來水廠也知道了用戶的需求
    示例代碼(需要jdk9以上版本的支持)
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {
    public static void main(String[] args) throws Exception {
        // 1. 定義發布者, 發布的數據類型是 Integer
        // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 接口
        SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
        
        // 2. 定義訂閱者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 保存訂閱關系, 需要用它來給發布者響應
                this.subscription = subscription;
                // 請求一個數據
                this.subscription.request(1);
            }

            @Override
            public void onNext(Integer item) {
                // 接受到一個數據, 處理
                System.out.println("接受到數據: " + item);
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 處理完調用request再請求一個數據
                this.subscription.request(1);
                // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
                // this.subscription.cancel();
            }

            @Override
            public void onError(Throwable throwable) {
                // 出現了異常(例如處理數據的時候產生了異常)
                throwable.printStackTrace();
                // 我們可以告訴發布者, 後面不接受數據了
                this.subscription.cancel();
            }

            @Override
            public void onComplete() {
                // 全部數據處理完了(發布者關閉了)
                System.out.println("處理完了!");
            }
        };

        // 3. 發布者和訂閱者 建立訂閱關系
        publiser.subscribe(subscriber);

        // 4. 生產數據, 並發布
        // 這裏忽略數據生產過程
        for (int i = 0; i < 1000; i++) {
            System.out.println("生成數據:" + i);
            // submit是個block方法
            publiser.submit(i);
        }

        publiser.submit(111);
        publiser.submit(222);
        publiser.submit(333);

        // 5. 結束後 關閉發布者
        // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
        publiser.close();

        // 主線程延遲停止, 否則數據沒有消費就退出
        Thread.currentThread().join(1000);
    }
}

(3)Reactive stream 響應式流——Webflux響應式編程利器