RxJava 變換、 組合 、 合併操作符
阿新 • • 發佈:2019-02-08
/** * @author :houde * 時間:2018/1/23 * Des:RxJava 變換操作符 */ public class RxOperateActivity extends AppCompatActivity { private final String TAG = "RxOperateActivity"; Observable<Integer> observable1 = Observable.just(1,2,3,4); Observable<String> observable2 = Observable.just("A","B","C"); private Observer<String> stringObserver = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"開始採用subscribe連線"); } @Override public void onNext(String s) { Log.e(TAG,s); } @Override public voidonError(Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"對Complete事件作出響應"); } }; private Observer<Integer> intObserver = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"開始採用subscribe連線"); } @Override public void onNext(Integer integer) { Log.e(TAG,"事件 = " + integer); } @Override public void onError(Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"對Complete事件作出響應"); } } ; @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_image); //轉換操作符 /** * 作用 * 對被觀察者傳送的每1個事件都通過指定的函式處理,從而變換成另外一種事件 * 即:將被觀察者傳送的事件轉換為任意的型別事件。 * 應用場景 * 資料型別轉換 * 具體使用 * 下面以將 使用Map()將事件的引數從整型變換成字串型別為例子說明 */ map(); /** * 作用: * 將被觀察者傳送的事件序列進行拆分&單獨轉換,再合併成一個新的事件序列,最後再進行傳送 * * 原理 * 1.為事件序列中每個事件都建立一個 Observable 物件; * 2.將對每個 原始事件 轉換後的 新事件 都放入到對應 Observable物件; * 3.將新建的每個Observable 都合併到一個 新建的、總的Observable 物件; * 4.新建的、總的Observable 物件 將 新合併的事件序列 傳送給觀察者(Observer) * 應用場景 * 無序的將被觀察者傳送的整個事件序列進行變換 */ flatMap(); /** * 作用:類似FlatMap()操作符 * 與FlatMap()的 區別在於:拆分 & 重新合併生成的事件序列 的順序 = 被觀察者舊序列生產的順序 * 應用場景 * 有序的將被觀察者傳送的整個事件序列進行變換 */ concatMap(); /** * 作用 * 定期從 被觀察者(Observable)需要傳送的事件中獲取一定數量的事件&放到快取區中, * 最終傳送 * * 應用場景 * 快取被觀察者傳送的事件 */ buffer(); //組合和並操作符 /** * 作用 * 組合多個被觀察者一起傳送資料,合併後 按傳送順序序列執行 * 二者區別: * 組合被觀察者的數量,即concat()組合被觀察者數量≤4個, * 而concatArray()則可>4個 */ concat(); concatArray(); /** * 作用 * 組合多個被觀察者一起傳送資料,合併後 按時間線並行執行 * * 二者區別: * 組合被觀察者的數量,即merge()組合被觀察者數量≤4個, * 而mergeArray()則可>4個 * * 區別上述concat()操作符: * 同樣是組合多個被觀察者一起傳送資料,但concat()操作符合並後是按傳送順序序列執行 */ merge(); mergeArray(); /** * 背景: * 使用merge和concat操作符時, * 衝突: * 若其中一個被觀察者發出onError事件,則會終止其他被觀察者繼續傳送事件 * 解決方案: * 若希望onError事件推遲到其他被觀察者傳送完事件之後再觸發 * 即需要使用對應的mergeDelayError()或concatDelayError操作符 * */ concatDelayError(); mergeDelayError(); //事件的合併 /** * 作用 * 合併多個被觀察者(Observable)傳送的事件, * 生成一個新的事件序列(即組合過後的事件序列),並最終傳送 * 特別注意: * 事件組合方式 = 嚴格按照原先事件序列 進行對位合併 * 最終合併的事件數量 = 多個被觀察者(Observable)中數量最少的數量 * 特別注意: * 儘管被觀察者2的事件D沒有事件與其合併,但還是會繼續傳送 * 若在被觀察者1 & 被觀察者2的事件序列最後傳送onComplete()事件, * 則被觀察者2的事件D也不會發送,測試結果如下 * 定義: * 屬於Rxjava中的組合 * 作用: * 1.合併多個被觀察者(Observable)傳送的事件 * 2.生成一個新的事件序列(即合併之後的序列),並最終傳送 * 原理: * 1.事件組合方式 = 嚴格按照原先事件序列進行對位合併 * 2.最終合併的事件數量 = 多個被觀察者(Observable)中數量最少的數量 * * 應用場景: * 1.當展示的資訊需要從多個地方獲取(即 資訊 = 資訊1 + 資訊2)& 統一結合後再展示 * 2.如:合併網路請求的傳送 & 統一展示結果 */ zip(); /** * 作用 * 當兩個Observables中的任何一個傳送了資料後,將先發送了資料的Observables * 的最新(最後)一個數據與另外一個Observable傳送的每個資料結合,最終基於該 * 函式的結果傳送資料 * 與Zip()的區別: * Zip() = 按個數合併, * 即1對1合併;CombineLatest() = 按時間合併,即在同一個時間點上合併 * * combineLatestDelayError() * 作用類似於concatDelayError() / mergeDelayError() ,即錯誤處理,此處不作過多描述 */ combineLatest(); /** * 作用 * 把被觀察者需要傳送的事件聚合成1個事件 & 傳送 * * 聚合的邏輯根據需求撰寫,但本質都是前2個數據聚合,然後與後1個數據繼續進行聚合,依次類推 */ reduce(); /** *作用 * 將被觀察者Observable傳送的資料事件收集到一個數據結構裡 */ collect(); //傳送事件前追加發送事件 /** * 作用 * 在一個被觀察者傳送事件前,追加發送一些資料/一個新的被觀察者 */ startWith(); startWithArray(); //統計傳送事件數量 /** * 作用 * 統計被觀察者傳送事件的數量 */ count(); } private void count() { observable1.count().subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.e(TAG,"傳送事件的次數 = " + aLong); } }); } private void startWithArray() { Observable.just(4,5,6,7) .startWith(0) .startWithArray(1,2,3) .subscribe(intObserver); } private void startWith() { Observable.just(1,2,3,4) .startWith(0) .subscribe(intObserver); } private void collect() { observable1.collect( // 1. 建立資料結構(容器),用於收集被觀察者傳送的資料 new Callable<ArrayList<Integer>>() { @Override public ArrayList<Integer> call() throws Exception { return new ArrayList<>(); } // 2. 對傳送的資料進行收集 }, new BiConsumer<ArrayList<Integer>, Integer>() { @Override public void accept(ArrayList<Integer> list, Integer integer) throws Exception { // 引數說明:list = 容器,integer = 後者資料 list.add(integer); // 對傳送的資料進行收集 } }).subscribe(new Consumer<ArrayList<Integer>>() { @Override public void accept(ArrayList<Integer> list) throws Exception { Log.e(TAG, "本次傳送的資料是: " + list); } }); } private void reduce() { observable1.reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer s1, Integer s2) throws Exception { Log.e(TAG, "本次計算的資料是: "+s1 +" 乘 "+ s2); return s1 * s2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.e(TAG, "最終計算的結果是: " + integer); } }); } private void combineLatest() { Log.e(TAG,"-------------------combineLatest-------------------"); Observable.combineLatest(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return s + integer; } }).subscribe(stringObserver); } private void zip() { Log.e(TAG,"-------------------zip-------------------"); Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(stringObserver); } private void mergeDelayError() { Log.e(TAG,"-------------------mergeDelayError-------------------"); Observable.mergeArrayDelayError( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); // 傳送Error事件,因為使用了concatDelayError,所以第2個Observable將會發送事件,等傳送完畢後,再發送錯誤事件 emitter.onError(new NullPointerException("這裡傳送了一個onError()")); emitter.onComplete(); } }), Observable.just(4, 5, 6)) .subscribe(intObserver); } private void concatDelayError() { Log.e(TAG,"-------------------concatDelayError-------------------"); Observable.concat( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); // 傳送Error事件,因為無使用concatDelayError,所以第2個Observable將不會發送事件 emitter.onError(new NullPointerException("這裡傳送了一個onError()")); emitter.onComplete(); } }), Observable.just(4, 5, 6)) .subscribe(intObserver); } private void mergeArray(){ Log.e(TAG,"-------------------mergeArray-------------------"); Observable.mergeArray(Observable.just(1,2,3), Observable.just(4,5,6), Observable.just(7,8,9), Observable.just(10,11,12), Observable.just(13,14,15), Observable.just(16,17,18) ).subscribe(intObserver); } private void merge(){ Log.e(TAG,"-------------------merge-------------------"); Observable.merge(Observable.just(1,2,3,4), Observable.just(5,6), Observable.just(7,8,9), Observable.just(10,11,12,13) ) .subscribe(intObserver); } private void concatArray(){ Log.e(TAG,"-------------------concatArray-------------------"); Observable.concatArray(Observable.just(1,2,3), Observable.just(4,5,6), Observable.just(9,10), Observable.just(11,12,13), Observable.just(14,15,16) ).subscribe(intObserver); } private void concat(){ // concat():組合多個被觀察者(≤4個)一起傳送資料 // 注:序列執行 Log.e(TAG,"-------------------concat-------------------"); Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6), Observable.just(9,10) ).subscribe(intObserver); } private void buffer() { Log.e(TAG,"-------------------buffer-------------------"); Observable.just(1,2,3,4,5,6,7,8) // 設定快取區大小 & 步長 // 快取區大小 = 每次從被觀察者中獲取的事件數量 // 步長 = 每次獲取新事件的數量 .buffer(3,1) .subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) { Log.e(TAG,"開始採用subscribe連線"); } @Override public void onNext(List<Integer> ints) { Log.e(TAG,"快取區裡的事件個數" + ints.size()); for(int i = 0 ,size = ints.size(); i < size;i++){ Log.e(TAG,"事件 = " + i); } } @Override public void onError(Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete() { Log.e(TAG,"對Complete事件作出響應"); } }); } private void concatMap() { Log.e(TAG,"-------------------concatMap-------------------"); Observable.just(4,3,2,1) .concatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.fromIterable(getEvents(integer)); } }) .subscribe(stringObserver); } private void flatMap() { Log.e(TAG,"-------------------flatMap-------------------"); Observable.just(1,2,3,4) .flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.fromIterable(getEvents(integer)); } }).subscribe(stringObserver); } @NonNull private List<String> getEvents(Integer integer) { List<String> event = new ArrayList<>(3); for(int i = 0 ; i < 3 ; i++){ event.add("我是事件 " + integer + "拆分後的子事件" + i); } return event; } private void map() { Log.e(TAG,"-------------------map-------------------"); Observable.just(1,2,3,4).map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "使用 Map變換操作符 將事件" + integer +"的引數從 整型"+integer + " 變換成 字串型別" + integer; } }).subscribe(stringObserver); } }