(3)Reactive stream 響應式流——Webflux響應式編程利器
阿新 • • 發佈:2019-01-22
版本 miss new ace -s ble throwable exceptio 使用
Reactive stream 響應式流
- Reactive stream是jdk9新特性,提供了一套API,就是一種訂閱發布者模式
- 被壓,背壓是指在異步場景中,發布者發送事件速度遠快於訂閱者的處理速度的情況下,一種告訴上遊的發布者降低發送速度的策略,簡而言之,背壓就是一種流速控制的策略。
舉個例子:假設以前是沒有水龍頭的,只能自來水廠主動的往用戶輸送水,但是不知道用戶需要多少水,有了Reactive stream,就相當於有了水龍頭,用戶可以主動的請求用水,而自來水廠也知道了用戶的需求
示例代碼(需要jdk9以上版本的支持)
import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; public class FlowDemo { public static void main(String[] args) throws Exception { // 1. 定義發布者, 發布的數據類型是 Integer // 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 接口 SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>(); // 2. 定義訂閱者 Subscriber<Integer> subscriber = new Subscriber<Integer>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關系, 需要用它來給發布者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("接受到數據: " + item); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴發布者, 後面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 全部數據處理完了(發布者關閉了) System.out.println("處理完了!"); } }; // 3. 發布者和訂閱者 建立訂閱關系 publiser.subscribe(subscriber); // 4. 生產數據, 並發布 // 這裏忽略數據生產過程 for (int i = 0; i < 1000; i++) { System.out.println("生成數據:" + i); // submit是個block方法 publiser.submit(i); } publiser.submit(111); publiser.submit(222); publiser.submit(333); // 5. 結束後 關閉發布者 // 正式環境 應該放 finally 或者使用 try-resouce 確保關閉 publiser.close(); // 主線程延遲停止, 否則數據沒有消費就退出 Thread.currentThread().join(1000); } }
(3)Reactive stream 響應式流——Webflux響應式編程利器