Java9 基於非同步響應式流的釋出-訂閱框架
為響應式流(Reactive Streams)增加的釋出-訂閱(publisher-subscriber)框架、併發包
CompletableFuture
類的增強,等等。。
JEP266中為Java語言的併發性又引入許多新的方式:響應式流,一個為它而生互操作性更強的釋出-訂閱框架;並且為了Java9其他API而增強的 java.util.concurrent.CompletableFuture
類, 以及其他的更多的更新。
在本文中,展開對響應式流的介紹,然後介紹這個釋出訂閱框架。
響應式流(Reactive Streams)
批處理系統在收集了足夠多的資料,達到某一個閾值亟待進行下一步操作的時候,就衍生出了一個新的名詞—資料處理(Data processing)。這時候,面向流(stream-oriented)的架構思想可以幫助我們儘快達成這個目標。它可以捕獲和處理實時資料,並且可以快速地(秒級甚至更短)基於處理的結果來對系統進行相應的操作。和它相比,一個批處理系統可能會花費數秒、數天、甚至更久來做出響應。
處理資料流(特別是大小不定的實時資料)需要在非同步系統中特別小心。主要問題是要控制資源消耗,避免資料來源和處理系統出現供大於求(積壓)的情況。這時候,需要非同步地來對資料進行並行處理,利用分散式系統或者發揮多核CPU的效能,能有效地使資料處理過程變得快速高效。
響應式流(Reactive Streams)為這種非阻塞背壓的非同步流處理提供了一個標準。在處理系統出現過載的時候,採用非同步傳送訊號的方式通知資料來源做相應的處理。這個通知的訊號就像是水管的閥門一樣,關閉這個閥門會增加背壓(資料來源對處理系統的壓力),同時也會增加處理系統的壓力。
這個標準的目的是治理跨非同步邊界的流資料交換(比如向其他執行緒傳輸資料) ,同時確保處理系統不被緩衝資料而壓垮。換一種說法,背壓是這個標準模型的一個組成部分,以便允許線上程之間調停的佇列被界定。特別注意,背壓通訊是非同步的。
響應式流(Reactive Streams)的提出就致力於提供一組最小規模的介面、方法、或者協議來描述這個操作或實體:具有非阻塞背壓的非同步資料流。
釋出-訂閱(publisher-subscriber)框架
Java 9 通過java.util.concurrent.Flow
和java.util.concurrent.SubmissionPublisher
類來實現響應式流。
Flow
類中定義了四個巢狀的靜態介面,用於建立流量控制的元件,釋出者在其中生成一個或多個供訂閱者使用的資料項:
- Publisher:資料項釋出者、生產者
- Subscriber:資料項訂閱者、消費者
- Subscription:釋出者與訂閱者之間的關係紐帶,訂閱令牌
- Processor:資料處理器
釋出者(Publisher)以流的方式釋出資料項,並註冊訂閱者,並且實現 Flow.Publisher
介面,該介面聲明瞭一個方法,我們通過呼叫它來為釋出者註冊訂閱者:
void subscribe(Flow.Subscriber<? super T> subscriber)
呼叫此方法來向釋出者註冊訂閱者,但是,如果此訂閱者已被其他釋出者註冊或註冊失敗(策略衝突),這個方法就會呼叫訂閱者的onError()
方法來丟擲IllegalStateException
異常,除此之外,訂閱者的onSubscribe()
方法會呼叫一個新的Flow.Subscription
,當空物件傳給訂閱者時,subscribe()
方法會丟擲NullPointerException異常。
訂閱者(Subscriber)從訂閱的釋出者中返回資料項,並且實現Flow.Subscriber<T>
,這個介面宣告的方法如下:
void onSubscribe(Flow.Subscription subscription)
void onComplete()
void onError(Throwable throwable)
void onNext(T item)
onSubscribe()
方法用來確認訂閱者註冊到釋出者是否註冊成功,它以引數列表的方式接收一個Flow.Subscription
型別的引數,而這個引數型別裡面宣告的方法允許向釋出者請求釋出新的資料項,或請求釋出者不再發布更多的資料項。
onComplete()
方法用在當訂閱者沒有呼叫其他方法,而Subscription
發生錯誤沒有終止的情況下。呼叫這個方法之後,此訂閱者就不能呼叫其他方法。
onError(Throwable throwable)
方法用在當釋出者或訂閱者遭遇不可恢復的錯誤的時候, 呼叫這個方法之後,此訂閱者也不能呼叫其他方法。
onNext()
方法用於宣告下一個資料項的訂閱,如果在此過程中丟擲異常,結果將得不到確認,甚至會導致訂閱被取消。
一個訂閱令牌(Subscription)為釋出者和訂閱者定義一種關係, 使得訂閱者接收特定的資料項或者在特定時間取消接收請求,訂閱令牌實現自Flow.Subscription
介面,該介面宣告方法如下:
void request(long n)
void cancel()
request()
方法新增n個數據項到當前未滿的訂閱請求中。如果n小於或等於0,訂閱者的onError()
方法會被呼叫,並且丟擲IllegalArgumentException 異常,此外,如果n大於0,訂閱者就會在onNext()
方法的呼叫下接收到n個數據項,除非中間異常終止。 從Long.MAX_VALUE次到n次中間是無界的呼叫。
cancel()
用來終止訂閱者接收資料項,它有一種嘗試機制,也就是說,在呼叫它之後也有可能收到資料項。
最後,資料處理器(Processor)在不改變釋出者與訂閱者的情況下基於流做資料處理,可以在釋出者與訂閱者之間放多個數據處理器,成為一個處理器鏈,釋出者與訂閱者不依賴於資料處理,它們是單獨的過程。JDK9中不提供具體的資料處理器,必須由開發者來通過實現無方法宣告的Processor
介面來自行構建。
SubmissionPublisher
實現自Flow.Publisher
介面,向當前訂閱者非同步提交非空的資料項,直到它被關閉。每個當前訂閱者以一個相同的順序接收新提交的資料項,除非資料項丟失或者遇到異常。SubmissionPublisher
允許資料項在丟失或阻塞的時候扮演釋出者角色。
SubmissionPublisher
提供了三個構造方法來獲取例項。無參的構造器依賴於 ForkJoinPool.commonPool()
方法來提交發布者,以此實現生產者向訂閱者提供資料項的非同步特性。
下面的程式演示了SubmissionPublisher
用法和這套釋出-訂閱框架的其他特性:
import java.util.Arrays;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo
{
public static void main(String[] args)
{
// Create a publisher.
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// Create a subscriber and register it with the publisher.
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
// Publish several data items and then close the publisher.
System.out.println("Publishing data items...");
String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
"jul", "aug", "sep", "oct", "nov", "dec" };
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
try
{
synchronized("A")
{
"A".wait();
}
}
catch (InterruptedException ie)
{
}
}
}
class MySubscriber<T> implements Subscriber<T>
{
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
{
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item)
{
System.out.println("Received: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
synchronized("A")
{
"A".notifyAll();
}
}
@Override
public void onComplete()
{
System.out.println("Done");
synchronized("A")
{
"A".notifyAll();
}
}
}
其中使用了wait()
和notifyAll()
方法來使主執行緒等到onComplete()
的完成,否則是不會看到任何輸出的。
下面是輸出結果:
Publishing data items...
Received: jan
Received: feb
Received: mar
Received: apr
Received: may
Received: jun
Received: jul
Received: aug
Received: sep
Received: oct
Received: nov
Received: dec
Done
最後說一句,熟悉RxJava的同學可以會心一笑了。