Reactive Stream 響應式流
Reactive Stream (響應式流/反應流) 是JDK9引入的一套標準,是一套基於發布/訂閱模式的數據處理規範。響應式流從2013年開始,作為提供非阻塞背壓的異步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從發布者傳遞到訂閱者,而不需要發布者阻塞,或訂閱者有無限制的緩沖區或丟棄。更確切地說,Reactive流目的是“找到最小的一組接口,方法和協議,用來描述必要的操作和實體以實現這樣的目標:以非阻塞背壓方式實現數據的異步流”。
“背壓(反壓)back pressure”概念很關鍵。首先異步消費者會向生產者訂閱接收消息,然後當有新的信息可用時,消費者會通過之前訂閱時提供的回調函數被再次激活調用。如果生產者發出的信息比消費者能夠處理消息最大量還要多,消費者可能會被迫一直在抓消息,耗費越來越多的資源,埋下潛在的崩潰風險。為了防止這一點,需要有一種機制使消費者可以通知生產者,降低消息的生成速度。生產者可以采用多種策略來實現這一要求,這種機制稱為背壓。
響應式流模型非常簡單——訂閱者向發布者發送多個元素的異步請求,發布者向訂閱者異步發送多個或稍少的元素。響應式流會在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。
簡單來說,在響應式流下訂閱者可以與發布者溝通,如果使用JMS就應該知道,訂閱者只能被動接收發布者所產生的消息數據。這就好比沒有水龍頭的水管一樣,我只能被動接收水管裏流過來的水,無法關閉也無法減少。而響應式流就相當於給水管加了個水龍頭,在消費者這邊可以控制水流的增加、減少及關閉。
響應式流模型圖:
發布者(Publisher)是潛在的無限數量的有序元素的生產者。發布者可能有多個來自訂閱者的待處理請求。
- 根據收到的要求向當前訂閱者發布(或發送)元素。
訂閱者(Subscriber)從發布者那裏訂閱並接收元素。訂閱者可以請求更多的元素。
- 發布者向訂閱者發送訂閱令牌(Subscription)。
- 使用訂閱令牌,訂閱者從發布者那裏請求多個元素。
- 當元素準備就緒時,發布者向訂閱者發送多個或更少的元素。
Reactive Stream主要接口
JDK9 通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流。在JDK9裏Reactive Stream的主要接口聲明在Flow類裏,Flow 類中定義了四個嵌套的靜態接口,用於建立流量控制的組件,發布者在其中生成一個或多個供訂閱者使用的數據項:
- Publisher:數據項發布者、生產者
- Subscriber:數據項訂閱者、消費者
- Subscription:發布者與訂閱者之間的關系紐帶,訂閱令牌
- Processor:數據處理器
Flow類結構如下:
Publisher
是能夠發出元素的發布者,Subscriber
是接收元素並做出響應的訂閱者。當執行Publisher
裏的subscribe
方法時,發布者會回調訂閱者的onSubscribe
方法,這個方法中,通常訂閱者會借助傳入的Subscription
向發布者請求n個數據。然後發布者通過不斷調用訂閱者的onNext
方法向訂閱者發出最多n個數據。如果數據全部發完,則會調用onComplete
告知訂閱者流已經發完;如果有錯誤發生,則通過onError
發出錯誤數據,同樣也會終止流。
其中,Subscription
相當於是連接Publisher
和Subscriber
的“紐帶”。因為當發布者調用subscribe
方法註冊訂閱者時,會通過訂閱者的回調方法onSubscribe
傳入Subscription
對象,之後訂閱者就可以使用這個Subscription
對象的request
方法向發布者“要”數據了。背壓機制正是基於此來實現的。
如下圖:
Processor
則是集Publisher
和Subscriber
於一身,相當於是發布者與訂閱者之間的一個”中間人“,可以通過Processor
進行一些中間操作:
/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
如下圖:
參考:
https://blog.csdn.net/rickiyeat/article/details/78175962
響應流使用示例
1.以下代碼簡單演示了SubmissionPublisher 和這套發布-訂閱框架的基本使用方式:
package com.example.demo;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* @program: demo
* @description: Flow Demo
* @author: 01
* @create: 2018-10-04 13:25
**/
public class FlowDemo {
public static void main(String[] args) throws Exception {
// 1. 定義發布者, 發布的數據類型是 Integer
// 直接使用jdk自帶的SubmissionPublisher, 它實現了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();
// 2. 定義訂閱者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存訂閱關系, 需要用它來給發布者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個數據, 處理
System.out.println("接受到數據: " + item);
// 處理完調用request再請求一個數據
this.subscription.request(1);
// 或者已經達到了目標, 可以調用cancel告訴發布者不再接受數據了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理數據的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴發布者, 後面不接受數據了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數據處理完了(發布者關閉了)
System.out.println("處理完了!");
}
};
// 3. 發布者和訂閱者 建立訂閱關系
publiser.subscribe(subscriber);
// 4. 生產數據, 並發布
// 這裏忽略數據生產過程
for (int i = 0; i < 3; i++) {
System.out.println("生成數據:" + i);
// submit是個block方法
publiser.submit(i);
}
// 5. 結束後 關閉發布者
// 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
publiser.close();
// 主線程延遲停止, 否則數據沒有消費就會退出
Thread.currentThread().join(1000);
// debug的時候, 下面這行需要有斷點
// 否則主線程結束無法debug
System.out.println();
}
}
運行結果如下:
上文中提到過可以調節發布者的數據產出速度,那麽這個速度是如何調節的呢?關鍵就在於submit方法,該方法是一個阻塞方法。需要先說明的是SubmissionPublisher裏有一個數據緩沖區,用於緩沖發布者產生的數據,而這個緩沖區是利用一個Object數組實現的,緩沖區最大長度為256。我們可以在onSubscribe方法裏打上斷點,查看到這個緩沖區:
當這個緩沖區的數據滿了之後,submit方法就會進入阻塞狀態,發布者數據的產生速度就會變慢,以此實現調節發布者的數據產出速度。
2.第二個例子演示了結合Processor的使用方式,代碼如下:
package com.example.demo;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* Processor, 需要繼承SubmissionPublisher並實現Processor接口
*
* 輸入源數據 integer, 過濾掉小於0的, 然後轉換成字符串發布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關系, 需要用它來給發布者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一個數據, 處理
System.out.println("處理器接受到數據: " + item);
// 過濾掉小於0的, 然後發布出去
if (item > 0) {
this.submit("轉換後的數據:" + item);
}
// 處理完調用request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理數據的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴發布者, 後面不接受數據了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數據處理完了(發布者關閉了)
System.out.println("處理器處理完了!");
// 關閉發布者
this.close();
}
}
/**
* 帶 process 的 flow demo
* @author 01
*/
public class FlowDemo2 {
public static void main(String[] args) throws Exception {
// 1. 定義發布者, 發布的數據類型是 Integer
// 直接使用jdk自帶的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<>();
// 2. 定義處理器, 對數據進行過濾, 並轉換為String類型
MyProcessor processor = new MyProcessor();
// 3. 發布者 和 處理器 建立訂閱關系
publiser.subscribe(processor);
// 4. 定義最終訂閱者, 消費 String 類型數據
Subscriber<String> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存訂閱關系, 需要用它來給發布者響應
this.subscription = subscription;
// 請求一個數據
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接受到一個數據, 處理
System.out.println("接受到數據: " + item);
// 處理完調用request再請求一個數據
this.subscription.request(1);
// 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出現了異常(例如處理數據的時候產生了異常)
throwable.printStackTrace();
// 我們可以告訴發布者, 後面不接受數據了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部數據處理完了(發布者關閉了)
System.out.println("處理完了!");
}
};
// 5. 處理器 和 最終訂閱者 建立訂閱關系
processor.subscribe(subscriber);
// 6. 生產數據, 並發布
// 這裏忽略數據生產過程
publiser.submit(-111);
publiser.submit(111);
// 7. 結束後 關閉發布者
// 正式環境 應該放 finally 或者使用 try-resouce 確保關閉
publiser.close();
// 主線程延遲停止, 否則數據沒有消費就退出
Thread.currentThread().join(1000);
}
}
運行結果如下:
Reactive Stream 響應式流