1. 程式人生 > >Reactor:深入理解reactor core

Reactor:深入理解reactor core

[toc] # 簡介 上篇文章我們簡單的介紹了Reactor的發展史和基本的Flux和Mono的使用,本文將會進一步挖掘Reactor的高階用法,一起來看看吧。 # 自定義Subscriber 之前的文章我們提到了4個Flux的subscribe的方法: ~~~java Disposable subscribe(); Disposable subscribe(Consumer consumer); Disposable subscribe(Consumer consumer, Consumer errorConsumer); Disposable subscribe(Consumer consumer, Consumer errorConsumer, Runnable completeConsumer); Disposable subscribe(Consumer consumer, Consumer errorConsumer, Runnable completeConsumer, Consumer subscriptionConsumer); ~~~ 這四個方法,需要我們使用lambda表示式來自定義consumer,errorConsumer,completeSonsumer和subscriptionConsumer這四個Consumer。 寫起來比較複雜,看起來也不太方便,我們考慮一下,這四個Consumer是不是和Subscriber介面中定義的4個方法是一一對應的呢? ~~~java public static interface Subscriber { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } ~~~ 對的,所以我們有一個更加簡單點的subscribe方法: ~~~java public final void subscribe(Subscriber actual) ~~~ 這個subscribe方法直接接收一個Subscriber類。從而實現了所有的功能。 自己寫Subscriber太麻煩了,Reactor為我們提供了一個BaseSubscriber的類,它實現了Subscriber中的所有功能,還附帶了一些其他的方法。 我們看下BaseSubscriber的定義: ~~~java public abstract class BaseSubscriber implements CoreSubscriber, Subscription, Disposable ~~~ > 注意,BaseSubscriber是單次使用的,這就意味著,如果它首先subscription到Publisher1,然後subscription到Publisher2,那麼將會取消對第一個Publisher的訂閱。 因為BaseSubscriber是一個抽象類,所以我們需要繼承它,並且重寫我們需要自己實現的方法。 下面看一個自定義的Subscriber: ~~~java public class CustSubscriber extends BaseSubscriber { public void hookOnSubscribe(Subscription subscription) { System.out.println("Subscribed"); request(1); } public void hookOnNext(T value) { System.out.println(value); request(1); } } ~~~ BaseSubscriber中有很多以hook開頭的方法,這些方法都是我們可以重寫的,而Subscriber原生定義的on開頭的方法,在BaseSubscriber中都是final的,都是不能重寫的。 我們看一個定義: ~~~java @Override public final void onSubscribe(Subscription s) { if (Operators.setOnce(S, this, s)) { try { hookOnSubscribe(s); } catch (Throwable throwable) { onError(Operators.onOperatorError(s, throwable, currentContext())); } } } ~~~ 可以看到,它內部實際上呼叫了hook的方法。 上面的CustSubscriber中,我們重寫了兩個方法,一個是hookOnSubscribe,在建立訂閱的時候呼叫,一個是hookOnNext,在收到onNext訊號的時候呼叫。 在這些方法中,給了我們足夠的自定義空間,上面的例子中我們呼叫了request(1),表示再請求一個元素。 其他的hook方法還有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。 # Backpressure處理 我們之前講過了,reactive stream的最大特徵就是可以處理Backpressure。 什麼是Backpressure呢?就是當consumer處理過不來的時候,可以通知producer來減少生產速度。 我們看下BaseSubscriber中預設的hookOnSubscribe實現: ~~~java protected void hookOnSubscribe(Subscription subscription){ subscription.request(Long.MAX_VALUE); } ~~~ 可以看到預設是request無限數目的值。 也就是說預設情況下沒有Backpressure。 通過重寫hookOnSubscribe方法,我們可以自定義處理速度。 除了request之外,我們還可以在publisher中限制subscriber的速度。 ~~~java public final Flux limitRate(int prefetchRate) { return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate)); } ~~~ 在Flux中,我們有一個limitRate方法,可以設定publisher的速度。 比如subscriber request(100),然後我們設定limitRate(10),那麼最多producer一次只會產生10個元素。 # 建立Flux 接下來,我們要講解一下怎麼建立Flux,通常來講有4種方法來建立Flux。 ## 使用generate 第一種方法就是最簡單的同步建立的generate. 先看一個例子: ~~~java public void useGenerate(){ Flux flux = Flux.generate( () -> 0, (state, sink) -> { sink.next("3 x " + state + " = " + 3*state); if (state == 10) sink.complete(); return state + 1; }); flux.subscribe(System.out::println); } ~~~ 輸出結果: ~~~java 3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30 ~~~ 上面的例子中,我們使用generate方法來同步的生成元素。 generate接收兩個引數: ~~~java public static Flux generate(Callable stateSupplier, BiFunction, S> generator) ~~~ 第一個引數是stateSupplier,用來指定初始化的狀態。 第二個引數是一個generator,用來消費SynchronousSink,並生成新的狀態。 上面的例子中,我們每次將state+1,一直加到10。 然後使用subscribe來將所有的生成元素輸出。 ## 使用create Flux也提供了一個create方法來建立Flux,create可以是同步也可以是非同步的,並且支援多執行緒操作。 因為create沒有初始的state狀態,所以可以用在多執行緒中。 create的一個非常有用的地方就是可以將第三方的非同步API和Flux關聯起來,舉個例子,我們有一個自定義的EventProcessor,當處理相應的事件的時候,會去呼叫註冊到Processor中的listener的一些方法。 ~~~java interface MyEventListener { void onDataChunk(List chunk); void processComplete(); } ~~~ 我們怎麼把這個Listener的響應行為和Flux關聯起來呢? ~~~java public void useCreate(){ EventProcessor myEventProcessor = new EventProcessor(); Flux bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener() { public void onDataChunk(List chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); }); } ~~~ 使用create就夠了,create接收一個consumer引數: ~~~java public static Flux create(Consumer> emitter) ~~~ 這個consumer的本質是去消費FluxSink物件。 上面的例子在MyEventListener的事件中對FluxSink物件進行消費。 ## 使用push push和create一樣,也支援非同步操作,但是同時只能有一個執行緒來呼叫next, complete 或者 error方法,所以它是單執行緒的。 ## 使用Handle Handle和上面的三個方法不同,它是一個例項方法。 它和generate很類似,也是消費SynchronousSink物件。 ~~~java Flux handle(BiConsumer>); ~~~ 不同的是它的引數是一個BiConsumer,是沒有返回值的。 看一個使用的例子: ~~~java public void useHandle(){ Flux alphabet = Flux.just(-1, 30, 13, 9, 20) .handle((i, sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println); } public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' + letterNumber - 1; return "" + (char) letterIndexAscii; } ~~~ 本文的例子[learn-reactive](https://github.com/ddean2009/learn-reactive/tree/master/reactorIntroduction) > 本文作者:flydean程式那些事 > > 本文連結:[http://www.flydean.com/reactor-core-in-depth/](http://www.flydean.com/reactor-core-in-depth/) > > 本文來源:flydean的部落格 > > 歡迎關注我的公眾號:「程式那些事」最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!