1. 程式人生 > >RxJava 2.0 使用詳解

RxJava 2.0 使用詳解

前言

在上一篇部落格中,提到了RxJava的一些比較核心的東西,還有與1.x版本的一些區別!
現在我們具體瞭解一下它的使用!

使用

最基本的的使用

我們知道一個簡單的RxJava的應用,需要一個觀察者或者訂閱者Observer,一個被觀察者Observable,最後呼叫subscribe()方法將兩者繫結起來!
示例:

//建立觀察者或者訂閱者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    //Disposable是1.x的Subscription改名的,因為Reactive-Streams規範用這個名稱,為了避免重複
//這個回撥方法是在2.0之後新新增的 //可以使用d.dispose()方法來取消訂閱 } @Override public void onNext(String value) { Log.e("onNext", value); } @Override public void onError(Throwable e) { Log.e("onError", e.getMessage()); } @Override public void onComplete() { Log.e("onComplete"
, "complete"); } }; //建立被觀察者 Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("Hello World!"); } }); observable.subscribe(observer);

這是一個非常簡單的例子,由於1.x中Observable不能合理的背壓,導致了無法意料的 MissingBackpressureException,所以在2.x中,添加了Flowable來支援背壓,而把Observable設計成非背壓的。


還有一點需要注意的就是,在上邊註釋中也有,onSubscribe(Disposable d)這個回撥方法是在2.x中新增的,Dispose引數是由1.x中的Subscription改名的,為了避免名稱衝突!
所以上邊的例子在2.x中,最好這麼寫:

//建立訂閱者
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onSubscribe(Subscription s) {
    //這一步是必須,我們通常可以在這裡做一些初始化操作,呼叫request()方法表示初始化工作已經完成
    //呼叫request()方法,會立即觸發onNext()方法
    //在onComplete()方法完成,才會再執行request()後邊的程式碼
    s.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(String value) {
        Log.e("onNext", value);
    }

    @Override
    public void onError(Throwable t) {
        Log.e("onError", t.getMessage());
    }

    @Override
    public void onComplete() {
    //由於Reactive-Streams的相容性,方法onCompleted被重新命名為onComplete
        Log.e("onComplete", "complete");
    }
};

Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
        e.onNext("Hello,I am China!");
    }
}, BackpressureStrategy.BUFFER)
    .subscribe(subscriber);     

在2.x中,我們在onSubscribe()回撥中必須呼叫s.request()方法去請求資源,引數就是要請求的數量,一般如果不限制請求數量,可以寫成Long.MAX_VALUE,之後會立即觸發onNext()方法!所以當你在onSubscribe()/onStart()中做了一些初始化的工作,而這些工作是在request()後面時,會出現一些問題,在onNext()執行時,你的初始化工作的那部分程式碼還沒有執行。為了避免這種情況,請確保你呼叫request()時,已經把所有初始化工作做完了。

更簡潔的寫法

Flowable.just("Hello,I am China!")
    .subscribe(subscriber);
    //.subscribeWith(subscriber)//在1.x中此方法返回Subscription,而在2.x中是沒有返回值的
    //所以增加subscribeWith()方法,用來返回一個Disposable物件
    //使得使用者可以CompositeDisposable.add()方法新增物件。1.x為CompositeSubscription
    //其他subscribe()過載方法返回Disposable

RxJava提供了just()方法來建立一個發射字串的Flowable,然後呼叫subcribe()即可!
這裡還有一個需要注意的問題,就是在註釋中寫的subcribe()方法有多種過載方法,只有subscribe(subscriber)這個過載方法時沒有返回值的,但是在1.x中,此方法返回Subscription(上邊也提到過,在2.x中改名為Disposable),使用者經常新增SubscriptionCompositeSubscription(2.x中改名為CompositeDisposable),為了彌補這一點,我們增加了E subscribeWith(E subscriber)方法,返回一個Disposable物件,使得使用者可以CompositeDisposable.add()方法新增物件。

而對於 Subscriber 來說,我們目前僅僅關心onNext方法。所以又可以這樣寫:

Flowable.just("Hello,I am China!")
    //替代1.x中的action1,接收一個引數,如果是兩個引數action2使用BiCustomer,而且刪除了action3-9
    //多個引數用Custom<Object[]>
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e("consumer", s);
        }
    });

需要注意的問題:在1.x的API中,這裡是Action1,在2.x中使用Consumer來代替,如果是兩個引數,則用BiConsumer來代替Action2,而且在2.x中刪除了Action3-9,如果是多個引數則用Custom<Object[]>代替ActionN。

RxJava還有一個API能達到類似的效果,就是from(),但是因為在使用java8編譯時,javac不能夠區分功能介面型別,所以它在2.x中被拆分為:fromArray,fromIterable,fromFuture
所以上邊又可以這樣寫:

Flowable.fromArray("Hello,I am China!")
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e("consumer", s);
        }
        });

操作符

map

首先看一個map的例子

Flowable.just("Hello,I am China!")
    //將1.x中的Func1,2改為Function和BiFunction,Func3-9改為Function3-9
    //多引數FuncN改為Function<Object[],R>

    //這個第一個泛型為接收引數的資料型別,第二個泛型為轉換後要發射的資料型別
    .map(new Function<String, String>() {
        @Override
        public String apply(String s) throws Exception {
            return s+"__by Mars";
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e("consumer", s);
        }
    });*/

可以看出,例子中map()將一個字串物件,轉換為另一個字串物件返回,當然我們也可以將其轉換為與之不同的物件,對應的返回的Flowable物件引數也會變為轉換後的物件。另外Function的泛型第一個為接收引數的資料型別,第二個為轉換後要發射的資料型別。
需要注意的問題:在2.x中將1.x的Func1Func2改為FunctionBiFunctionFunc3-9改為Function3-9,多引數FuncN改為Function<Object[],R>

map()的邏輯操作圖:
這裡寫圖片描述

flatMap

首先看一個例子:

ArrayList<String[]> list=new ArrayList<>();
String[] words1={"Hello,","I am","China!"};
String[] words2={"Hello,","I am","Beijing!"};
list.add(words1);
list.add(words2);
Flowable.fromIterable(list)
    .flatMap(new Function<String[], Publisher<String>>() {
        @Override
        public Publisher<String> apply(String[] strings) throws Exception {
            return Flowable.fromArray(strings);
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.e("consumer", s);
        }
    });

從上邊這個例子可以看出,flatMap和map還是有共同點的,都是將一個物件轉換為另一個物件,不同的是map只是一對一的轉換,而flatMap可以是一對多的轉換,並且是轉換為另外一個Flowable物件!

flatMap()的邏輯操作圖:
這裡寫圖片描述

lift和compose

關於這些轉換的使用和原理,可以參考扔物線的
給 Android 開發者的 RxJava 詳解
2.x中的用法基本相同

concat和merge

concat

邏輯操作圖:
這裡寫圖片描述

merge

邏輯操作圖:
這裡寫圖片描述

上述所有邏輯操作圖來自這裡

其他api

Flowable.range(5,10)//從5開始數10個數(5——14)
    .filter(new Predicate<Integer>() {//過濾為偶數
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer%2==0;
        }
    })
    .take(2)//只要前2個數據
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.e("consumer", integer+"");
        }
    });

上邊註釋已經寫的很清楚了!
range()方法,第一個引數為開始值,第二個引數為數量,所以別搞錯了,以為第二個引數為結束值;filter()方法用於對資料進行過濾;take(n)方法用於取前n個值。

在Android中的使用

RxJava在android中的使用,主要就體現在非同步這一點。對應RxJava,RxAndroid也已經到2.x版本。
我在上一篇部落格中也提到過,涉及兩個比較核心的方法subscribeOn和observeOn這兩個方法都傳入一個Scheduler物件,subscribeOn指定發射事件的執行緒,observeOn指定消費事件的執行緒。
在2.x的API中仍然支援主要的預設scheduler: computation, io, newThreadtrampoline,可以通過io.reactivex.schedulers.Schedulers這個實用的工具類來排程。

我們在android中主要就使用下邊這兩個就夠了:
Schedulers.io(): I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。
AndroidSchedulers.mainThread(),它指定的操作將在 Android 主執行緒執行。

這裡一個最簡單的例子:

Flowable.just("Hello,I am China!")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber)

所以在Android中建立Flowable時,即發射資料來源的時候的耗時操作,可以指定在io()執行緒中,得到資料後,更新UI可以指定在mainThread()中。

當然現在最經典的就是RxAndroid和Retrofit的結合使用了:
這裡有一個比較牛逼的寫法總結:
RxJava 與 Retrofit 結合的最佳實踐
這篇文章是基於1.x寫的,不過在2.x中用法大同小異。
另外需要注意的問題就是,retrofit現在還未支援RxJava2.x,不過不用擔心,jake大神已經給我們寫好了介面卡:

compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

在gradle中新增依賴即可!
然後在建立Retrofit物件時,這樣寫:

Retrofit retrofit = new Retrofit.Builder()
    .baseUrl(BASE_URL)
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())//1.X為RxJavaCallAdapterFactory
    .build();

就可以在Retrofit2中盡情使用RxJava2了!

好了,先這樣吧,上邊就是RxJava涉及到的比較基礎的東西!

相關推薦

OAuth 2.0

# OAuth 2.0詳解 `概念`:OAuth(開放授權)是一個開放標準,允許使用者讓第三方應用訪問該使用者在某一網站上儲存的私密的資源(如基本訊息,照片,聯絡人列表), 而無需將 **使用者名稱** 和 **密碼** 提供給第三方應用。 ## 一、應用場景 為了理解OAuth的適用場合,這裡舉一

RxJava 2.0 使用

前言 在上一篇部落格中,提到了RxJava的一些比較核心的東西,還有與1.x版本的一些區別! 現在我們具體瞭解一下它的使用! 使用 最基本的的使用 我們知道一個簡單的RxJava的應用,需要一個觀察者或者訂閱者Observer,一個被觀察者Ob

0 httpd2.2配置-Apache配置檔案-(二)

httpd-2.2 15 curl命令 curl是基於URL語法在命令列方式下工作的檔案傳輸工具,它支援FTP, FTPS, HTTP, HTTPS, GOPHER, TELNET, DICT, FILE及LDAP等協議。curl支援HTTPS認證,並且支援HTTP的POST、PU

0 httpd2.2配置-Apache配置文件-(二)

切換 more 簡化 css 程序 ip地址 在服務器 filter utf httpd-2.2 15 curl命令 curl是基於URL語法在命令行方式下工作的文件傳輸工具,它支持FTP, FTPS, HTTP, HTTPS, GOPHER,

Oracle Database 12c Release 2安裝

x64 onclick -1 ron ocs failed tput could not 中間 第1章 Oracle Database 12c Release 2安裝詳解 1.1 下載方法 oracle官網https://www.oracle.com 1)打開官方網站

【TP3.2_initialize() 和 __construct() 的區別和聯系

instance ins 執行 構造方法 ces 實例化 direct control 初始化 1、假設 一個AdminController.class.php 集成至 \Think\Controller 類, 我們來看看Controller.class.php的構造方法源

第5章:座標和依賴/5.2 座標

座標詳解 座標內容包括 groupid:必選 概念:通用用java包的形式表示(也就是.(點)表示法),內容一般是組織或者公司下的某個專案 例如:org.sonatype.nexus,org.sonatype 為非盈利組織

rxJava 2.0入門之觀察者模式

前言 RxJava其實已經推出很久了,可以說是已經很火了,但是目前仍然還有相當一部分Android開發者沒有使用過,甚至說是想用,卻不知道怎麼用,或者不知道自己的專案哪裡可以用到,從本篇開始我們將以一些列文章逐步揭開rxJava神奇的面紗,從入門到實戰,讓你也可以輕鬆上手rxJava

rxJava 2.0介紹

Retrofit單獨使用示例 1,首先新增依賴和許可權 compile 'com.squareup.retrofit2:retrofit:2.1.0' compile 'com.squareup.retrofit2:converter-gson:2.1.0' <uses-pe

Spring boot 2 -配置

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

30分鐘學會EventBus3.0詳解(一)(引入和初始化EventBus3.0) 30分鐘學會EventBus3.0詳解(二)(EventBus3.0的詳細使用) 30分鐘學會EventBus3.0詳解(一)(引入和初始化EventBus3.0) 30分鐘學會EventBus3.0詳解(二)(Ev

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

RxJava 2.0操作符記錄

建立操作符 用於建立被觀察者物件(Observable)物件和傳送事件 create 建立一個最基本的被觀察者物件Observable just 將物件或者物件集合轉換成一個會發射這些物件的Observable defer

通用漏洞評估方法CVSS3.0

CVSS(Common Vulerability Scoring System, 通用漏洞評估方法),是由NIAC 釋出、FITST維護的開放式行業標準,CVSS 的釋出為資訊保安產業從業人員交流網路中所存在的系統漏洞的特點與影響提供了一個開放式的評價方法。 1.度量(Me

相關 /dev/null 2>&1 

1:> 代表重定向到哪裡,例如:echo “123” > /home/123.txt 2:/dev/null 代表空裝置檔案 3:2> 表示stderr標準錯誤 4:& 表示等同於的意思,2>&1,表示2的輸出重定向等同於1 5:1 表示stdout標準輸出,系統預設值

Retorfit 2 使用

本文中使用 Retrofit 2.3.0 版本 側重於如何使用,至於原理暫不考慮。 面臨秋招的準大四狗,看了看各大公司的面經,什麼 Retrofit、RxJava,還有一眾圖片載入庫 比如 Glide、Picasso 等等都快成了標配了,動輒

UIAutomator2.0(UIDevice篇----獲取控制元件)

UIDevice提供了3個獲取控制元件的方法,和一個判斷控制元件是否存在的方法。 public UiObject findObject(UiSelector selector) public UiObject2 findObject(BySelector

SHA-2 安全雜湊演算法2 演算法

SHA-2 又稱安全雜湊演算法2(Secure Hash Algorithm 2),是一種密碼雜湊函式演算法標準,其輸出長度可取224位、256位、384位、512位,分別對應SHA-224、SHA-256、SHA-384、SHA-512。它含包含另外兩個演算法:SHA-5

Retrofit2.0(一簡單使用)

幾個月前,對Retrofit進行了一個系統的學習,不過沒有做一個整理和總結,正好國慶沒什麼事就寫幾篇部落格對Retrofit的簡單使用,上傳下載進度監聽,封裝使用,原始碼解析做一個學習,記錄如下 1~

UIAutomator2.0(UIDevice篇----獲取UIDevice物件)

UIAutomator2.0在UIDevice類中,提供了兩個靜態方法,用於獲取UIDevice物件。 (1)static UIDevice getInstance() (2)static UIDevice getInstance(Instrumentati

RXJava 2.0 操作符的使用

public class RxjavaXU { @Test public void Flowable() throws Exception { //背壓使用 Flowable.create(new FlowableOnSubscribe<Integer>() {