RxJava學習 - 13. Transformers and Custom Operators
RxJava學習 - 13. Transformers and Custom Operators
可以使用compose()和lift()實現自己的operators。Observable和Flowable都有這兩個方法。
Transformers
有時候,可能想重新使用Observable或者Flowable鏈的某個片段,可以使用某種方法,把這些operators組合成新的operators。
ObservableTransformer和FlowableTransformer提供了重新使用程式碼的能力。
ObservableTransformer
以前有個例子,使用collect()把Observable轉成Single<ImmutableList>。看下面的例子:
import com.google.common.collect.ImmutableList;
import io.reactivex.Observable;
public class Launcher {
public static void main(String[] args) {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.collect(ImmutableList::builder, ImmutableList.Builder::add)
. map(ImmutableList.Builder::build)
.subscribe(System.out::println);
Observable.range(1, 15)
.collect(ImmutableList::builder, ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.subscribe(System.out::println);
}
}
下面的程式碼出現了兩次:
collect(ImmutableList::builder, ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
我們可以想辦法把他們組合成新的operator。對於目標Observable,你可以實現ObservableTransformer<T, R>。這個類有一個apply()方法,接受一個Observable,返回一個Observable。在你的實現裡,你可以返回一個Observable鏈,把任何operators加給上游,然後返回Observable。
對我們的例子,要把Observable轉換成Observable<ImmutableList>。我們包裝一個ObservableTransformer<T, ImmutableList>:
public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
return new ObservableTransformer<T, ImmutableList<T>>() {
@Override
public ObservableSource<ImmutableList<T>> apply(Observable<T> upstream) {
return upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.toObservable(); // must turn Single into Observable
}
};
}
因為collect()返回一個Single,我們呼叫toObservable()(因為ObservableTransformer期待一個Observable,而不是Single)。
ObservableTransformer只有一個抽象方法,所以可以使用lambda:
public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.toObservable(); // must turn Single into Observable
}
想在一個Observable鏈內呼叫Transformer,可以使用compose()方法。它接受一個ObservableTransformer<T, R>,返回轉換後的Observable:
import com.google.common.collect.ImmutableList;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
public class Launcher {
public static void main(String[] args) {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.compose(toImmutableList())
.subscribe(System.out::println);
Observable.range(1, 10)
.compose(toImmutableList())
.subscribe(System.out::println);
}
public static <T> ObservableTransformer<T, ImmutableList<T>> toImmutableList() {
return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.toObservable(); // must turn Single into Observable
}
}
你也可以為特定的emission型別和接受的引數增加Transformers。例如,增加一個joinToString(),它接受一個分隔符,用來級聯字串:
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
public class Launcher {
public static void main(String[] args) {
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.compose(joinToString("/"))
.subscribe(System.out::println);
}
public static ObservableTransformer<String, String> joinToString(String separator) {
return upstream -> upstream
.collect(StringBuilder::new, (b,s) -> {
if (b.length() == 0)
b.append(s);
else
b.append(separator).append(s);
})
.map(StringBuilder::toString)
.toObservable();
}
}
FlowableTransformer
當你實現ObservableTransformer的時候,也許覺得增加FlowableTransformer會更好。這樣,你的operator可用於Observables,也可用於Flowables。
FlowableTransformer和ObservableTransformer差別不大。當然,和Flowables組合時,它支援背壓:
import com.google.common.collect.ImmutableList;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
public class Launcher {
public static void main(String[] args) {
Flowable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.compose(toImmutableList())
.subscribe(System.out::println);
Flowable.range(1, 10)
.compose(toImmutableList())
.subscribe(System.out::println);
}
public static <T> FlowableTransformer<T, ImmutableList<T>> toImmutableList() {
return upstream -> upstream.collect(ImmutableList::<T>builder, ImmutableList.Builder::add)
.map(ImmutableList.Builder::build)
.toFlowable(); // must turn Single into Observable
}
}
Avoiding shared state with Transformers
比如,你想增加一個ObservableTransformer<T, IndexedValue>,它把每個emission和一個從0開始的連續索引配對。首先,你增加一個IndexedValue類:
static final class IndexedValue<T> {
final int index;
final T value;
IndexedValue(int index, T value) {
this.index = index;
this.value = value;
}
@Override
public String toString() {
return index + " - " + value;
}
}
然後,你增加一個ObservableTransformer<T, IndexedValue>,使用一個AtomicInteger做索引:
static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
final AtomicInteger indexer = new AtomicInteger(-1);
return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(), v));
}
看出問題了嗎?讓我們執行下面的程式。仔細看輸出:
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import java.util.concurrent.atomic.AtomicInteger;
public class Launcher {
public static void main(String[] args) {
Observable<IndexedValue<String>> indexedStrings =
Observable.just("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
.compose(withIndex());
indexedStrings.subscribe(v -> System.out.println("Subscriber 1: " + v));
indexedStrings.subscribe(v -> System.out.println("Subscriber 2: " + v));
}
static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
final AtomicInteger indexer = new AtomicInteger(-1);
return upstream -> upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(), v));
}
static final class IndexedValue<T> {
final int index;
final T value;
IndexedValue(int index, T value) {
this.index = index;
this.value = value;
}
@Override
public String toString() {
return index + " - " + value;
}
}
}
輸出是
Subscriber 1: 0 - Alpha
Subscriber 1: 1 - Beta
Subscriber 1: 2 - Gamma
Subscriber 1: 3 - Delta
Subscriber 1: 4 - Epsilon
Subscriber 2: 5 - Alpha
Subscriber 2: 6 - Beta
Subscriber 2: 7 - Gamma
Subscriber 2: 8 - Delta
Subscriber 2: 9 - Epsilon
注意,AtomicInteger的一個例項被兩個訂閱共享了。
可以為每個訂閱增加一個資源(比如AtomicInteger),包裝在Observable.defer()內:
static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
return upstream -> Observable.defer(() -> {
AtomicInteger indexer = new AtomicInteger(-1);
return upstream.map(v -> new IndexedValue<T>(indexer.incrementAndGet(), v));
});
}
對於這個例子,也可以使用Observable.zip()或者zipWith(),解決我們的問題:
static <T> ObservableTransformer<T,IndexedValue<T>> withIndex() {
return upstream ->
Observable.zip(upstream,
Observable.range(0,Integer.MAX_VALUE),
(v,i) -> new IndexedValue<T>(i, v)
);
}
Operators
理想的情況下,你很少需要從頭開始通過實現ObservableOperator和FlowableOperator構造自己的operator。ObservableTransformer和FlowableTransformer可以滿足大多數需求。
可有時候,你發現不得不做些現有的operators不能做或者不容易做的事情。排除了任何選項,你可能不得不增加一個operator,在上游和下游之間操縱每個onNext()、onComplete()和onError()事件。
在你增加自己的operator之前,要先試試compose()。如果失敗了,推薦你在StackOverflow提問題。
實在不行,再構造自己的operator。
Implementing an ObservableOperator
實現自己的ObservableOperator(或者FlowableTransformer)要做更多的工作。不組合現有的operators,你需要攔截onNext()、onComplete()、onError()和onSubscribe(),實現自己的Observer。該Observer把onNext()、onComplete()和onError()事件傳給下游Observer。
比如,你要增加doOnEmpty() operator,當onComplete()被呼叫的時候,它會執行一個Action。要增加自己的ObservableOperator<Downstream, Upstream>,需要實現它的apply()方法。它接受一個Observer,返回一個Observer。然後,你可以通過呼叫lift(),使用這個ObservableOperator:
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.functions.Action;
import io.reactivex.observers.DisposableObserver;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 5)
.lift(doOnEmpty(() -> System.out.println("Operation 1 Empty!")))
.subscribe(v -> System.out.println("Operation 1: " + v));
Observable.<Integer>empty()
.lift(doOnEmpty(() -> System.out.println("Operation 2 Empty!")))
.subscribe(v -> System.out.println("Operation 2: " + v));
}
public static <T> ObservableOperator<T, T> doOnEmpty(Action action) {
return new ObservableOperator<T, T>() {
@Override
public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
return new DisposableObserver<T>() {
boolean isEmpty = true;
@Override
public void onNext(T value) {
isEmpty = false;
observer.onNext(value);
}
@Override
public void onError(Throwable t) {
observer.onError(t);
}
@Override
public void onComplete() {
if (isEmpty) {
try {
action.run();
} catch (Exception e) {
onError(e);
return;
}
}
observer.onComplete();
}
};
}
};
}
}
就像Transformers一樣,當增加自己的operators的時候,不要在訂閱之間共享狀態。
還有,onNext()、onComplete()和onError()呼叫是可以按需要操縱和混合的。比如,toList()不會把收到的每個onNext()都傳給下游。它會在內部列表中收集這些emissions。當上遊呼叫了onComplete(),它就呼叫下游的onNext(),把列表傳給下游,然後呼叫onComplete()。現在,我們實現自己的myToList(),來理解toList()如何工作:
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.observers.DisposableObserver;
import java.util.ArrayList;
import java.util.List;
public class Launcher {
public static void main(String[] args) {
Observable.range(1, 5)
.lift(myToList())
.subscribe(v -> System.out.println("Operation 1: " + v));
Observable.<Integer>empty()
.lift(myToList())
.subscribe(v -> System.out.println("Operation 2: " + v));
}
public static <T> ObservableOperator<List<T>,T> myToList() {
return observer -> new DisposableObserver<T>() {
ArrayList
相關推薦
RxJava學習 - 13. Transformers and Custom Operators
RxJava學習 - 13. Transformers and Custom Operators
Transformers
ObservableTransformer
FlowableTransformer
Avoiding shared
RxJava學習 - 12. Flowables and Backpressure
RxJava學習 - 12. Flowables and Backpressure
Understanding backpressure
An example that needs backpressure
Introducing the Flo
RxJava學習 - 10. Concurrency and Parallelization
RxJava學習 - 10. Concurrency and Parallelization
Introducing RxJava concurrency
Keeping an application alive
Understanding
python 3.x 學習筆記13 (socket_ssh and socket_文件傳輸)
粘包問題 問題 取出 nec imp 傳輸文件 ket color md5 ssh服務端
import socket,os
server = socket.socket()
server.bind((‘localhost‘,6666))
server.listen()
RxJava學習 - 5. Single, Completable, and Maybe
RxJava學習 - 5. Single, Completable, and Maybe
Single
Maybe
Completable
Single
Single實際上只發射一次。它有自己的SingleObserver介面:
i
RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering
RxJava學習 - 11. Switching, Throttling, Windowing, and Buffering
Buffering
Fixed-size buffering
Time-based buffering
Boun
RxJava學習 - 9. Multicasting, Replaying, and Caching
RxJava學習 - 9. Multicasting, Replaying, and Caching
Understanding multicasting
Multicasting with operators
When to multicast
mysql-學習-13-20170619-MySQL備份恢復-xtrabackup-2
soc tar pex cfa nod 遠程 表空間 tid doc mysql-學習-13-20170619-MySQL備份恢復-xtrabackup-2
【管理員】吳炳錫(82565387) 20:34:15基於xtrabackup的增備,只需要了解如果需要增備建
手勢跟蹤論文學習:Realtime and Robust Hand Tracking from Depth(三)Cost Function
引入 tail track col div 理想 問題 from details
iker原創。轉載請標明出處:http://blog.csdn.net/ikerpeng/article/details/39050619
Realtime and Robust Hand
struts2學習(13)struts2文件上傳和下載(1)
action alt for ide 上傳文件 fig .org dac str 一、Struts2文件上傳:
二、配置文件的大小以及允許上傳的文件類型:
三、大文件上傳:
如果不配置上傳文件的大小,struts2默認允許上傳文件最大為2M; 2097152Byte;
最權威的RXJaVa學習資料
學習 android music roi androi andro java學習 oid com aNDROID%E8%AF%BB%E5%86%99%E6%96%87%E4%BB%B6%E7%9A%84N%E7%A7%8D%E5%86%99%E6%B3%95
http:/
OC學習13——Foundation框架中的集合
str 集合類 結構 pan sar set 體系 隊列 數組 OC集合類是一些非常有用的工具類,它可以用於存儲多個數量不等的對象,並可以實現常用的數據結構(棧、隊列等),此外,OC集合還可用於保存具有映射關系的關聯數組。OC的集合大致可以分為:NSArray、NSSe
php之快速入門學習-13(PHP 循環 - While 循環)
style 快速入門 數組 執行 span tro 運行 設置 快速 PHP 循環 - While 循環
循環執行代碼塊指定的次數,或者當指定的條件為真時循環執行代碼塊。
PHP 循環
在您編寫代碼時,您經常需要讓相同的代碼塊一次又一次地重復運行。我們可以在代
C語言學習13
排序 %d uic pri quick class 學習 span bsp 快速排序
1 //快速排序
2 #include <stdio.h>
3
4 void quicksort(int a[], int left, int right);
【c學習-13】
pow(x 字符數 print 判斷 ssa python 常量 ++ 叠代 /*庫函數
1:數學函數庫:math.h
abs():絕對值;
acos(),asin(),atan():cos,sin,tan的倒數
exp():指數的次
matplotlib的學習13-subplot分格顯示
layout nbsp light pyplot 設置 out imp 標題 true
import matplotlib.pyplot as plt
plt.figure()#創建一個圖像窗口
# 使用plt.subplot2grid來創建第1個小圖, (3,3)表
python學習(13)
遞歸實現 切片 非遞歸 max 小數 嵌套 bcd urn file random.uniform(a,b)隨機生成a,b之間的一個浮點數
>> random.uniform(1,20)1.0130916166719703
習題1:生成[“z1”,”y2”,
[論文學習]Private traits and attributes are predictable from digital records of human behavior
cited:
Kosinski M, Stillwell D, Graepel T. Private traits and attributes are predictable from digital records of human behavior[J]. Proceedi
RxJava學習 - 6. Disposing
RxJava學習 - 6. Disposing
Handling a Disposable within an Observer
Using CompositeDisposable
Handling Disposal with Observable.crea
RxJava學習 - 4. Other Observable sources
RxJava學習 - 4. Other Observable sources
Observable.range()
Observable.interval()
Observable.future()
Observable.empty()
Obse