RxJava(三) flatMap操作符用法詳解
RxJava系列文章目錄導讀:
flatMap操作符的作用
官方文件解釋:
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
官方流程圖:
對Observable發射的資料都應用(apply)一個函式,這個函式返回一個Observable,然後合併這些Observables,並且傳送(emit)合併的結果。 flatMap和map操作符很相像,flatMap傳送的是合併後的Observables,map操作符傳送的是應用函式後返回的結果集
flatMap操作符使用示例
繼續map操作符的案例
還是以上一篇map操作符的例子吧,如果對map操作符不是很瞭解的可以點選連結去看看。獲取主機的IP地址:
private Observable<String> processUrlIpByOneFlatMap () {
return Observable.just(
"http://www.baidu.com/",
"http://www.google.com/",
"https://www.bing.com/")
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call (String s) {
return createIpObservable(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
printLog(tvLogs, "Consume Data <- ", s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
printErrorLog(tvLogs, "throwable call()", throwable.getMessage());
}
});
}
//根據主機獲取ip
private Observable<String> createIpObservable(final String url) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
String ip = getIPByUrl(url);
subscriber.onNext(ip);
printLog(tvLogs, "Emit Data -> ",url+" : " +ip);
} catch (MalformedURLException e) {
e.printStackTrace();
//subscriber.onError(e);
subscriber.onNext(null);
} catch (UnknownHostException e) {
e.printStackTrace();
//subscriber.onError(e);
subscriber.onNext(null);
}
subscriber.onCompleted();
}
});
}
輸出結果:
Emit Data -> 'http://www.baidu.com/ : 115.239.211.112'
Main Thread:false, Thread Name:RxCachedThreadScheduler-1
Consume Data <- '115.239.211.112'
Main Thread:true, Thread Name:main
Emit Data -> 'http://www.google.com/ : 216.58.199.100'
Main Thread:false, Thread Name:RxCachedThreadScheduler-1
Consume Data <- '216.58.199.100'
Main Thread:true, Thread Name:main
Emit Data -> 'https://www.bing.com/ : 202.89.233.104'
Main Thread:false, Thread Name:RxCachedThreadScheduler-1
Consume Data <- '202.89.233.104'
Main Thread:true, Thread Name:main
flatMap進階使用
我們從上面的輸出結果可以看出,效果和使用map操作符
的效果是一樣。
我們同時也發現執行緒的名稱(Thread Name)都是 RxCachedThreadScheduler-1
,說明他們是通過一個執行緒來完成所有的任務的。
如果任務很多,僅僅通過一個執行緒去做,效率上是不是有點低呢?如果我想使用多個執行緒來完成這些任務該怎麼做呢?
很簡單,只需要在建立Observable的時候加上subscribeOn(Schedulers.io())
即可。完整程式碼如下:
//根據主機獲取ip
private Observable<String> createIpObservable(final String url) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
String ip = getIPByUrl(url);
subscriber.onNext(ip);
printLog(tvLogs, "Emit Data -> ",url+" : " +ip);
} catch (MalformedURLException e) {
e.printStackTrace();
//subscriber.onError(e);
subscriber.onNext(null);
} catch (UnknownHostException e) {
e.printStackTrace();
//subscriber.onError(e);
subscriber.onNext(null);
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io());
}
看下執行效果:
“`
Consume Data <- ‘202.89.233.103’
Main Thread:true, Thread Name:main
Emit Data -> ‘https://www.bing.com/ : 202.89.233.103’
Main Thread:false, Thread Name:RxCachedThreadScheduler-8
Emit Data -> ‘http://www.google.com/ : 216.58.203.36’
Main Thread:false, Thread Name:RxCachedThreadScheduler-7
Consume Data <- ‘216.58.203.36’
Main Thread:true, Thread Name:main
Emit Data -> ‘http://www.baidu.com/ : 115.239.211.112’
Main Thread:false, Thread Name:RxCachedThreadScheduler-6
Consume Data <- ‘115.239.211.112’
Main Thread:true, Thread Name:main
“`
從執行可以看出,執行完成任務的不是一個執行緒了,而是三個不同的執行緒 RxCachedThreadScheduler-8
、RxCachedThreadScheduler-7
、RxCachedThreadScheduler-6
。
但是發現一個問題,輸出的結果的順序亂了,不是我們輸入的baidu.com/google.com/bing.com順序了。
那怎麼辦呢?
這時候concatMap操作符
就閃亮登場了,下一篇將介紹concatMap操作符
的用法。