WebFlux響應式程式設計基礎之 4 reactive stream 響應式流
阿新 • • 發佈:2019-01-06
reactive stream 響應式流 — 簡而言之,就是多了一個溝通的渠道
釋出訂閱者
背壓 交流
Reactive Stream主要介面
java.util.concurrent.Flow 原始碼很重要 很有意思 多讀幾遍
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class FlowDemo {
public static void main(String[] args) throws Exception {
// 1. 定義釋出者, 釋出的資料型別是 Integer
// 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 介面
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定義訂閱者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 儲存訂閱關係, 需要用它來給釋出者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1 );
}
@Override
public void onNext(Integer item) {
// 接受到一個數據, 處理
System.out.println("接受到資料: " + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 處理完呼叫request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理資料的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴釋出者, 後面不接受資料了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部資料處理完了(釋出者關閉了)
System.out.println("處理完了!");
}
};
// 3. 釋出者和訂閱者 建立訂閱關係
publiser.subscribe(subscriber);
// 4. 生產資料, 併發布
// 這裡忽略資料生產過程
for (int i = 0; i < 1000; i++) {
System.out.println("生成資料:" + i);
// submit是個block方法
publiser.submit(i);
}
// 5. 結束後 關閉釋出者
// 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
publiser.close();
// 主執行緒延遲停止, 否則資料沒有消費就退出
Thread.currentThread().join(1000);
// debug的時候, 下面這行需要有斷點
// 否則主執行緒結束無法debug
System.out.println();
}
}
響應式流(ReactiveStreams)為這種非阻塞背壓的非同步流處理提供了一個標準。在處理系統出現過載的時候,採用非同步傳送訊號的方式通知資料來源做相應的處理。這個通知的訊號就像是水管的閥門一樣,關閉這個閥門會增加背壓(資料來源對處理系統的壓力),同時也會增加處理系統的壓力。
這個標準的目的是治理跨非同步邊界的流資料交換(比如向其他執行緒傳輸資料) ,同時確保處理系統不被緩衝資料而壓垮。換一種說法,背壓是這個標準模型的一個組成部分,以便允許線上程之間調停的佇列被界定。特別注意,背壓通訊是非同步的。
完整例項
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* 帶 process 的 flow demo
*/
/**
* Processor, 需要繼承SubmissionPublisher並實現Processor介面
*
* 輸入源資料 integer, 過濾掉小於0的, 然後轉換成字串釋出出去
*/
class MyProcessor extends SubmissionPublisher<String>implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 儲存訂閱關係, 需要用它來給釋出者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個數據, 處理
System.out.println("處理器接受到資料: " + item);
// 過濾掉小於0的, 然後釋出出去
if (item > 0) {
this.submit("轉換後的資料:" + item);
}
// 處理完呼叫request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理資料的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴釋出者, 後面不接受資料了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部資料處理完了(釋出者關閉了)
System.out.println("處理器處理完了!");
// 關閉釋出者
this.close();
}
}
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定義釋出者, 釋出的資料型別是 Integer
// 直接使用jdk自帶的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定義處理器, 對資料進行過濾, 並轉換為String型別
MyProcessor processor = new MyProcessor();
// 3. 釋出者 和 處理器 建立訂閱關係
publiser.subscribe(processor);
// 4. 定義最終訂閱者, 消費 String 型別資料
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 儲存訂閱關係, 需要用它來給釋出者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一個數據, 處理
System.out.println("接受到資料: " + item);
// 處理完呼叫request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 呼叫cancel告訴釋出者不再接受資料了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理資料的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴釋出者, 後面不接受資料了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部資料處理完了(釋出者關閉了)
System.out.println("處理完了!");
}
};
// 5. 處理器 和 最終訂閱者 建立訂閱關係
processor.subscribe(subscriber);
// 6. 生產資料, 併發布
// 這裡忽略資料生產過程
publiser.submit(-111);
publiser.submit(111);
// 7. 結束後 關閉釋出者
// 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
publiser.close();
// 主執行緒延遲停止, 否則資料沒有消費就退出
Thread.currentThread().join(1000);
}
}
執行機制
反饋
submit是一個阻塞方法