1. 程式人生 > >深入淺出RxJava之操作符篇

深入淺出RxJava之操作符篇

宣告:此部落格是基於大頭鬼Bruce 的http://blog.csdn.net/lzyzsd/article/details/44094895的譯文,RxJava初學者一枚,翻了一些相關文章大多不太容易理解,讓人看不進去,就理解難易來說,原博文的確是比較不錯的,但跟大多數評論中網友一樣,對譯文中lamdba程式碼心生不滿(此處應該有賤笑)(但依舊非常感謝大頭鬼的翻譯,拯救了英語渣渣如我),故依據自己的理解,僅供大家參考,如有紕漏,歡迎斧正~

這篇blog將介紹許多RxJava中的操作符,RxJava的強大性就來自於它所定義的操作符。

假設我有這樣一個方法:
這個方法根據輸入的字串返回一個網站的url列表(猶如:搜尋引擎)
我們先模擬一個查詢函式:

public Observable<List<String>> query(String str) {
    List<String> list = new ArrayList<>();
    if (str.contains("hello")){
      list  = Arrays.asList("www.baidu.com", "www.sina.com", "www.360.cn");
    }
    return Observable.just(list);
}

現在我希望構建一個健壯系統,它可以查詢字串並且顯示結果。根據上一篇blog的內容,我們可能會寫出下面的程式碼:

query("hello guy").subscribe(new Action1<List<String>>() {
    @Override
    public void call(List<String> urls) {
        for (String url : urls) {
            Log.d("mms", " url :: " + url);
        }
    }
});

輸出:

09-23 16:11:33.755 24240-24240/rj.mms.www.rjdemo D/mms:  url :: www.baidu
.com 09-23 16:11:33.755 24240-24240/rj.mms.www.rjdemo D/mms: url :: www.sina.com 09-23 16:11:33.755 24240-24240/rj.mms.www.rjdemo D/mms: url :: www.360.cn

這種程式碼當然是不能容忍的,因為上面的程式碼使我們喪失了變化資料流的能力。一旦我們想要更改每一個URL,只能在Subscriber中來做。我們竟然沒有使用如此酷的map()操作符!!!
當然,我可以使用map操作符,map的輸入是urls列表,處理的時候還是要for each遍歷,一樣很蛋疼。
萬幸,還有Observable.from()方法,from()接收一個集合作為輸入,然後每次輸出一個元素給subscriber:

//from 接收一個數組資料,每次輸出一個元素給subscriber

List list = Arrays.asList("1","2","3");
Observable.from(list).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("mms","==="+s);
    }
});

輸出:

09-22 14:19:23.537 30131-30131/rj.mms.www.rjdemo D/mms: ===1
09-22 14:19:23.537 30131-30131/rj.mms.www.rjdemo D/mms: ===2
09-22 14:19:23.537 30131-30131/rj.mms.www.rjdemo D/mms: ===3

我們來把這個方法使用到剛才的場景:

query("hello guy").subscribe(new Action1<List<String>>() {
    @Override
    public void call(List<String> urls) {
        Observable.from(urls).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("mms"," url :: "+s);
            }
        });
    }
});

雖然去掉了for each迴圈,但是程式碼依然看起來很亂。多個巢狀的subscription不僅看起來很醜,難以修改,更嚴重的是它會破壞某些我們現在還沒有講到的RxJava的特性。
救星來了,他就是flatMap()。
Observable.flatMap()接收一個Observable的輸出作為輸入,同時輸出另外一個Observable。直接看程式碼:

query("hello guy").flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> urls) {
        return Observable.from(urls);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("mms"," url :: "+s);
    }
});

flatMap()是不是看起來很奇怪?為什麼它要返回另外一個Observable呢?理解flatMap的關鍵點在於,flatMap輸出的新的Observable正是我們在Subscriber想要接收的。現在Subscriber不再收到List,而是收到一些列item,就像Observable.from()的輸出一樣。
這部分也是我當初學RxJava的時候最難理解的部分,一旦我突然領悟了,RxJava的很多疑問也就一併解決了。
還可以更好

flatMap()實在不能更讚了,它可以返回任何它想返回的Observable物件。
比如下面的方法:
// 返回網站的標題,如果404了就返回null
Observable getTitle(String URL);
模擬:

public Observable<String> getTitle(String url) {
    if (url.contains("www")) {
        return Observable.just(“This is a website");
    }
}

接著前面的例子,現在我不想列印URL了,而是要列印收到的每個網站的標題。問題來了,我的方法每次只能傳入一個URL,並且返回值不是一個String,而是一個輸出String的Observabl物件。使用flatMap()可以簡單的解決這個問題。

query("hello guy").flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> urls) {
        return Observable.from(urls);
    }
}).flatMap(new Func1<String, Observable<String>>() {

    @Override
    public Observable<String> call(String title) {
        return getTitle(title);
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("mms", " title :: " + s);
    }
});

輸出:

09-23 17:51:00.342 4531-4531/rj.mms.www.rjdemo D/mms:  title :: This is a website
09-23 17:51:00.342 4531-4531/rj.mms.www.rjdemo D/mms:  title :: This is a website
09-23 17:51:00.342 4531-4531/rj.mms.www.rjdemo D/mms:  title :: This is a website

是不是感覺很不可思議?我竟然能將多個獨立的返回Observable物件的方法組合在一起!帥呆了!
不止這些,我還將兩個API的呼叫組合到一個鏈式呼叫中了。我們可以將任意多個API呼叫連結起來。大家應該都應該知道同步所有的API呼叫,然後將所有API呼叫的回撥結果組合成需要展示的資料是一件多麼蛋疼的事情。這裡我們成功的避免了callback hell(多層巢狀的回撥,導致程式碼難以閱讀維護)。現在所有的邏輯都包裝成了這種簡單的響應式呼叫。
豐富的操作符

目前為止,我們已經接觸了兩個操作符,RxJava中還有更多的操作符,那麼我們如何使用其他的操作符來改進我們的程式碼呢?
讓我們模擬一個場景,先來修改下我們的程式碼(粗體部分):

public Observable<String> getTitle(String url) {
    if (url.contains("www")) {
        return Observable.just("This is a website");
    } else {
        **return Observable.just("404");**
    }

}
public Observable<List<String>> query(String str) {
    List<String> list = new ArrayList<>();
    if (str.contains("hello")) {
        list = Arrays.asList("www.baidu.com", "**sina.com**", "www.360.cn");
    }
    return Observable.just(list);
}

上面的程式碼,其實就是假設url不滿足條件或者不存在,getTitle()返回null,這裡我們定為是404.
輸出:

09-23 17:51:38.702 4531-4531/rj.mms.www.rjdemo D/mms:  title :: This is a website
09-23 17:51:38.702 4531-4531/rj.mms.www.rjdemo D/mms:  title :: 404
09-23 17:51:38.703 4531-4531/rj.mms.www.rjdemo D/mms:  title :: This is a website

我們不想輸出”404”,那麼我們可以從返回的title列表中過濾掉null值!

query("hello guy").flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> urls) {
        return Observable.from(urls);
    }
}).flatMap(new Func1<String, Observable<String>>() {

    @Override
    public Observable<String> call(String title) {
        return getTitle(title);
    }
}).filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(String title) {
        return title!="404";
    }
})
        .subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("mms", " title :: " + s);
    }
});

filter()是輸出和輸入相同的元素,並且會過濾掉那些不滿足檢查條件的。
看下原始碼:

/*** 
Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.
**(對源Observable按照特定條件進行過濾,僅滿足條件的才會被提交給訂閱者)**

    * @param predicate
     *            a function that evaluates each item emitted by the source Observable, returning {@code true}
     *            if it passes the filter
     * @return an Observable that emits only those items emitted by the source Observable that the filter
     *         evaluates as {@code true}
     * @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
     */

    public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
        return lift(new OperatorFilter<T>(predicate));
    }

繼續,假設我只想要最多1個結果,就可以這麼處理:

query("hello guy").flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> urls) {
        return Observable.from(urls);
    }
}).flatMap(new Func1<String, Observable<String>>() {

    @Override
    public Observable<String> call(String title) {
        return getTitle(title);
    }
}).filter(new Func1<String, Boolean>() {
    @Override
    public Boolean call(String title) {
        return title!="404";
    }
}).<span style="color:#ff0000;">take(1)</span>
        .subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.d("mms", " title :: " + s);
    }
});

take()原始碼:

/**
* Returns an Observable that emits only the first {@code count} items emitted by the source Observable. If the source emits fewer than
* {@code count} items then all of its items are emitted.
**(輸出最多指定數量的結果,次數大於源Observable發出的,全部輸出)**
* @param count
*            the maximum number of items to emit
* @return an Observable that emits only the first {@code count} items emitted by the source Observable, or
*        all of the items from the source Observable if that Observable emits fewer than {@code count} items
* @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
*/
public final Observable<T> take(final int count) {
    return lift(new OperatorTake<T>(count));
}

繼續假設,如果我們想在列印之前,把每個標題儲存到磁碟:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(new Action1<String>() {
         @Override
         public void call(String s) {
             //saveTitle();
         }
     }).subscribe(title -> System.out.println(title));

PS:
**doOnNext()允許我們在每次輸出一個元素之前做一些額外的事情,比如這裡的儲存標題。
首先看到帶do操作符,可以理解為是給Observable的生命週期的各個階段添一系列的回撥監聽,當Observable執行到這個階段的時候,回撥就會觸發。比如doOnEach,就是Observable每發射一個數據的時候都會觸發這個回撥,不僅包括onNext還包括OnError和OnCompleted,而這裡的doOnNext()和doOnEach類似,區別就在於它只在onNext的時候被觸發。它們是一些side effert ,不會改變資料流。**

看到這裡操作資料流是多麼簡單了麼。你可以新增任意多的操作,並且不會搞亂你的程式碼。

RxJava包含了大量的操作符。操作符的數量是有點嚇人(http://reactivex.io/documentation/operators.html),但是很值得你去挨個看一下,這樣你可以知道有哪些操作符可以使用。弄懂這些操作符可能會花一些時間,但是一旦弄懂了,你就完全掌握了RxJava的威力。

你甚至可以編寫自定義的操作符!這篇blog不打算將自定義操作符,如果你想的話,清自行Google吧。
感覺如何?

好吧,你是一個懷疑主義者,並且還很難被說服,那為什麼你要關心這些操作符呢?

因為操作符可以讓你對資料流做任何操作。

將一系列的操作符連結起來就可以完成複雜的邏輯。程式碼被分解成一系列可以組合的片段。這就是響應式函式程式設計的魅力。用的越多,就會越多的改變你的程式設計思維。
另外,RxJava也使我們處理資料的方式變得更簡單。在最後一個例子裡,我們呼叫了兩個API,對API返回的資料進行了處理,然後儲存到磁碟。但是我們的Subscriber並不知道這些,它只是認為自己在接收一個Observable物件。良好的封裝性也帶來了編碼的便利!
在第三部分中,我將介紹RxJava的另外一些很酷的特性,比如錯誤處理和併發,這些特性並不會直接用來處理資料。