Android RxJava/RxAndroid結合Retrofit使用
概述
RxJava是一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫。更重要的是:使用RxJava在程式碼邏輯上會非常簡潔明瞭,尤其是在複雜的邏輯上。告別迷之縮排。
RxAndroid是RxJava針對Android平臺的拓展。
Retrofit是一個封裝了okHttp的工具庫,在上篇博文 Android 初探Retrofit2.0.1(最新版) 有過介紹,對Retrofit不太瞭解的讀者,建議先行閱讀。
熱身運動 - 觀察者模式
RxJava 的非同步實現,是通過一種擴充套件的觀察者模式來實現的。下面簡單介紹下觀察者模式,熟練掌握觀察者模式可跳過這一小節。
觀察者模式
假設現在有兩個物件A和B,在A發生某種變化時要主動通知B。這就是觀察者模式。Android裡View.setOnClickListener(new OnClickListener)
就是運用觀察者模式的典型例子。在這個例子中View充當物件A的角色,OnClickListener充當B。View通過setOnClickListener將自己和OnClickListener聯絡(訂閱)起來。當View捕獲到點選事件之後,立馬呼叫OnClickListener#onClick()
方法。還有通常我們自己定義的介面回撥都是觀察者模式的運用。
RxJava的觀察者模式
RxJava基本概念:Observable (被觀察者,相當於View)、 Observer (觀察者,相當於OnClickListener)、 subscribe ()(訂閱,相當於setOnClickListener()方法)事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。
RxJava除了普通的回撥方法onNext()還有onCompleted() 和 onError()。
- onCompleted():事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。沒有新的onNext()之後,呼叫此方法。
- onError():事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
- onCompleted() 和 onError()在一個佇列中只能呼叫一個,並且是最後一個。onCompleted() 和 onError()還是互斥的,只能呼叫其中一個
回顧Retrofit
上篇博文我們使用Retrofit實現對網路的訪問以及返回資料的解析,詳情請見
1. 建立WeatherInfoService,並制定請求資料的方式以及需要的查詢引數
2. 建立相應的WeatherInfoBean
3. 建立Retrofit物件並使用GSON解析資料
4. 呼叫 weatherInfoService#getWeatherInfo(FORMAT, CITYNAME, KEY),獲取call
5. 插入佇列,並展示資料
RxJava/RxAndroid結合Retrofit
新增依賴
compile 'com.squareup.retrofit2:retrofit:2.0.1' compile 'com.squareup.retrofit2:converter-gson:2.0.0-beta4' compile 'com.squareup.retrofit2:adapter-rxjava:2.0.1' compile 'io.reactivex:rxandroid:1.1.0' compile 'io.reactivex:rxjava:1.1.0'
新增限權
<uses-permission android:name="android.permission.INTERNET" />
再次見到熟悉的WeatherInfoService
public interface WeatherInfoService {
@GET("http://v.juhe.cn/weather/index?format=2&cityname=北京&key=b952ad7acbc7415f3f3c9bf274e39c45")
Observable<WeatherInfo> getWeatherInfoByRxJava();
}
注意這裡getWeatherInfoByRxJava()
的返回型別為Observable<WeatherInfo>,
這也就意味著我們在這裡直接得到一個被觀察者Observable!
還是那個MainActivity
findViewById(R.id.btn2).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Gson gson = new GsonBuilder().create();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(BASE_URL)
//配置轉化庫,預設是Gson
.addConverterFactory(GsonConverterFactory.create(gson))
//配置回撥庫,採用RxJava
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
WeatherInfoService weatherInfoService = retrofit.create(WeatherInfoService.class);
getWeatherInfo(weatherInfoService,gson);
getWeatherInfoByMap(weatherInfoService,gson);
getWeatherInfoByFlatMap(weatherInfoService,gson);
}
});
注意.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
,這行程式碼的作用是配置Retrofit回撥庫,採用RxJava。
這裡我們寫了getWeatherInfo(weatherInfoService,gson);
、getWeatherInfoByMap(weatherInfoService,gson);
和getWeatherInfoByFlatMap(weatherInfoService,gson);
三個方法。接下來會一一講解
MainActivity#getWeatherInfo()
在這個方法裡實現了最基本的RxJava/RxAndroid和Retrofit的結合。程式碼如下
weatherInfoService.getWeatherInfoByRxJava()// 返回型別為Observable<WeatherInfo>
.subscribeOn(Schedulers.io())// 指定訂閱者在io執行緒(第一次指定訂閱者執行緒有效)
.doOnSubscribe(new Action0() { //doOnSubscribe執行緒為最近的subscribeOn指定執行緒
@Override
public void call() {
Toast.makeText(MainActivity.this, "我在網路請求前執行", Toast.LENGTH_SHORT).show();
}
})
.subscribeOn(AndroidSchedulers.mainThread())// 指定在主執行緒
.observeOn(AndroidSchedulers.mainThread())// 指定觀察者在主執行緒
.subscribe(new Subscriber<WeatherInfo>() {
@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
tv.setText("error:" + e.getMessage());
}
@Override
public void onNext(WeatherInfo weatherInfo) {
Log.i(TAG, gson.toJson(weatherInfo));
tv.setText(gson.toJson(weatherInfo));
}
});
程式碼裡註釋很多,補充幾個需要注意的地方。
1. subscribeOn()只有第一次呼叫的時候就指定被觀察者Observable所線上程。以後可以多次呼叫,但被觀察者Observable所線上程已經指定
2. doOnSubscribe()在傳送事件前執行,可以指定執行執行緒。一般在裡面展示loading
3. observeOn()可多次呼叫並且每次都會改變觀察者Observer/Subscriber所線上程。
MainActivity#getWeatherInfoByMap()
Observable的map()是個神奇的方法,它可以對被觀察者Observable的泛型進行操作,並且返回另一個Observable傳遞給觀察者Observer/Subscriber
private void getWeatherInfoByMap(WeatherInfoService weatherInfoService, final Gson gson) {
weatherInfoService.getWeatherInfoByRxJava()// 返回型別為Observable<WeatherInfo>
.subscribeOn(Schedulers.io())// 指定訂閱者在io執行緒(第一次指定訂閱者執行緒有效)
.doOnSubscribe(new Action0() { //doOnSubscribe執行緒為最近的subscribeOn指定執行緒
@Override
public void call() {
Toast.makeText(MainActivity.this, "我在網路請求前執行", Toast.LENGTH_SHORT).show();
}
})
.subscribeOn(AndroidSchedulers.mainThread())// 指定在主執行緒
.map(new Func1<WeatherInfo, Today>() {
@Override
public Today call(WeatherInfo weatherInfo) {
return weatherInfo.getResult().getToday();
}
})
.observeOn(AndroidSchedulers.mainThread())// 指定觀察者在主執行緒
.subscribe(new Subscriber<Today>() {
...
@Override
public void onNext(Today today) {
tv.setText(gson.toJson(today));
}
});
}
在.map()方法中我們獲取WeatherInfo中Today屬性,並且返回Today。然後再觀察者Subscriber我們就可以直接對Today進行操作。是不是很方便?還有更方便的!
MainActivity#getWeatherInfoByFlatMap()
使用.map方法只能返回一個值,屬於一對一型別。RxJava給我們提供一個更神奇的方法.flatMap()。
private void getWeatherInfoByFlatMap(WeatherInfoService weatherInfoService, final Gson gson) {
weatherInfoService.getWeatherInfoByRxJava()
.subscribeOn(Schedulers.io())
.flatMap(new Func1<WeatherInfo, Observable<Future>>(){
@Override
public Observable<Future> call(WeatherInfo weatherInfo) {
return Observable.from(weatherInfo.getResult().getFuture());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Future>() {
@Override
public void onCompleted() {
tv.setText(sb);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Future future) {
Log.i(TAG,gson.toJson(future));
sb.append(gson.toJson(future)+"\n");
}
});
}
這個方法有些難以理解,下面詳細介紹關鍵部分程式碼。首先在.flatMap()中
第一個引數為被觀察者Observable的泛型WeatherInfo,第二個引數定義為另一個被觀察者,為了敘述方便,下文稱第一個被觀察者A,第二個引數即另一個被觀察者稱為B。A的泛型為WeatherInfo。B的泛型為Future。getFuture是這麼被定義的
public class WeatherInfo {
private String resultcode;
private String reason;
private String error_code;
private Result result;
...
}
public class Result {
private SK sk;
private Today today;
private List<Future> future;
public List<Future> getFuture() {
return future;
}
...
}
原來getFuture()
返回的是個List<Future>
,可是在Func1的call()
返回值怎麼怎麼是Observable<Future>
?這是因為Observable.from()
會將List<Future>
拆分成一個個的Future返回,也就是說訂閱者的onNext
方法將會被執行List<Future>.seze()
次!所以這裡我們定義了一個sb(StringBuilder),用於將每次返回的Future拼接起來,最後在onCompleted()
中呼叫tv.setText(sb);
結束語
至此,RxJava/RxAndroid結合Retrofit講解完畢,希望能對讀者有所幫助。感謝耐心讀到最後!