從零開始的RxJava2.0教程(一)基礎
1. 為什麼寫這篇文章
RxJava
這些年越來越流行,而上月末(2016.10.29)釋出了2.0正式版,但網上大部分關於RxJava
的教程都是1.x
的。關於2.0
的教程基本是介紹1.x
和2.x
的區別,對於RxJava
的老使用者來說,自然看看和1.x
的區別就大致會用了,但是對於新手來說,就不得不先學1.x
。這樣來說,學習成本就提高了,本身RxJava
就不容易上手。
為了讓年輕的司機可以直接從2.0開始學習,我就寫了這篇文章。RxJava的老使用者可以直接看我這篇文章 RxJava 2.0有什麼不同(譯)。
由於本人文筆拙略,於是仿照著 Grokking RxJava 來寫,望 Dan Lew 大大不要介意。
2. 基礎
RxJava 2.0 最核心的是Publisher
和Subscriber
。Publisher
可以發出一系列的事件,而Subscriber
負責和處理這些事件。
平常用得最多的Publisher
是Flowable
,它支援背壓,教程剛開始不適合介紹太多概念,有興趣的可以看一下 RxJava 2.0中backpressure(背壓)概念的理解。
要使用RxJava 2,你需要先引入相應的jar包。
compile 'io.reactivex.rxjava2:rxjava:2.0.0'
compile 'org.reactivestreams:reactive-streams:1.0.0'
注意,和1.x中不一樣,2.0有一個依賴包。
3. Hello RxJava 2
建立一個Flowable
物件很簡單,直接呼叫Flowable.create
即可。
// create a flowable
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("hello RxJava 2" );
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
上述程式碼僅僅是發射了一個字串"hello RxJava 2"
。
下面我們還需要建立一個Subscriber
。
// create
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
需要注意的是,在onSubscribe
中,我們需要呼叫request
去請求資源,引數就是要請求的數量,一般如果不限制請求數量,可以寫成Long.MAX_VALUE
。如果你不呼叫request
,Subscriber
的onNext
和onComplete
方法將不會被呼叫。
onNext
方法裡面傳入的引數就是Flowable
中發射出來的。
為了讓”發射器”和”接收器”工作起來,我們還需要把他們組裝在一起。
flowable.subscribe(subscriber);
一旦 flowable.subscribe
被執行,就會分別列印 hello RxJava 2
和 onComplete
。
4. 更簡潔的程式碼
上面一大串程式碼僅僅就達到了列印兩個字串的效果,你可能會想:”RxJava只不過是把事情變複雜了”。
或許是這樣的,但RxJava也提供了很多便利的方法來做這種簡單的事情。
Flowable<String> flowable = Flowable.just("hello RxJava 2");
我們可以直接呼叫Flowable.just
建立一個發射字串的”發射器”。
而對於 Subscriber
來說,我們目前僅僅關心onNext
方法。所以可以簡寫成下面這樣。
Consumer consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
當然這只是一個 Consumer
,但 subscribe
方法提供了過載,讓我們可以只傳入一個Consumer
。
所以訂閱程式碼是這樣的。
flowable.subscribe(consumer);
如果省去單獨定義變數,最終可以寫成下面這樣。
Flowable.just("hello RxJava 2")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
5. 變換
讓我們做一些更有意思的事情把!
比如我想在hello RxJava 2
後面加上我的簽名,你可能會去修改Flowable
傳入的引數:
Flowable.just("hello RxJava 2 -ittianyu")
.subscribe(s -> System.out.println(s));
這當然是可以的,但是這樣做,就導致所有的接收者都會受到影響。我只想針對某個訂閱者做修改,那麼你可能會寫出這樣的程式碼:
Flowable.just("hello RxJava 2")
.subscribe(s -> System.out.println(s + " -ittianyu"));
這樣的方式仍然不讓人滿意,因為我希望訂閱者做的事越少越好,因為一般來說,訂閱者都是在主執行緒中執行的。這個時候我們就可以利用操作符在資料傳遞的途中進行變換。
6. 操作符
操作符是為了解決 Flowable
物件變換問題而設計的,操作符可以在傳遞的途中對資料進行修改。
RxJava提供了很多實用的操作符。比如 map
操作符,可以把一個事件轉換成另一個事件。
Flowable.just("map")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s + " -ittianyu";
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
上面程式碼中, map
是把傳遞過來的結果末尾加上了簽名,然後在傳遞給了訂閱者。
是不是覺得神奇?
map
的作用就變換 Flowable
然後返回一個指定型別的 Flowable
物件。
7. map
操作符進階
map
操作符更神奇的地方是,你可以返回任意型別的 Flowable
,也就是說你可以使用 map
操作符發射一個新的資料型別的 Flowable
物件。
比如上面的例子,訂閱者想要得到字串的hashcode
。
Flowable.just("map1")
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return s.hashCode();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
這裡用了兩個map,一個是把字串轉成hashcode
,另一個是把hashcode
轉成字串。
8. 總結
- 你可以在
Publisher
中查詢資料庫或者從網路上獲取資料,然後在Subscriber
中顯示。 Publisher
不只有一種,事實上Flowable
和Processor
所有的子類都屬於Publisher
。- 在資料發射途中,你可以利用操作符對資料進行變換。