RxJava2.0 學習(2)----實際使用場景 2018年
阿新 • • 發佈:2019-01-03
RxJava2.0 學習(2)----實際使用場景 2018年
看了 nanchen 大神得 Rxjava2.0 demo 自己做下總結
demo地址
0 執行緒切換
subscribeOn() 指定的就是發射事件的執行緒,多次呼叫 subscribeOn() 只有第一次的有效
observerOn 指定的就是訂閱者接收事件的執行緒,但多次指定訂閱者接收執行緒是可以的,也就是說每呼叫一次 observerOn(),下游的執行緒就會切換一次
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); e.onNext(1); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName()); } });
1 使用 Map 做簡單的網路請求
執行順序是 Map --> doOnNext --> subscribe (訂閱者)
ps:doOnNext 在 訂閱者收到訊息之前 做點什麼
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Builder builder = new Builder() .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }) .subscribeOn(Schedulers.io()) .map(new Function<Response, MobileAddress>() { @Override public MobileAddress apply(@NonNull Response response) throws Exception { Log.e(TAG, "map 執行緒:" + Thread.currentThread().getName() + "\n"); if (response.isSuccessful()) { ResponseBody body = response.body(); if (body != null) { Log.e(TAG, "map:轉換前:" + response.body()); return new Gson().fromJson(body.string(), MobileAddress.class); } } return null; } }).observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress s) throws Exception { Log.e(TAG, "doOnNext 執行緒:" + Thread.currentThread().getName() + "\n"); mRxOperatorsText.append("\ndoOnNext 執行緒:" + Thread.currentThread().getName() + "\n"); Log.e(TAG, "doOnNext: 儲存成功:" + s.getResult().toString() + "\n"); mRxOperatorsText.append("doOnNext: 儲存成功:" + s.getResult().toString() + "\n"); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress data) throws Exception { Log.e(TAG, "subscribe 執行緒:" + Thread.currentThread().getName() + "\n"); mRxOperatorsText.append("\nsubscribe 執行緒:" + Thread.currentThread().getName() + "\n"); Log.e(TAG, "成功:" + data.toString() + "\n"); mRxOperatorsText.append("成功:" + data.toString() + "\n"); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "subscribe 執行緒:" + Thread.currentThread().getName() + "\n"); mRxOperatorsText.append("\nsubscribe 執行緒:" + Thread.currentThread().getName() + "\n"); Log.e(TAG, "失敗:" + throwable.getMessage() + "\n"); mRxOperatorsText.append("失敗:" + throwable.getMessage() + "\n"); } });
2 使用Rx2-Networking
Rx2AndroidNetworking 使用此框架 進行網路請求。
getObjectObservable(MobileAddress.class) 方便了 實體類結構。
Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512") .build() .getObjectObservable(MobileAddress.class) .observeOn(AndroidSchedulers.mainThread()) // 為doOnNext() 指定在主執行緒,否則報錯 .doOnNext(new Consumer<MobileAddress>() { @Override public void accept(@NonNull MobileAddress data) throws Exception { Log.e(TAG, "doOnNext:"+Thread.currentThread().getName()+"\n" ); mRxOperatorsText.append("\ndoOnNext:"+Thread.currentThread().getName()+"\n" ); Log.e(TAG,"doOnNext:"+data.toString()+"\n"); mRxOperatorsText.append("doOnNext:"+data.toString()+"\n"); } }) .map(new Function<MobileAddress, ResultBean>() { @Override public ResultBean apply(@NonNull MobileAddress mobileAddress) throws Exception { Log.e(TAG, "\n" ); mRxOperatorsText.append("\n"); Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" ); mRxOperatorsText.append("map:"+Thread.currentThread().getName()+"\n" ); return mobileAddress.getResult(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<ResultBean>() { @Override public void accept(@NonNull ResultBean data) throws Exception { Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName()+"\n" ); mRxOperatorsText.append("\nsubscribe 成功:"+Thread.currentThread().getName()+"\n" ); Log.e(TAG, "成功:" + data.toString() + "\n"); mRxOperatorsText.append("成功:" + data.toString() + "\n"); } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName()+"\n" ); mRxOperatorsText.append("\nsubscribe 失敗:"+Thread.currentThread().getName()+"\n" ); Log.e(TAG, "失敗:"+ throwable.getMessage()+"\n" ); mRxOperatorsText.append("失敗:"+ throwable.getMessage()+"\n"); } });
使用ZIP 結合多個介面的資料再更新ui
將observable1 和 observable2同時請求 處理合並後叫給訂閱者
其中有一個請求異常就視為【失敗】
Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
.build()
.getObjectObservable(MobileAddress.class);
Observable<CategoryResult> observable2 = Network.getGankApi()
.getCategoryData("Android",1,1);
Observable.zip(observable1, observable1, new BiFunction<MobileAddress, CategoryResult, String>() {
@Override
public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
return "合併後的資料為:手機歸屬地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
mRxOperatorsText.setText(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
mRxOperatorsText.setText("失敗");
Log.e(TAG, "accept: 失敗:" + throwable+"\n");
}
});
使用 flatmap (連續請求)
(多個網路請求依次依賴,比如:
1、註冊使用者前先通過介面A獲取當前使用者是否已註冊,再通過介面B註冊;
2、註冊後自動登入,先通過註冊介面註冊使用者資訊,註冊成功後馬上呼叫登入介面進行自動登入。
例子所用API來自天狗網)
Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows", 1 + "")
.build()
.getObjectObservable(FoodList.class) // 發起獲取食品列表的請求,並解析到FootList
.subscribeOn(Schedulers.io()) // 在io執行緒進行網路請求
.observeOn(AndroidSchedulers.mainThread()) // 在主執行緒處理獲取食品列表的請求結果
.doOnNext(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList foodList) throws Exception {
// 先根據獲取食品列表的響應結果做一些操作
Log.e(TAG, "accept: doOnNext :" + foodList.toString());
mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
}
})
.observeOn(Schedulers.io()) // 回到 io 執行緒去處理獲取食品詳情的請求
.flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
@Override
public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
.addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
.build()
.getObjectObservable(FoodDetail.class);
}
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodDetail>() {
@Override
public void accept(@NonNull FoodDetail foodDetail) throws Exception {
Log.e(TAG, "accept: success :" + foodDetail.toString());
mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: error :" + throwable.getMessage());
mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
}
});
使用 concat (進行快取資料 和 網路新資料 處理)
使用場景:讓使用者回到 載入過的頁面時有想重新整理資料,不那麼突然。
(先讀取快取資料並展示UI再獲取網路資料重新整理UI)
isFromNet 用來標記是否有本地資料
e.onNext(data); 看情況是否需要下一步
e.onComplete(); 沒快取的話 進入下一個請求
Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
@Override
public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
Log.e(TAG, "create當前執行緒:"+Thread.currentThread().getName() );
FoodList data = CacheManager.getInstance().getFoodListData();
// 在操作符 concat 中,只有呼叫 onComplete 之後才會執行下一個 Observable
if (data != null){ // 如果快取資料不為空,則直接讀取快取資料,而不讀取網路資料
isFromNet = false;
Log.e(TAG, "\nsubscribe: 讀取快取資料:" );
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取快取資料:\n");
}
});
e.onNext(data);
}else {
isFromNet = true;
runOnUiThread(new Runnable() {
@Override
public void run() {
mRxOperatorsText.append("\nsubscribe: 讀取網路資料:\n");
}
});
Log.e(TAG, "\nsubscribe: 讀取網路資料:" );
e.onComplete();
}
}
});
Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
.addQueryParameter("rows",10+"")
.build()
.getObjectObservable(FoodList.class);
// 兩個 Observable 的泛型應當保持一致
Observable.concat(cache,network)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<FoodList>() {
@Override
public void accept(@NonNull FoodList tngouBeen) throws Exception {
Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
if (isFromNet){
mRxOperatorsText.append("accept : 網路獲取資料設定快取: \n");
Log.e(TAG, "accept : 網路獲取資料設定快取: \n"+tngouBeen.toString() );
CacheManager.getInstance().setFoodListData(tngouBeen);
}
mRxOperatorsText.append("accept: 讀取資料成功:" + tngouBeen.toString()+"\n");
Log.e(TAG, "accept: 讀取資料成功:" + tngouBeen.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "subscribe 失敗:"+Thread.currentThread().getName() );
Log.e(TAG, "accept: 讀取資料失敗:"+throwable.getMessage() );
mRxOperatorsText.append("accept: 讀取資料失敗:"+throwable.getMessage()+"\n");
}
});
}
使用 debounce ()
RxView 使用了的 Rxbanding (不是重點)
.debounce(2,TimeUnit.SECONDS) 點選事件,在2秒內重複點選,及時重新算。2秒後觸發。
RxView.clicks(mRxOperatorsBtn)
.debounce(2,TimeUnit.SECONDS) // 過濾掉髮射頻率小於2秒的發射事件
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
clickBtn();
}
});
}
private void clickBtn() {
Rx2AndroidNetworking.get("http://120.77.35.147:8080/goods/homepage")
// .addQueryParameter("rows",2+"") // 只獲取兩條資料
.build()
.getObjectObservable(ShopMallFristBean.class)
.subscribeOn(Schedulers.io()) // 在 io 執行緒進行網路請求
.observeOn(AndroidSchedulers.mainThread()) // 在主執行緒進行更新UI等操作
.subscribe(new Consumer<ShopMallFristBean>() {
@Override
public void accept(@NonNull ShopMallFristBean foodList) throws Exception {
Log.e(TAG, "accept: 獲取資料成功:"+foodList.toString()+"\n" );
mRxOperatorsText.append("accept: 獲取資料成功:"+foodList.toString()+"\n" );
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.e(TAG, "accept: 獲取資料失敗:"+throwable.getMessage() +"\n");
mRxOperatorsText.append("accept: 獲取資料失敗:"+throwable.getMessage() +"\n");
}
});
使用 interval 實現心跳機制
Disposable 一次性資源;
interval(1, TimeUnit.SECONDS) 每個一秒傳送一次事件
doOnNext 提前處理事件
mDisposable =
Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: doOnNext : "+aLong );
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept: 設定文字 :"+aLong );
mRxOperatorsText.append("accept: 設定文字 :"+aLong +"\n");
}
});