RxJava學習 - 9. Multicasting, Replaying, and Caching
RxJava學習 - 9. Multicasting, Replaying, and Caching
我們已經見過hot和cold Observable,雖然大部分是cold(甚至使用Observable.interval()的)。當你有不止一個Observer的時候,預設行為是為每個Observer增加一個分開的流。你也許希望這樣,也許不希望這樣,我們需要意識到:什麼時候,通過multicasting(使用ConnectableObservable)會把一個Observable強制變成hot。
Understanding multicasting
看下面的程式碼:
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> threeIntegers = Observable.range(1, 3);
threeIntegers.subscribe(i -> System.out.println("Observer One: " + i) );
threeIntegers.subscribe(i -> System.out.println("Observer Two: " + i));
}
}
輸出是
Observer One: 1
Observer One: 2
Observer One: 3
Observer Two: 1
Observer Two: 2
Observer Two: 3
上面的程式,第一個Observer接收完所有的三個emissions,然後呼叫onComplete()。然後,第二個Observer接收三個emissions(重新生成的),然後呼叫onComplete()。這是兩個分開的流。如果我們想把他們合併到一個流,把每個emission都發給Observers,可以呼叫Observable的publish(),這樣返回一個ConnectableObservable。設定Observers,呼叫connect()來開始發射,兩個Observers收到了相同的emissions:
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Integer> threeIntegers =
Observable.range(1, 3).publish();
threeIntegers.subscribe(i -> System.out.println("Observer One: " + i));
threeIntegers.subscribe(i -> System.out.println("Observer Two: " + i));
threeIntegers.connect();
}
}
輸出是
Observer One: 1
Observer Two: 1
Observer One: 2
Observer Two: 2
Observer One: 3
Observer Two: 3
使用ConnectableObservable會強制源成為hot,發射一個流給所有的Observers。這就叫multicasting,但是呼叫不同的operators,還有細微的差別。
甚至當你呼叫publish(),使用ConnectableObservable,下來的任何operators還是可以增加分開的流。我們看看這行為,該如何管理。
Multicasting with operators
看看multicasting在一個operators鏈內是如何工作的,我們使用Observable.range(),然後map每個emission成一個隨機整數。因為這些隨機值對每個訂閱來說,是不確定的,也是不同的,可以讓我們觀察multicasting是否工作了。
我們發射數字1-3,對映成一個0-100000的隨機整數。如果有兩個Observers,我們可以希望每個收到不同的值。注意,你的輸出和我的輸出是不同的,我們感興趣的是,確認兩個Observers接收到不同的隨機數:
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> threeRandoms = Observable.range(1,3).map(i -> randomInt());
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
我的輸出是
Observer 1: 6207
Observer 1: 10604
Observer 1: 5121
Observer 2: 7127
Observer 2: 94588
Observer 2: 34253
看,Observable.range()源產生了兩個分開的emission生成器,每個都是cold的。每個流有自己的分開的map()例項,
於是,每個Observer得到了不同的隨機數。流的結構如下:
你會說,那我要給兩個Observers發射相同的三個隨機數,可以首先是在Observable.range()之後呼叫publish(),產生ConnectableObservable。然後,可以呼叫map(),然後是Observers呼叫connect()。但是,你看到了,不是你期望的結果。每個Observer得到了分開的隨機數:
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Integer> threeInts = Observable.range(1,3).publish();
Observable<Integer> threeRandoms = threeInts.map(i -> randomInt());
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeInts.connect();
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
輸出是
Observer 1: 94084
Observer 2: 94961
Observer 1: 2308
Observer 2: 84564
Observer 1: 9046
Observer 2: 78881
為什麼呢?Observable.range()之後multicast,但是,multicasting發生在map()之前。map()之後,每個Observer仍然得到一個分開的流。publish()之前的任何事被組合成一個流(或者更技術一點,一個代理Observer)。
但是publish()之後,會fork成分開的流,如下圖:
如果我們想防止map()產生兩個分開的流,我們需要在map()之後呼叫publish():
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms = Observable.range(1,3)
.map(i -> randomInt()).publish();
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
threeRandoms.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
輸出是
Observer 1: 33845
Observer 2: 33845
Observer 1: 50389
Observer 2: 50389
Observer 1: 71504
Observer 2: 71504
終於對了!每個Observer得到了相同的三個隨機數。現在,一個流例項通過了整個鏈,因為map()在publish()的前面,而不是後面:
When to multicast
多個Observers的時候,Multicasting有利於防止冗餘工作。也可以提高效能,減少記憶體和CPU的使用,或者能簡化業務邏輯(當所有的Observers需要相同的emissions的時候)。
資料驅動的cold Observables應該只有在你想提高效能或者多個Observers接收相同資料的時候使用multicast。記住,multicasting增加hot ConnectableObservables,你不得不小心並且及時呼叫onnect(),這樣Observers才不會錯過資料。
甚至如果你的源Observable是hot(比如JavaFX或者Android的UI事件),putting operators也可能導致冗餘工作和監聽。當只有一個Observer的時候,用不著multicast,multicasting能導致不必要的負擔。但是,如果有多個Observers,你需要找到可以multicast和合並上遊操作的代理點。這個點是典型的邊界,在這裡,Observers有相同的上游操作,和不同的下游操作。
比如,你有一個Observer,用來列印隨機數,但是另一個Observer使用reduce()求和。在這個點,一個流應該fork成兩個分開的流,因為他們不再冗餘,做的是不同的工作:
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
ConnectableObservable<Integer> threeRandoms = Observable.range(1, 3)
.map(i -> randomInt()).publish();
//Observer 1 - print each random integer
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
//Observer 2 - sum the random integers, then print
threeRandoms.reduce(0, (total, next) -> total + next)
.subscribe(i -> System.out.println("Observer 2: " + i));
threeRandoms.connect();
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
輸出是
Observer 1: 96689
Observer 1: 9730
Observer 1: 86978
Observer 2: 193397
Automatic connection
有時候,你想手動呼叫ConnectableObservable的connect(),控制emissions開始發射的時間。允許Observable動態connect要小心點,這時候,Observers容易錯過emissions。
autoConnect()
ConnectableObservable可以很方便地使用autoConnect()。對於一個給定的ConnectableObservable,呼叫autoConnect()會返回一個Observable,在一定數量的Observers訂閱以後,它會自動呼叫connect()。前面的例子有兩個Observers,可以在publish()之後,立刻呼叫autoConnect(2):
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> threeRandoms = Observable.range(1, 3)
.map(i -> randomInt())
.publish()
.autoConnect(2);
//Observer 1 - print each random integer
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
//Observer 2 - sum the random integers, then print
threeRandoms.reduce(0, (total, next) -> total + next)
.subscribe(i -> System.out.println("Observer 2: " + i));
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
這樣做,省掉了ConnectableObservable和後面的connect()呼叫。它會在得到兩個訂閱以後開始發射。
甚至當所有的下游Observers完成了或者dispose,autoConnect()會為源儲存它的訂閱。如果源finite並且disposes,如果新的Observer想訂閱,它不會再次訂閱源。
如果我們在上面的例子上增加第三個Observer,autoConnect()的引數還是2,第三個Observer會錯過這些emissions:
import io.reactivex.Observable;
import java.util.concurrent.ThreadLocalRandom;
public class Launcher {
public static void main(String[] args) {
Observable<Integer> threeRandoms = Observable.range(1, 3)
.map(i -> randomInt()).publish().autoConnect(2);
//Observer 1 - print each random integer
threeRandoms.subscribe(i -> System.out.println("Observer 1: " + i));
//Observer 2 - sum the random integers, then print
threeRandoms.reduce(0, (total, next) -> total + next).subscribe(i -> System.out.println("Observer 2: " + i));
//Observer 3 - receives nothing
threeRandoms.subscribe(i -> System.out.println("Observer 3: " + i));
}
public static int randomInt() {
return ThreadLocalRandom.current().nextInt(100000);
}
}
如果你沒傳numberOfSubscribers引數,預設值是1。下面的例子,我們publish和autoConnect Observable.interval(),第一個Observer開始發射emissions,3秒以後,另一個Observer來了,但是會錯過前面幾個emissions,後面的都接收到了:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds = Observable.interval(1, TimeUnit.SECONDS)
.publish()
.autoConnect();
//Observer 1
seconds.subscribe(i -> System.out.println("Observer 1: " + i));
sleep(3000);
//Observer 2
seconds.subscribe(i -> System.out.println("Observer 2: " + i));
sleep(3000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如果numberOfSubscribers引數是0,它會立刻開始發射,不等任何Observers。
refCount() and share()
ConnectableObservable的refCount()類似autoConnect(1),在得到一個訂閱以後發射。但是,有一個重要的不同:當它不再有Observers的時候,會處置它自己,當有了新的訂閱,就重新開始。當它不再有Observers的時候,它不儲存對源的訂閱,當來了另一個Observer,就重新開始。
讓我們看個例子:有一個每秒發射的Observable.interval(),使用refCount()實現multicast。Observer 1接受了五個emissions,Observer 2接受了兩個。我們錯開這兩個訂閱,中間隔三秒鐘。因為這兩個訂閱是有限的(由於take()),在Observer 3來的時候,他們應該已經終止了,此時,不應該再有之前的Observers。注意,Observer 3開始於0:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS)
.publish()
.refCount();
//Observer 1
seconds.take(5).subscribe(l -> System.out.println("Observer 1: " + l));
sleep(3000);
//Observer 2
seconds.take(2).subscribe(l -> System.out.println("Observer 2: " + l));
sleep(3000);
//there should be no more Observers at this point
//Observer 3
seconds.subscribe(l -> System.out.println("Observer 3: " + l));
sleep(3000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出是
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 3: 0
Observer 3: 1
Observer 3: 2
也可以使用publish().refCount()的別名share():
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS).share();
Replaying and caching
Multicasting也允許快取多個Observers之間共享的值。重播和快取資料是一個multicasting活動,我們將會看到怎麼做才是安全和有效率的。
Replaying
replay()是儲存一定範圍內的從前的emissions的一個強有力的辦法,當來了新的Observer就重新發射他們。它會返回一個ConnectableObservable,既能multicast emissions,也能發射一定範圍內的從前的emissions。它快取的從前的emissions會在來了新的Observer的時候立刻發射,然後它會發射之後的emissions。
讓我們從不帶引數的replay()開始。它會為遲來的Observers重播所有的從前的emissions,然後發射新產生的。如果我們使用每秒發射的Observable.interval(),
呼叫replay()實現multicast,重播從前的整數emissions。因為replay()返回ConnectableObservable,我們使用autoConnect(),這樣會在第一個訂閱之後發射。
三秒以後,來了第二個Observer,看看發生了什麼:
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
public class Launcher {
public static void main(String[] args) {
Observable<Long> seconds =
Observable.interval(1, TimeUnit.SECONDS)
.replay()
.autoConnect();
//Observer 1
seconds.subscribe(l -> System.out.println("Observer 1: " + l));
sleep(3000);
//Observer 2
seconds.subscribe(l -> System.out.println("Observer 2: " + l));
sleep(3000);
}
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出是
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 2: 0
Observer 2: 1
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5
看到了吧?三秒以後,Observer 2來了,立刻收到錯過的前面三個emissions:0、1和2.然後,它收到的emissions和Observer 1的一樣。注意,這樣做消耗大量的記憶體,replay()快取了它接收到的全部emissions。如果源是無限的,或者只關心最後的一些emissions,可以指定bufferSize引數,限制重播的數量。如果我們呼叫replay(2),第二個Observer不會收到0,但是會收到1和2。
如果你甚至在沒有訂閱的時候,也想使用replay()儲存快取的值,就和autoConnect()結合,而不是refCount()。比如下面的例子,第二個Observer只能接收到最後的值:
import io.reactivex.Observable;
public class Launcher {
public