RxJava學習筆記---簡單使用
如果覺得一篇文章寫得好,不要放到收藏夾裡面,馬上把它看完,如果兩天內還沒開始看,那就可以刪掉了
如果覺得一樣技術很好,那就馬上去學,不要拖延,不要找藉口。如果你一週內還沒開始行動,還不如坦蕩點放棄
恰如克林克茲所說:
與其感嘆路難行,不如馬上出發
去年就在看RxJava,每次看一點點,遇到障礙,感嘆要理解的東西太麻煩就放棄了。今年決心要搞定它,學習的過程註定孤獨單調,恰好現在開始寫Blog,就順便記錄下學習筆記,以此自勉。在後面的2-4周計劃:
- 明白RxJava的基本用法,API中常用類、函式、變換
- 學習使用RxJava寫的開源App,提升熟練度
- 研究RxJava內部實現,研究他為什麼要如此設計API,如此好用
- 用RxJava寫一個App,開源
RxJava是什麼
Rx是什麼
Rx全稱Reactive Extensions,譯為響應式拓展。
微軟最先提出這個概念,借用MSDN上的定義:Reactive Extensions(Rx)是一個類庫,它集成了非同步、基於可觀察(observable)序列的事件驅動程式設計和LINQ-style的查詢操作,使用Rx,開發人員
- 可以用observable物件描述非同步資料流
- 使用LINQ操作符非同步查詢資料
- 使用Schedulers控制非同步過程中的併發
簡而言之
Rx = Observables + LINQ + Schedulers
Rx已經滲透到各個語言中,於是有了:
RxJava、RxJS、Rx.NET、UniRx、RxScala、RxCpp、 RxSwift、RxPHP、Ruby: Rx.rb、Python: RxPY等等
基於frameworks上還有
RxNetty、RxAndroid、RxCocoa
RxJava的定義
RxJava是Rx在Java語言上的擴充套件。
RxJava
– Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
譯為:一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫
RxJava怎麼用
在build.gradle
—Module檔案中匯入
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
RxJava觀察者模式
RxJava觀察者有4個基本概念組成:
- Observer 觀察者
- Observable 被觀察者
- subscribe() 訂閱
- 事件
建立Observer
Observer譯為觀察者,直接建立實現
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.toString());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
對比常用的給控制元件註冊一個監聽事件
class MyListener extends View.OnClickListener {
@Override
public void onClick(View v) {
Log.d(TAG, "onClick: ");
}
}
這裡的RxJava的onNext()
方法就對應MyListener的onClick()
方法,不過比起後者Observer還多了onCompleted()
和onError()
onNext()
事件響應onCompleted()
事件佇列結束。不再有新的onNext()
發出時,需要觸發其作為結束。onError()
事件佇列異常。事件佇列處理中若出現異常,onError
觸發,佇列終止。
一個事件佇列中,Error
和Completed
有且只有一個,並且是事件佇列的最後一個。
建立Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.toString());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};
基本使用的話,Subscriber
和Observer
使用完全一致,實際上在RxJava的subscribe
過程中,Observer
也常常轉換為Subscriber
再使用。但是Subscriber
是Oberver
的功能升級
檢視其實現程式碼
public abstract class Subscriber<T> implements Observer<T>, Subscription {
....
}
- 我們知道,Subscriber是一個抽象類,所以我們剛才實現的時候需要實現
onCompleted
、onError
、onNext
三個方法 - 實現
Observer<T>
介面 - 實現
Subscription
介面
Observer<T>
介面的實現表明Observer
能實現的東西他都可以實現
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
那麼Subscription
介面實現呢
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
顧名思義,unsubscribe()
表示反註冊;isUnsubscribed()
表示是否已經註冊,反註冊和是否已經註冊出現了,哪註冊呢,在這裡
private final SubscriptionList subscriptions;
public final void add(Subscription s) {
subscriptions.add(s);
}
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}
add()
就是註冊。通過SubscriptionList subscriptions
方法轉發實現,我們看看SubscriptionList
是什麼
public final class SubscriptionList implements Subscription {
private LinkedList<Subscription> subscriptions;
private volatile boolean unsubscribed;
public SubscriptionList(Subscription s) {
this.subscriptions = new LinkedList<Subscription>();
this.subscriptions.add(s);
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
public void add(final Subscription s) {
if (s.isUnsubscribed()) {
return;
}
if (!unsubscribed) {
synchronized (this) {
if (!unsubscribed) {
LinkedList<Subscription> subs = subscriptions;
if (subs == null) {
subs = new LinkedList<Subscription>();
subscriptions = subs;
}
subs.add(s);
return;
}
}
}
s.unsubscribe();
}
@Override
public void unsubscribe() {
if (!unsubscribed) {
List<Subscription> list;
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
}
unsubscribeFromAll(list);
}
}
private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
if (subscriptions == null) {
return;
}
List<Throwable> es = null;
for (Subscription s : subscriptions) {
try {
s.unsubscribe();
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
Exceptions.throwIfAny(es);
}
看到開始定義的
private LinkedList<Subscription> subscriptions;
private volatile boolean unsubscribed;
大概就能猜出來,就是通過連結串列來操作的,具體不再深究。回到主題比起Observer
來說Subsriber
大概有以下幾個優點
- 可以使用
unsubscribe()
取消訂閱。有什麼好處呢?其實和我們使用廣播的時候,註冊和反註冊是一樣的道理,防止記憶體洩漏。 - 可以通過
isUnsubscribed()
獲取當前事件是否註冊,封裝好的方法可以更容易去判斷當前的事件註冊與否的狀態。 onStart()
可以在Subscriber
剛開始的時候做一些準備工作。我們看到他是一個空實現
public void onStart() {
// do nothing by default
}
就如同在使用Volley或者okhttp的時候也有這種同名方法,做一些初始化操作在請求網路最開始的部分。
建立Observable
Observable.create
Observable observable = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Object o) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
});
儲存一個OnSubscribe
物件作為計劃表,一旦Observable
被訂閱,call()
方法就會被呼叫,然後按照對onNext()
和onCompleted()
設定的順序依次執行。
Observable.just
上面的例子可以等價於
Observable observableJust = Observable.just("1", "2", "3");
just—將引數依次發出去
Observable.from
也等價於這種寫法
String[] tests = {"1", "2", "3"};
Observable observableFrom = Observable.from(tests);
from—顧名思義從什麼地方來
執行
使用
observable.subscribe(subscriber);
observable.subscribe(observer);
事件鏈就可以跑起來了。
這裡subscribe()
方法通過下面的方法轉發
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//...省略部分程式碼
//執行onStart初始方法
subscriber.onStart();
//執行Subscriber預設的call()方法
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
//返回傳入的subscriber
return hook.onSubscribeReturn(subscriber);
}
列印陣列
把陣列內的值依次打印出來
private static final Integer[] INT_ARRAYS = {1, 2, 3, 4, 5};
Observable.from(INT_ARRAYS).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "call: " + integer);
}
});
顯示圖片
把mipmap檔案裡的圖片顯示到介面
private ImageView ivShow;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ivShow = (ImageView) findViewById(R.id.ivShow);
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
ivShow.setImageDrawable(drawable);
}
});
}
但是上面的實現沒有任何意義,因為在同一個執行緒裡面使用還不如直接呼叫。RxJava最大的特性是前臺回撥,後臺處理,是伴隨著多執行緒應運而生的。
Scheduler
//直接執行在當前執行緒
Schedulers.immediate();
//開啟一個新執行緒執行任務
//內部有一個沒有上限的執行緒池,可以重用空閒執行緒
//相當於 ExecutorService executorService = Executors.newCachedThreadPool();
Schedulers.io();
//有一個固定執行緒池,大小為CPU核心數+1
//等價於 Executors.newFixedThreadPool(nCore+1)
Schedulers.computation();
剛才的例子:
被建立的陣列值在IO執行緒發出,在主執行緒處理列印
Observable.from(INT_ARRAYS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "call: " + integer);
}
});
在IO執行緒載入圖片,在主執行緒顯示圖片
Observable
.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Drawable drawable) {
ivShow.setImageDrawable(drawable);
}
});