RxJava(十五)RxJava執行緒的自由切換
RxJava系列文章目錄導讀:
在Android使用RxJava的時候可能需要頻繁的進行執行緒的切換,如耗時操作放在子執行緒中執行,執行完後在主執行緒渲染介面。如下面示例程式碼:
deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
//執行耗時任務
return "task result";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//渲染View
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
這是最簡單的邏輯:子執行緒處理耗時任務,然後處理結果。
但是在實際的開發當中可能比這個更加複雜,比如有這樣的邏輯,先從本地載入資料(子執行緒),然後介面展示本地資料(主執行緒),接著載入線上資料(子執行緒),然後渲染(主執行緒)。這就需要頻繁的切換執行緒。
RxJava中通過subscribeOn
和observeOn
兩個操作符進行執行緒切換。subscribeOn()主要改變的是訂閱的執行緒,即call()執行的執行緒,observeOn()主要改變的是傳送的執行緒,即onNext()執行的執行緒。
為了實現上面自由切換的邏輯(子執行緒->主執行緒->子執行緒->->主執行緒)
deferObservable(new Callable<String>() { //defer observable
@Override
public String call() throws Exception {
Log.d("RxThreadFragment", "defer " + Thread.currentThread().getName());
return "task result";
}
})
.observeOn(AndroidSchedulers.mainThread())//指定下面的 call 在主執行緒中執行
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Log.d("RxThreadFragment", "flatMap1 " + Thread.currentThread().getName());
return Observable.just(s);
}
})
.observeOn(Schedulers.io())//指定下面的 call 在子執行緒中執行
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Log.d("RxThreadFragment", "flatMap2 " + Thread.currentThread().getName());
return Observable.just(s);
}
})
.subscribeOn(Schedulers.io())//指定上面沒有指定所線上程的Observable在IO執行緒執行
.observeOn(AndroidSchedulers.mainThread())//指定下面的 call 在主執行緒中執行
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//etc
Log.d("RxThreadFragment", s + Thread.currentThread().getName());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
輸出結果:
defer RxIoScheduler-2
flatMap2 main
flatMap3 RxIoScheduler-3
task result main
從上面的程式碼可以看出,observeOn 指定該操作符下面相鄰的Observable 發射資料所在的執行緒。
subscribeOn 指定該操作符上面所有沒有指定執行緒的Observable 所在的執行緒。
例如在剛剛的例子中,subscribeOn操作符上面有3個observable(”defer”,”flatMap1”,”flatMap2”)
由於”flatMap1”,”flatMap2”已經分別被observeOn指定了schedule了,所以呢,該subscribeOn只會對”defer”有效。
下面我們再來看一個例子
final Observable<String> observable1 = RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "observable1 thread name : " + Thread.currentThread().getName());
return "observable1 Schedulers.io()";
}
}).subscribeOn(Schedulers.io());//指定上面call方法所在的執行緒
final Observable<String> observable2 = RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "observable2 thread name : " + Thread.currentThread().getName());
return "observable2 AndroidSchedulers.mainThread()";
}
}).subscribeOn(Schedulers.io());//指定上面call方法所在的執行緒
final Observable<String> observable3 = RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "observable3 thread name : " + Thread.currentThread().getName());
return "observable3 Schedulers.io()";
}
}).subscribeOn(Schedulers.io());//指定上面call方法所在的執行緒
RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "test thread name : " + Thread.currentThread().getName());
return "test thread";
}
})
.subscribeOn(Schedulers.io())//修改上面Observable call所在的執行緒
.observeOn(AndroidSchedulers.mainThread())//修改下面flatMap1 call所在的執行緒
.flatMap(new Func1<String, Observable<String>>() {//flatMap1
@Override
public Observable<String> call(String s) {
Log.e("RxThreadFragment", "flatMap1 thread name : " + Thread.currentThread().getName());
return observable1;
}
})
.observeOn(AndroidSchedulers.mainThread())//修改下面flatMap2 call所在的執行緒
.flatMap(new Func1<String, Observable<String>>() {//flatMap2
@Override
public Observable<String> call(String s) {
Log.e("RxThreadFragment", "flatMap2 thread name : " + Thread.currentThread().getName());
return observable2;
}
})
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Log.e("RxThreadFragment", "flatMap3 thread name : " + Thread.currentThread().getName());
return observable3;
}
})
.observeOn(AndroidSchedulers.mainThread())//修改下面subscribe call所在的執行緒
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("RxThreadFragment", "subscribe Action1 thread name : " + Thread.currentThread().getName());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
輸出結果:
test thread name : RxIoScheduler-2 手動設定為後臺執行緒
flatMap1 thread name : main 手動設定為主執行緒
observable1 thread name : RxIoScheduler-3 手動設定為後臺執行緒
flatMap2 thread name : main 手動設定為主執行緒
observable2 thread name : RxIoScheduler-2 手動設定為後臺執行緒
flatMap3 thread name : RxIoScheduler-2 後臺執行緒
observable3 thread name : RxIoScheduler-3 手動設定為後臺執行緒
subscribe Action1 thread name : main 手動設定為主執行緒
從這個例子中可以看出,flatMap3沒有設定所在的執行緒,會預設使用上一個observable的執行緒模式, flatMap3 就是使用它上面的 observable2 的執行緒模式
如果上一個操作符不是flatMap,而是使用map(這樣就不是返回observable2),這個時候使用的就是map call所在的執行緒。
通過上面兩個例子我相信對RxJava執行緒切換應該差不多了,需要注意的是我們上面基本上都是一個subscribeOn和多個observeOn組合實現執行緒的自由切換的。
如果使用多個subscribeOn沒有意思,只有第一個subscribeOn有效。