1. 程式人生 > >RxJava2.x學習總結

RxJava2.x學習總結

公司專案中網路框架使用了目前主流的Retrofit+Okhttp+RxJava框架進行開發,三者聯合使用極大簡化了網路請求及請求結果的處理。對於RxJava其繁多的操作符讓人眼花繚亂,但是隻有掌握了這些知識,專案中才能運用自如。鑑於此,本篇將系統的學習總結RxJava2.x中的知識體系。

create

create操作符作用為建立一個Observable物件(上游):

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }) .subscribe(new
Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer i) { Log.d(TAG, "onNext receive: " + i); } @Override
public void onError(@NonNull Throwable e) { Log.e(TAG, "onError receive: " + e.getLocalizedMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete receive!"); } });

注:

  • 建立Observable的同時,需要傳入ObservableOnSubscribe物件,裡面有個subscribe()回撥函式,用於向下遊發射事件
  • 上游呼叫onComplete()之後下游將停止接受新的事件,但是並不阻止上游繼續呼叫onNext()傳送事件
  • subscribe()是連結上游與下游的橋樑,傳入的Observer物件即下游接受事件的物件
  • onCompete()onError()兩者互斥,不可以同時觸發

Map

map

將上游傳送的事件變換成新的Observable物件:

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(1);
                        e.onNext(2);
                        e.onNext(3);

                        e.onComplete();
                    }
                })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(@NonNull Integer integer) throws Exception {
                        return "the " + integer;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String str) {
                        Log.d(TAG, "onNext receive: " + str);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        Log.e(TAG, "onError receive: " + e.getLocalizedMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete receive!");
                    }
                });

Zip

zip

zip操作符的作用主要是將多個Observable物件發射的事件進行合併,而且每個Observable發射的事件都是按照先後順序進行組合的,因此變換後發射的事件個數由上述Observable中發射的事件個數最少的Observable決定:

io.reactivex.Observable
                .zip(io.reactivex.Observable
                                .create(new ObservableOnSubscribe<String>() {
                                    @Override
                                    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                                        if (!e.isDisposed()) {
                                            e.onNext("A");
                                            Log.d(TAG, "String emit : A\n");

                                            e.onNext("B");
                                            Log.d(TAG, "String emit : B\n");

                                            e.onNext("C");
                                            Log.d(TAG, "String emit : C\n");
                                        }
                                    }
                                }),
                        io.reactivex.Observable
                                .create(new ObservableOnSubscribe<Integer>() {
                                    @Override
                                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                                        if (!e.isDisposed()) {
                                            e.onNext(1);
                                            Log.d(TAG, "Integer emit : 1\n");

                                            e.onNext(2);
                                            Log.d(TAG, "Integer emit : 2\n");

                                            e.onNext(3);
                                            Log.d(TAG, "Integer emit : 3\n");

                                            e.onNext(4);
                                            Log.d(TAG, "Integer emit : 4\n");

                                            e.onNext(5);
                                            Log.d(TAG, "Integer emit : 5\n");
                                        }
                                    }
                                }), new BiFunction<String, Integer, String>() {
                            @Override
                            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                                return s + integer;
                            }
                        })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "Observer accept: " + s);
                    }
                });

concat

concat操作符將多個Observable連線成一個Observable,並按照順序發射事件:

io.reactivex.Observable
                .concat(io.reactivex.Observable.just(1, 7, 3), io.reactivex.Observable.just(2, 4, 6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "conact: " + integer);
                    }
                });

flatmap

flatmap的作用是將一個Observable轉換成多個Observables,然後將多個Observables再裝進單獨的Observable中。該操作符不能保證事件的有序性。

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        e.onNext(1);
                        e.onNext(2);
                        e.onNext(3);
                        e.onNext(4);
                        e.onNext(5);
                        e.onNext(6);
                        e.onNext(7);
                        e.onNext(8);
                    }
                })
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                        Log.d(TAG, "after flatMap: " + integer);
                        return io.reactivex.Observable
                                .just("converted " + integer)
                                .delay(2, TimeUnit.SECONDS);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });

下游接收的事件無序,log如下:

12-26 14:46:00.541 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 1
12-26 14:46:00.547 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 2
12-26 14:46:00.548 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 3
12-26 14:46:00.554 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 4
12-26 14:46:00.555 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 5
12-26 14:46:00.555 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 6
12-26 14:46:00.558 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 7
12-26 14:46:00.558 30993-31091/com.esimtek.fortest D/MainActivity: after flatMap: 8
12-26 14:46:02.548 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 1
12-26 14:46:02.549 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 3
12-26 14:46:02.552 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 2
12-26 14:46:02.555 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 4
12-26 14:46:02.555 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 5
12-26 14:46:02.556 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 6
12-26 14:46:02.558 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 7
12-26 14:46:02.559 30993-30993/com.esimtek.fortest D/MainActivity: accept: converted 8

concatMap功能與flatMap相同,唯一區別是concatMap保證了發射事件的順序。

distinct

該操作符功能如示意圖,去重:

io.reactivex.Observable
                .just(1, 2, 3, 3, 3, 4, 5, 5, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });

filter

filter顧名思義,是個過濾器,過濾掉回撥函式test()返回false的值:

io.reactivex.Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            e.onNext(i);
                        }
                    }
                })
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@NonNull Integer s) throws Exception {
                        return s % 2 == 0;
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });

Log:

12-26 15:20:59.540 2376-2376/com.esimtek.fortest D/MainActivity: accept: 0
12-26 15:20:59.540 2376-2376/com.esimtek.fortest D/MainActivity: accept: 2
12-26 15:20:59.540 2376-2376/com.esimtek.fortest D/MainActivity: accept: 4
12-26 15:20:59.540 2376-2376/com.esimtek.fortest D/MainActivity: accept: 6
12-26 15:20:59.540 2376-2376/com.esimtek.fortest D/MainActivity: accept: 8

這裡寫圖片描述

buffer操作符有兩個引數buffer(count, skip),其中count 為需要被髮射的事件的最大個數,skip作用是確定每個每個buffer起始位置的索引值,當count與skip的值相等時,效果同buffer(count)

io.reactivex.Observable
                .just(1, 2, 3, 4, 5, 6, 7, 8)
                .buffer(3, 2)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.d(TAG, "accept: " + integers);
                    }
                });

Log如下:

12-26 15:48:24.406 26104-26104/com.esimtek.fortest D/MainActivity: accept: [1, 2, 3]
12-26 15:48:24.406 26104-26104/com.esimtek.fortest D/MainActivity: accept: [3, 4, 5]
12-26 15:48:24.406 26104-26104/com.esimtek.fortest D/MainActivity: accept: [5, 6, 7]
12-26 15:48:24.406 26104-26104/com.esimtek.fortest D/MainActivity: accept: [7, 8]

Timer

timer

timer操作符實現計時器功能,預設執行在工作執行緒:

Log.d(TAG, "start time: " + System.currentTimeMillis()
                + "\ncurr thread: " + Thread.currentThread().getName());

        io.reactivex.Observable
                .timer(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "end time: " + System.currentTimeMillis()
                                + "\n end thread: " + Thread.currentThread().getName());
                    }
                });

Log:

12-26 16:01:42.546 7890-7890/com.esimtek.fortest D/MainActivity: start time: 1514275302546
                                                                 curr thread: main
12-26 16:01:44.627 7890-7988/com.esimtek.fortest D/MainActivity: end time: 1514275304624
                                                                  end thread: RxComputationThreadPool-1

interval

interval操作符用於間隔指定時間執行某個操作,其接受三個引數:initalDelay(首次發射延時), period(發射間隔時間), timeUnit(時間單位):

mDisposable = io.reactivex.Observable
                .interval(3, 2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                    }
                });

這裡有個問題,就是當accept()方法中如果存在UI操作,由於持有Context引用可能會造成Activity記憶體洩露。因此需要在ActivityOnDestory()方法中執行mDisposable.dispose():

    private Disposable mDisposable;

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed())
            mDisposable.dispose();
    }

Skip

skip

如示意圖,skip接受一個引數count,這個引數指定了跳過count個數目開始接收。

Take

take

take操作符接受一個引數count,代表至多接受count個引數。

Just

just

just操作符代表一次發射資料,並執行onNext方法。

相關推薦

RxJava2.x學習總結

公司專案中網路框架使用了目前主流的Retrofit+Okhttp+RxJava框架進行開發,三者聯合使用極大簡化了網路請求及請求結果的處理。對於RxJava其繁多的操作符讓人眼花繚亂,但是隻有掌握了這些知識,專案中才能運用自如。鑑於此,本篇將系統的學習總結RxJ

webpack 1.x 學習總結

chunk 圖片 格式 模板 common git nod ack 單位 webpack介紹(from github):   A bundler for javascript and friends. Packs many modules into a few bundle

SpringBoot2.X學習總結:(1)註解部分

@SpringBootApplication包含了@SpringBootConfiguration,@EnableAutoConfiguration,@ComponentScan,一個註解相當於三個註解

RxJava2.x 學習教程(一)基本概念

RxJava是什麼、優點 RxJava (本文所有RxJava全都代指RxJava2)在 GitHub 主頁上的自我介紹是:RxJava – Reactive Extensions for the JVM – a library for composing asynch

Rxjava2.x學習記錄(三)

Rxjava2.x的內容和Rxjava1.x稍有差別,操作符部分基本不變 使用新增依賴 compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile

RxJava2.x 學習教程(三)常用操作符

錯誤處理類  Retry 當原始Observable在遇到錯誤時進行重試,目的是希望本次訂閱不以失敗事件結束! Observable.just(1, "2") .cast(Integer.class)//將被觀察者傳送的事件資料型別強轉為Intege

Rxjava2.x 封裝總結

封裝作為java的三大特性之一,相信每個開發者對封裝技術點都能說出一些,但是真正到專案實際開發,很多開發者都沒有注意,今天這篇部落格主要總結一下rxjava2.0使用過程中一些封裝。 一、執行緒排程封裝 先上一段簡單的程式碼,比較常見。 Flowa

MyBatis 學習總結 05 Mybatis3.x與Spring3.x整合 OLD

into 包括 mybatis 方法 成對 nec 自動化工具 tty red   本文通過加載mybatis-configuration.xml 文件來產生SqlSessionFactory,然後通過SqlSessionFactory去產生sqlSession,然後在通過

ElasticSearch6.x官方文件學習總結

一:配置。     配置JVM引數,系統引數,配配置檔案。系統啟動前通過Bootstrap Checks檢查配置。 二:升級ES。 三:資料副本模型。       針對資料讀寫時過程,以及在這個過程的錯誤處理進行介紹。 四:文件A

Rxjava2學習總結

Rxjava2基礎認知 形式正確的有限Observable 呼叫觀察者的onCompleted正好一次或者它的onError正好一次,而且此後不能再呼叫觀察者的任何其它方法。如果onComplete 或者 onError 走任何一個 都會 主動解除訂閱關

MyBatis學習總結(八)——Mybatis3.x與Spring4.x整合

一、搭建開發環境 1.1、使用Maven建立Web專案   執行如下命令: mvn archetype:create -DgroupId=me.gacl -DartifactId=spring4-mybatis3 -DarchetypeArtifactId=maven-archetype-webap

【cocos2d-x 3.x 學習與應用總結】4: 理解CC_CALLBACK_0, CC_CALLBACK_1, CC_CALLBACK_2, CC_CALLBACK_3

前言 得益於C++11的新特性,cocos 3.x版本在很多地方的程式碼看起來都優美了許多。這其中就包括一些回撥函式的寫法,CC_CALLBACK_N系列巨集的作用是對一個成員函式進行適配並返回一個回撥函式。本文介紹一下我對CC_CALLBACK_N系列巨集的

Vue.js學習總結(2)——Vue.js2.X + ElementUI開發環境搭建

一、開發前準備: Vue專案通常通過webpack工具來構建,而webpack命令的執行是依賴node.js的環境的,所以首先要安裝node.js。(官方地址:https://nodejs.org/e

設計模式學習總結(八)策略模式(Strategy)

isp 筆記本 override div ont 角色 write stat 通過   策略模式,主要是針對不同的情況采用不同的處理方式。如商場的打折季,不同種類的商品的打折幅度不一,所以針對不同的商品我們就要采用不同的計算方式即策略來進行處理。   一、示例展示:   以

設計模式學習總結(七)適配器模式(Adapter)

實現接口 國外 手機 額外 sed ges program ebe 通過   適配器模式主要是通過適配器來實現接口的統一,如要實現國內手機在國外充電,則需要在不同的國家采用不同的適配器來進行兼容!   一、示例展示:   以下例子主要通過給筆記本電腦添加類似手機打電話和發短

Linux下常用命令之sed學習總結

linux sed sed命令 正則表達式 sed總結 Sed功能說明:Sed是linux下一個強大的文本文件處理工具,通過對文件增加、刪除、查找、查詢操作,配合正則表達式以實現工作中的各種需求。同時也是一名運維人員必須掌握的核心技能。---------------------------

Paxos 學習總結

max 更強 分開 由於 zab ted 偽代碼 big commit 近期學習了分布式領域的重要算法Paxos,這裏羅列下關鍵點當作總結。自己水平有限,難免存在謬誤,懇請讀者指正。本篇不包含Paxos的基本理論介紹。Paxos基礎能夠參考以下的學習資料

cocos2d-x學習筆記(c++與lua交互回調函數的處理)

回調函數 tolua++ cocos2dx lua 本文假設讀者已經會使用tolua++進行C++與lua之間的通訊1、在頭文件中定義註冊回調函數,定義在MyClass類中void register(unsigned short cmdID, LUA_FUNCTION func);//LUA_

Java IO流學習總結

系統 指針 數組 rar amr redo 修改 接收 學習 Java IO流學習總結 Java流操作有關的類或接口: Java流類圖結構: 流的概念和作用 流是一組有順序的,有起點和終點的字節集合,是對數據傳輸的總稱或抽象。即數據在兩設備間的傳輸稱為流,流

201521044091 《Java程序設計》第11周學習總結

概念 ray 本周 art pre sign 繼續 not 生產 1. 本章學習總結 2. 書面作業 Q1.1.互斥訪問與同步訪問完成題集4-4(互斥訪問)與4-5(同步訪問) 1.1 除了使用synchronized修飾方法實現互斥同步訪問,還有什麽辦法實現互斥同步訪