1. 程式人生 > >RxJava練武場之——Token前置請求

RxJava練武場之——Token前置請求

RxJava的用武之地

Rxjava這個庫和其他常見庫不太一樣,一般的庫例如Glide,ButterKnife都是為了解決實際問題出現的,一定程度上是剛需。Glide庫如果不用他,那麼應用自己就要處理圖片下載、壓縮、記憶體管理、多級快取等等複雜的邏輯。這類問題複雜而常見,而像Glide這類的輪子,Api的設計都比較友好,一個簡單的api呼叫就能完成一個原本很複雜的功能,簡直不要太爽。

Glide.with(context)
    .load(url)//圖片載入
    .crossFade()//動畫設定
    .placeholder(R.drawable.place_image)//佔位圖
    .error(R.drawable.error_image)//失敗佔位圖
    .override(width,height)//圖片裁剪
    .thumbnail(thumbnailRequest)//配置縮圖
    .diskCacheStrategy(DiskCacheStrategy.SOURCE)//快取策略
    .into(imageView);
複製程式碼

而Rxjava,你剛開始看起來,都不知道他是幹什麼的。“非同步處理”?不是一般都使用觀察者模式嗎?AsyncTask,Handler也可以,要rxjava幹嘛?如果你有興趣研究過一點rxjava,會發現網上的教程都會說:"zip map flatmap debounce等操作符把非同步回撥變得‘簡潔’‘優雅’",然後對比一下原來的程式碼和使用rxjava後的程式碼,最後感嘆一下rxjava設計的鬼才和功能的強大。我自己在初次接觸rxjava時也感覺,這些rxjava的優點描述比較空洞,這項技術的意義大於實用。 實際情況是這樣麼?在具體開發中,非同步呼叫給我們的最大困擾是:非同步回撥的時間並不可控。當有多個非同步回撥時,這些呼叫相互聯絡和依賴,搞清楚每個回撥何時返回是個重要的問題。在每個關鍵時間節點對‘分散的callback’做正確的事,有過類似程式設計經驗的人都知道,是非常痛苦的事,如果還想程式碼容易看懂,簡直是瘋了。

rxjava號稱非同步呼叫的終極解決方案,能否解決以上困擾?隨著學習和應用的深入,體會會更明顯。以下會用一個稍複雜的例子,實操一個複雜非同步場景,看看rxjava處理的怎麼樣。

典型複雜非同步場景 -- Token的前置校驗

經常遇到這種需求,介面的請求依賴token資訊。一個請求需要先請求token(token如果存在快取則使用快取),依賴這個token才能進行正常網路請求。這個token有一定的時效性,在時效性內可以使用快取,過期後需要重新請求token並重新發起一次請求。這個流程可以歸納如下圖:

token前置請求.png
光看這些需求,是不是覺得已經夠你喝一壺了,別忙,還有些潛在的邏輯這個圖沒有表現出來: 1 高併發網路請求時,如果token正在請求,需要對請求阻塞(token請求過程中,不再接受新的token請求) 2 阻塞的同時,要把這些請求記錄下來,token請求成功後,再‘依次’傳送這些阻塞的請求。 3 token失效情況下,網路請求限制重試次數。(防止遞迴呼叫) 4 token請求本身,重試策略需單獨配置。

不使用rxjava,我們如何實現上述需求:

1、網路請求前,對token是否有快取判斷,如果沒有先請求token,並把這個請求阻塞且快取 2、token請求過程中,如果有新的token請求進來,加入阻塞佇列 3、token請求後,通知阻塞的佇列(廣播等方式),依次進行阻塞的請求 4、對兩種次數限制,分別做邏輯判斷

以上就是傳統實現方法,就不貼程式碼了,這樣實現有以下特點: 1、要時刻維護一個阻塞佇列 (注意其新增和清空的時機) 2、token請求結束後,有一個回撥機制通知阻塞佇列,(這個回撥需要註冊和反註冊) 3、兩處的次數限制,次數維護的變數,不好維護(一般動態祕鑰為了便於使用會做成單例,單例內的變數類似static,維護較複雜) 4、請求重試的邏輯不好實現,

我們可以看到這裡涉及到很多靜態變數的維護,廣播等非同步回撥的處理,這種情況一多,程式設計者會變得很被動。而且token的非同步請求和真正的網路非同步請求雜糅在一起,增大了問題的複雜性。

我們來看下rxjava如何處理:

一些程式碼網路請求部分與前一篇部落格《基於RxJava Retrofit的網路框架》相關。

先看看完整的請求過程
public static <R> Observable send(final MapiHttpRequest request, final MapiTypeReference<R> t){
    return Observable.defer(new Callable<ObservableSource<String>>() {
                @Override
                public ObservableSource<String> call() throws Exception {
                    //傳入token快取
                    return Observable.just(Store.sToken);
                }
            }).flatMap(new Function<String, ObservableSource<R>>() {
                @Override
                public ObservableSource<R> apply(String key) throws Exception {
                    if(TextUtils.isEmpty(key) && !request.skipCheckKeyValid()){
                        //token沒有快取,需要請求Token
                        return Observable.<R>error(new KeyNotValidThrowable());
                    } else {
                        //Token存在快取,直接請求
                        return sendRequestInternal(request,t);
                    }
                }
            })
            //進入失敗重試流程
            .retryWhen(new Function<Observable<? extends Throwable>, ObservableSource<String>>() {
                private int retryCount = 0;
                @Override
                public ObservableSource<String> apply(Observable<? extends Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<String>>() {
                        @Override
                        public ObservableSource<String> apply(Throwable throwable) throws Exception {
                            if (throwable instanceof KeyNotValidThrowable){
                                //同一Request,有過一次KeyNotValidThrowable,則不再重試
                                if (retryCount > 0){
                                    return Observable.error(throwable);
                                } else {
                                //token快取不在,進入TokenLoader請求token
                                    retryCount++;
                                    return TokenLoader.getInstance().getNetTokenLocked();
                                }
                            } else if (throwable instanceof ApiException){
                                  //token過期的情況,重新獲取token,並重試
                                  ApiException apiException = (ApiException)throwable;
                                  if (apiException.getCode() == MapiResultCode.SECRETKEY_EXPIRED.value()){
                                      if (retryCount > 0){
                                          return Observable.error(throwable);
                                      } else {
                                          //token快取失效,進入TokenLoader請求token
                                          retryCount++;
                                          return DynamicKeyLoader.getInstance().getNetTokenLocked();
                                      }
                                  }
                            }
                            //其他型別錯誤,直接丟擲,不再重試
                            return Observable.error(throwable);
                        }
                    });
                }
            });
}
複製程式碼

也許你第一次看也挺暈,別怕,你順著註釋捋捋邏輯,是不是感覺程式碼的實現好像畫了一個時序圖。 除了註釋以外,幾點說明: 1、defer操作符的作用是在retry時,會重新建立新的Observable,否則會使用上次的Observable,不會重新獲取Store.sToken 2、retryWhen操作符,與sendRequestInternal內部統一配置的retryWhen並不衝突,相當於二次retry 3、retryWhen中如果丟擲error ,則不再重試; 4、重試請求,通過返回getNetTokenLocked這個subject實現。(下面詳述)

階段總結:

整體的流程被壓縮到了一個函式中,rxjava本身的retrywhen和subject機制,已經替我們完成了這麼幾點: 1、自動重試的註冊和反註冊,subject被回撥完直接失效,再次請求要重新註冊。 2、高併發request,維護佇列,通過mTokenObservable的回撥自動解決了這個問題 3、retry次數的維護,由於每次request的retry都是重新建立的內部類,所以變數的維護變的簡單。 4、重試的邏輯被retry操作符自動實現了,只要重寫retry的返回值就可以控制重試的策略。

TokenLoader:Token的獲取過程
public class TokenLoader {

    public static final String TAG = TokenLoader.class.getSimpleName();

    private AtomicBoolean mRefreshing = new AtomicBoolean(false);
    private PublishSubject<String> mPublishSubject;
    private Observable<String> mTokenObservable;

    private TokenLoader() {
        final TokenRequest request = new TokenRequest(CarOperateApplication.getInstance());
        mTokenObservable = Observable
                  .defer(new Callable<ObservableSource<TokenRequest>>() {
                      @Override
                      public ObservableSource<TokenRequest> call() throws Exception {
                          return Observable.just(request);
                      }
                  })
                  .flatMap(new Function<TokenRequest, ObservableSource<MapiHttpResponse<Boolean>>>() {
                      @Override
                      public ObservableSource<MapiHttpResponse<Boolean>> apply(RefreshKeyRequest refreshKeyRequest) throws Exception {
                          //Token請求介面
                          return ApiHelper.sendDynamicKey(refreshKeyRequest,new MapiTypeReference<MapiHttpResponse<Boolean>>(){});
                      }
                  })
                  .retryWhen(new Function<Observable<Throwable>, ObservableSource<TokenRequest>>() {
                      private int retryCount = 0;
                      @Override
                      public ObservableSource<TokenRequest> apply(Observable<Throwable> throwableObservable) throws Exception {
                          return throwableObservable.flatMap(new Function<Throwable, ObservableSource<TokenRequest>>() {
                              @Override
                              public ObservableSource<RefreshKeyRequest> apply(Throwable throwable) throws Exception {
                                  retryCount++;
                                  if (retryCount == 3){
                                      //失敗次數達到閾值,更改請求策略
                                      request.setFlag(0);
                                      return Observable.just(request);
                                  } else if (retryCount > 3){
                                      //失敗次數超過閾值,丟擲失敗,放棄請求
                                      mRefreshing.set(false);
                                      return Observable.error(throwable);
                                  } else {
                                      //再次請求token
                                      return Observable.just(request);
                                  }
                              }
                          });

                      }
                  })
    //                      .delay(6000, TimeUnit.MILLISECONDS) //模擬token請求延遲
                  .map(new Function<MapiHttpResponse<Boolean>,String>() {
                      @Override
                      public String apply(MapiHttpResponse<Boolean> response) throws Exception {
                          //成功,儲存token快取
                          if (response.getContent().booleanValue() == true){
                              setCacheToken(response.getToken());
                          } else if (response.getContent().booleanValue() == false){
                              setCacheToken(UcarK.getSign());
                          }
                          //請求完成標識
                          mRefreshing.set(false);
                          return getCacheToken();
                      }
                  });
    }

    public static TokenLoader getInstance() {
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final TokenLoader INSTANCE = new TokenLoader();
    }

    public String getCacheToken() {
        return Store.sToken;
    }

    public void setCacheToken(String key){
        Store.sToken = key;
    }

    /**
     *
     * @return
     */
    public Observable<String> getNetTokenLocked() {
        if (mRefreshing.compareAndSet(false, true)) {
            Log.d(TAG, "沒有請求,發起一次新的Token請求");
            startTokenRequest();
        } else {
            Log.d(TAG, "已經有請求,直接返回等待");
        }
        return mPublishSubject;
    }

    private void startTokenRequest() {
        mPublishSubject = PublishSubject.create();
        mTokenObservable.subscribe(mPublishSubject);
    }

}
複製程式碼

還是讀註釋,除了註釋以外,幾點說明: 1、mRefreshing的作用是在token請求過程中,不再允許新的token請求, 變數採用原子類,而非boolean;這樣在多執行緒環境下,原子類的方法是執行緒安全的。 compareAndSet(boolean expect, boolean update)這個方法兩個作用 1)比較expect和mRefresh是否一致 2)將mRefreshing置為update

2、startTokenRequest()方法開啟token請求,注意Observable在subscribe時才正式開始

3、這裡使用了PublishSubject較為關鍵,在rxjava中Subject既是observable,又是observer,在TokenLoader中,mPublishSubject是mTokenObservable的觀察者,token請求的會由mPublishSubject響應,同時mPublishSubject也作為Observable返回給TokenLoader的呼叫者作為retryWhen的返回值返回。(所以這裡PublishSubject的泛型與send()方法中Observable的泛型應該是一致的)

4、對於mRefreshing是true的情況,直接返回mPublishSubject,這樣每個阻塞的請求retryWhen都會等待mPublishSubject的返回值,回撥通知的順序與加入阻塞的順序是佇列關係(先請求的介面,先回調),滿足我們的需求。

最後: 感覺怎麼樣,是豁然開朗還是越陷越深,不管那樣都沒有關係,你需要的是瞭解還存在另一種處理非同步任務的方法。在你下一次遇到同樣讓你頭疼的問題時,你可以把這篇文章拿起來再看看,也許你的頭疼會好一點了。。。