1. 程式人生 > >你知道RxJava也可以實現AsyncTask嗎?

你知道RxJava也可以實現AsyncTask嗎?

使用RxJava實現非同步操作(AsyncTask)

常見的非同步操作我們可以聯想到AsyncTask或者handler,其實google創造出的目的也就是為了讓程式碼更加清晰明瞭,讓程式碼更加簡潔.
而Rx系列的出現也就為了實現程式碼的邏輯清晰,結構簡單問題.在gitHub上的介紹是 a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)沒錯,就是這樣,我們可以通過他的操作符,通過他的執行緒切換,來實現各種Android上遇到的問題.如非同步任務,事件匯流排,資料處理,Retrofit框架等.

提出需求

AsyncTask是非同步操作最常見的處理方式,幾乎只要涉及到網路請求的,或者耗時操作的,都會使用到AsyncTask,下面看看一個小需求.

  1. 請求引數是
    IP地址: ip=192.168.0.1
  2. 功能實現
    請求時彈出進度框,請求完成隱藏進度框,並Toast出獲取到的資料(需要將資料封裝成bean).

常見的非同步操作AsyncTask

下面我們使用AsyncTask來實現提出的需求.talk is cheap,請看程式碼.

//點擊發出請求
comTitleStartTv.setOnClickListener(new View.OnClickListener() {
            @Override
public void onClick(View v) { asyncTask(); } });

具體的AsyncTask處理如下.

private void asyncTask() {
        new AsyncTask<String, Void, IPInfo>() {
            @Override
            protected IPInfo doInBackground(String... params) {
                // 在後臺請求介面,並對請求到的json進行校驗,解析成json
String resultJsonStr = OkHttpUtil.get(Constants.ipUrl + getParam(params)); IPInfo infos = null; if (resultJsonStr != null && !"".equals(resultJsonStr)) { infos = new Gson().fromJson(resultJsonStr, IPInfo.class); } return infos; } @Override protected void onPreExecute() { super.onPreExecute(); // 請求過程耗時操作,彈出Loading的進度框. if (dialog == null) { dialog = ProgressDialog.show(MainActivity.this, "Loading...", "正在載入...", true, false); } } @Override protected void onPostExecute(IPInfo info) { // 請求完成,關閉進度框,並Toask請求後的結果資料. if (dialog != null) CHelper.disimissDialog(dialog); if (info != null) { Toast.makeText(MainActivity.this, "AsyncTask:" + info.toString(), Toast.LENGTH_SHORT).show(); } } }.execute("192.168.0.1"); } protected String getParam(String... p) { return "?ip=" + p[0]; }

下面是AsyncTask的效果.
這裡寫圖片描述

RxJava封裝的非同步操作

下面使用到了RxJava的操作符有執行緒切換subscribeOn,observeOn,doOnSubscribe,doOnCompleted,doOnError.

  • 封裝BaseRxTask,主要是抽取公共方法和封裝共性的功能.
    如: getBaseDilogView 主要是封裝進度框,處理了io和UI執行緒.
/**
 * @Author: Relice
 * BaseRX 非同步任務基類
 */
public abstract class BaseRxTask<D, P> {

    /**
     * 引數陣列
     */
    protected P[] p;
    protected Context mContext;
    private boolean needDialog = false;
    private ProgressDialog dialog;

    /**
     * 介面不需要傳引數 複寫此方法
     */
    protected BaseRxTask(Context context, boolean needDialog) {
        this.mContext = context;
        this.needDialog = needDialog;
    }

    /**
     * 介面需要傳引數調 複寫此方法
     *
     * @param context 上下文
     * @param needDialog 是否顯示進度
     * @param p 引數陣列
     */
    @SafeVarargs
    protected BaseRxTask(Context context, boolean needDialog, P... p) {
        this.mContext = context;
        this.needDialog = needDialog;
        this.p = p;
    }

    protected Observable<D> doInBackgroundObserVable() {
        return doInBackground();
    }

    public Observable<D> execute() {
        return getBaseDilogView();
    }

    /**
     * 進度View
     *
     * @return
     */
    private Observable<D> getBaseDilogView() {
        Observable<D> tObservable = doInBackgroundObserVable();
 //subscribeOn(Schedulers.io())事件所產生的執行緒,也就是subscribe所發生的執行緒.
        return tObservable.subscribeOn(Schedulers.io())
 //doOnSubscribe的作用是如它後面有subscribeOn(),那麼它將執行
//離它最近的subscribeOn()所指定的執行緒,也就是下面的              //observeOn(AndroidSchedulers.mainThread())執行緒.
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (needDialog && mContext != null) {
                            dialog = ProgressDialog.show(mContext, "Loading...", "正在載入...", true, false);
                        }
                    }
// 注意這裡添加了doOnCompleted,doOnError兩個預處理的操作,
//所以在後面對Observable的訂閱就要用subscribe(new Observer),如果使用subscribe(new Action)會報錯.
                }).doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        CHelper.disimissDialog(dialog);
                    }
                }).doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        CHelper.disimissDialog(dialog);
                    }
                })
//為了讓進度框可以在UI執行緒裡執行,影響了上面doOnSubscribe的執行緒
                .observeOn(AndroidSchedulers.mainThread());
    }

    protected abstract Observable<D> doInBackground();

    /**
     * 介面沒引數空實現即可
     *
     * @param p 引數陣列
     */
    protected abstract String getParam(P... p);

    /**
     * @return BaseUrl
     */
    protected abstract String getBaseUrl();
}
  • 實現BaseRxTask
    因為在基類對所以的操作都封裝好了,所以對BaseRxTask的實現就很簡單了,程式碼中都有註釋.

    public class GetRxIPInfoTask extends BaseRxTask<IPInfo, String> {
    protected GetRxIPInfoTask(Context context, boolean needDialog) {
        super(context, needDialog);
    }
    
    public GetRxIPInfoTask(Context context, boolean needDialog, String... p) {
        super(context, needDialog, p);
    }
    
    @Override
    public Observable<IPInfo> execute() {
        return super.execute();
    }
    
    @Override
    protected Observable<IPInfo> doInBackground() {
        return Observable.create(new Observable.OnSubscribe<IPInfo>() {
            @Override
            public void call(Subscriber<? super IPInfo> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    String resultJsonStr = OkHttpUtil.get(nullToEmpty(getBaseUrl()) + nullToEmpty(getParam(p)));
                    if (resultJsonStr != null && !"".equals(resultJsonStr)) {
                        IPInfo infos = new Gson().fromJson(resultJsonStr, IPInfo.class);
                        subscriber.onNext(infos);
                        subscriber.onCompleted();
                    } else {
                        subscriber.onCompleted();
                    }
                }
            }
        });
    }
    
    private String nullToEmpty(String str) {
        return (str == null) ? "" : str;
    }
    
    /**
     * ip
     *
     * @param p 引數陣列
     */
    @Override
    protected String getParam(String... p) {
        return "?ip=" + p[0];
    }
    
    /**
     * @return base url
     */
    @Override
    protected String getBaseUrl() {
        return Constants.ipUrl;
    }
    }
  • 如何去呼叫
    機智的你們可以看到在基類中的execute()方法封裝了 進度框的功能,

 public Observable<D> execute() {
        return getBaseDilogView();
    }

具體呼叫看下面程式碼

 private void rxAsyncTask() {
        new GetRxIPInfoTask(MainActivity.this, true, new String[]{"192.168.0.1"})
                .execute()
                //提前過濾掉不想要的資料
//                .filter(new Func1<IPInfo, Boolean>() {
//                    @Override
//                    public Boolean call(IPInfo ipInfo) {
//                        return ipInfo == null;
//                    }
//                })
                .subscribe(new Observer<IPInfo>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("Throwable:" + e.getMessage());
                    }

                    @Override
                    public void onNext(IPInfo infos) {
                        System.out.println("onNext:" + infos.toString());
                        Toast.makeText(MainActivity.this, "RxAsyncTask:" + infos.toString(), Toast.LENGTH_SHORT).show();
                    }
                });
    }

最後我們看看RxAsyncTask的效果.
這裡寫圖片描述