1. 程式人生 > >響應式流(Reactive,Streams)

響應式流(Reactive,Streams)

概念

響應式流(Reactive Streams)是以帶非阻塞背壓方式處理非同步資料流的標準,提供一組最小化的介面,方法和協議來描述必要的操作和實體。

要解決的問題:

系統之間高併發的大量資料流互動通常採用非同步的釋出-訂閱模式。資料由釋出者推送給訂閱者的過程中,容易產生的一個問題是,當釋出者即生產者產生的資料速度遠遠大於訂閱者即消費者的消費速度時,消費者會承受巨大的資源壓力(pressure)而有可能崩潰。

解決原理:

為了解決以上問題,資料流的速度需要被控制,即流量控制(flow control),以防止快速的資料流不會壓垮目標。因此需要反壓即背壓(back pressure),生產者和消費者之間需要通過實現一種背壓機制來互操作。實現這種背壓機制要求是非同步非阻塞的,如果是同步阻塞的,消費者在處理資料時生產者必須等待,會產生效能問題。

解決方法:

響應式流(Reactive Streams)通過定義一組實體,介面和互操作方法,給出了實現非阻塞背壓的標準。第三方遵循這個標準來實現具體的解決方案,常見的有Reactor,RxJava,Akka Streams,Ratpack等。

該標準定義了四個介面:

釋出者Publisher
interface Publisher《T》 {
 void subscribe(Subscriber《? super T》 subscriber);
}
釋出者只有一個方法,用來接受訂閱者進行訂閱(subscribe)。T代表釋出者和訂閱者之間傳輸的資料型別。

訂閱者Subscriber


interface Subscriber《T》 {
 void onSubscribe(Subscription s);
 void onNext(T t);
 void onError(Throwable t);
 void onComplete();
}
訂閱者有四個事件方法,分別在開啟訂閱、接收資料、發生錯誤和資料傳輸結束時被呼叫。

訂閱物件Subscription
interface Subscription {
 void request(long n);
 void cancel();
}
訂閱物件是釋出者和訂閱者之間互動的操作物件,在釋出者(Publisher)通過subscribe方法加入訂閱者時,會通過呼叫訂閱者(Subscriber)的onSubscribe把訂閱物件(Subscription)傳給訂閱者。

訂閱者拿到訂閱物件後,通過呼叫訂閱物件的request方法,根據自身消費能力請求n條資料,或者呼叫cancel方法來停止接收資料。

訂閱物件的request方法被呼叫時,會觸發訂閱者的onNext事件方法,把資料傳輸給訂閱者。如果資料全部傳輸完成,則觸發訂閱者的onComplete事件方法。如果資料傳輸發生錯誤,則觸發訂閱者的onError事件方法。

處理者Processor
interface Processor《T,R》 extends Subscriber《T》, Publisher《R》 {
}
處理者既是釋出者又是訂閱者,用於在釋出者和訂閱者之間轉換資料格式,把釋出者的T型別資料轉換為訂閱者接受的R型別資料。處理者作為資料轉換的中介不是必須的。

由以上的介面可以看出,核心在於訂閱者可以通過request(long n)方法來控制接收的資料量,達到了實現背壓的目的。

Java對響應式流的支援

Java 9開始,增加了java.util.concurrent.Flow API,把響應式流標準的介面整合到了JDK中,並和響應式流標準介面定義完全一致,之前需要通過Maven引用的介面API,Java 9之後可以直接使用了。第三方實現庫也會逐步遷移到JDK的提供的API,當然這個遷移過程需要一段時間。

之前的Maven依賴:

<dependency>
 <groupId>org.reactivestreams</groupId>
 <artifactId>reactive-streams</artifactId>
</dependency>

 

下面具體介紹一下java響應式流的處理方式:

java9通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流
Flow 類中定義了四個巢狀的靜態介面,用於建立流量控制的元件,釋出者在其中生成一個或多個供訂閱者使用的資料項:

    Publisher:資料項釋出者、生產者
    Subscriber:資料項訂閱者、消費者
    Subscription:釋出者與訂閱者之間的關係紐帶,訂閱令牌
    Processor:資料處理器

釋出者(Publisher)以流的方式釋出資料項,並註冊訂閱者,並且實現 Flow.Publisher 介面,該介面聲明瞭一個方法,我們通過呼叫它來為釋出者註冊訂閱者:

    void subscribe(Flow.Subscriber<? super T> subscriber)

    呼叫此方法來向釋出者註冊訂閱者,但是,如果此訂閱者已被其他釋出者註冊或註冊失敗(策略衝突),這個方法就會呼叫訂閱者的onError() 方法來丟擲IllegalStateException 異常,除此之外,訂閱者的onSubscribe() 方法會呼叫一個新的Flow.Subscription ,當空物件傳給訂閱者時,subscribe() 方法會丟擲NullPointerException異常。

訂閱者(Subscriber)從訂閱的釋出者中返回資料項,並且實現Flow.Subscriber<T> ,這個介面宣告的方法如下:

    void onSubscribe(Flow.Subscription subscription)
    void onComplete()
    void onError(Throwable throwable)
    void onNext(T item)

    onSubscribe() 方法用來確認訂閱者註冊到釋出者是否註冊成功,它以引數列表的方式接收一個Flow.Subscription型別的引數,而這個引數型別裡面宣告的方法允許向釋出者請求釋出新的資料項,或請求釋出者不再發布更多的資料項。

    onComplete() 方法用在當訂閱者沒有呼叫其他方法,而Subscription 發生錯誤沒有終止的情況下。呼叫這個方法之後,此訂閱者就不能呼叫其他方法。

    onError(Throwable throwable) 方法用在當釋出者或訂閱者遭遇不可恢復的錯誤的時候, 呼叫這個方法之後,此訂閱者也不能呼叫其他方法。

    onNext() 方法用於宣告下一個資料項的訂閱,如果在此過程中丟擲異常,結果將得不到確認,甚至會導致訂閱被取消。

一個訂閱令牌(Subscription)為釋出者和訂閱者定義一種關係, 使得訂閱者接收特定的資料項或者在特定時間取消接收請求,訂閱令牌實現自Flow.Subscription 介面,該介面宣告方法如下:

    void request(long n)
    void cancel()
    
    request() 方法新增n個數據項到當前未滿的訂閱請求中。如果n小於或等於0,訂閱者的onError() 方法會被呼叫,並且丟擲IllegalArgumentException 異常,此外,如果n大於0,訂閱者就會在onNext() 方法的呼叫下接收到n個數據項,除非中間異常終止。 從Long.MAX_VALUE次到n次中間是無界的呼叫。

    cancel() 用來終止訂閱者接收資料項,它有一種嘗試機制,也就是說,在呼叫它之後也有可能收到資料項。
    
資料處理器(Processor)在不改變釋出者與訂閱者的情況下基於流做資料處理,可以在釋出者與訂閱者之間放多個數據處理器,成為一個處理器鏈,釋出者與訂閱者不依賴於資料處理,它們是單獨的過程。JDK9中不提供具體的資料處理器,必須由開發者來通過實現無方法宣告的Processor介面來自行構建

SubmissionPublisher 實現自Flow.Publisher 介面,向當前訂閱者非同步提交非空的資料項,直到它被關閉。每個當前訂閱者以一個相同的順序接收新提交的資料項,除非資料項丟失或者遇到異常。SubmissionPublisher 允許資料項在丟失或阻塞的時候扮演釋出者角色。

SubmissionPublisher 提供了三個構造方法來獲取例項。無參的構造器依賴於 ForkJoinPool.commonPool() 方法來提交發布者,以此實現生產者向訂閱者提供資料項的非同步特性。

    import java.util.Arrays;
     
    import java.util.concurrent.Flow.*;
    import java.util.concurrent.SubmissionPublisher;  
     
    public class FlowDemo
    {
       public static void main(String[] args)
       {
          // Create a publisher.
     
          SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
     
          // Create a subscriber and register it with the publisher.
     
          MySubscriber<String> subscriber = new MySubscriber<>();
          publisher.subscribe(subscriber);
     
          // Publish several data items and then close the publisher.
     
          System.out.println("Publishing data items...");
          String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
                             "jul", "aug", "sep", "oct", "nov", "dec" };
          Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
          publisher.close();
     
          try
          {
             synchronized("A")
             {
                "A".wait();
             }
          }
          catch (InterruptedException ie)
          {
          }
       }
    }
     
    class MySubscriber<T> implements Subscriber<T>
    {
       private Subscription subscription;
     
       @Override
       public void onSubscribe(Subscription subscription)
       {
          this.subscription = subscription;
          subscription.request(1);
       }
     
       @Override
       public void onNext(T item)
       {
          System.out.println("Received: " + item);
          subscription.request(1);
       }
     
       @Override
       public void onError(Throwable t)
       {
          t.printStackTrace();
          synchronized("A")
          {
             "A".notifyAll();
          }
       }
     
       @Override
       public void onComplete()
       {
          System.out.println("Done");
          synchronized("A")
          {
             "A".notifyAll();
          }
       }
    }