1. 程式人生 > >RxJava 與RxAndroid 的執行緒控制

RxJava 與RxAndroid 的執行緒控制

通過之前的學習,並沒發現他們說的能取代AsyncTask的優勢,接下來將介紹他如何實現執行緒控制

在RxJava的預設規則中,事件的發出與消費都是在同意執行緒中,也就是是說預設觀察者和被觀察者事件的處理與傳遞都是在一個執行緒中,這似乎不和觀察者本身的意願就是非同步機制,這將會牽扯出寧一個類Scheduler

在不指定執行緒的情況下,RxJava遵循的是執行緒不變原則,即:在哪個執行緒中呼叫subscirbe(),就在那個執行緒生產事件,也就在那個執行緒消費事件,如果需要切換執行緒,就需要Scheduler 排程器

相當於執行緒控制器,RxJava通過它來指定每一段程式碼執行什麼樣的執行緒,RxJava已經內建了幾個Schedulers,他們適合大多數場景,都可以從Schedulers類的靜態方法獲取

Schedulers.immediate()
//Creates and returns a {@link Scheduler} that executes work immediately on the current thread.
意思是都在當前執行緒執行
     rx.Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("sss");
                LogUtils.d("------->call執行緒:" + Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.immediate())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        LogUtils.d("------->onNext執行緒:" + Thread.currentThread().getName());
                    }
                });
結果:

12-26 03:16:58.295 1857-1857/com.rxandroid.test1 D/----->: ------->onNext執行緒:main
12-26 03:16:58.295 1857-1857/com.rxandroid.test1 D/----->: ------->call執行緒:main

關於RxJava的執行緒分類;請參考下面這個例子:

@Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.button1:
                SchedulersThread(Schedulers.immediate()); //  Schedulers.immediate();當前執行緒
                break;
            case R.id.button2:
                SchedulersThread(Schedulers.newThread());//Schedulers.newThread() 新執行緒
                break;
            case R.id.button3:
                SchedulersThread(Schedulers.io());//Schedulers.io() I/0 操作(讀寫檔案,讀寫資料庫。網路資訊互動等)所使用的執行緒,行為模式與newThread(0差不多
                /**
                 * 區別在於io()的內部實現是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數io()比newThread()更有效率,不要把計算工作放在io()中,可以避免建立不必要的執行緒
                 *
                 */
                break;
            case R.id.button4:
                SchedulersThread(Schedulers.computation());
                /**
                 * 計算所使用的執行緒,這個計算的是cpu密集計算,即不會被I/0等操作限制性能的操作
                 * 例如圖形計算,這個scheduler使用固定的執行緒池,大小為cpu核數,不要把I/0放在computationx中,
                 * 否則Io操作的等待時間會浪費CPU
                 */
                break;
            case R.id.button5:
                SchedulersThread(AndroidSchedulers.mainThread());
                /**
                 * 操作在android的UI執行緒中
                 */
                break;
        }
    }

    private void SchedulersThread(Scheduler scheduler) {
        rx.Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("sss");
                LogUtils.d("------->call執行緒:" + Thread.currentThread().getName());
            }
        }).subscribeOn(scheduler)
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        LogUtils.d("------->onNext執行緒:" + Thread.currentThread().getName());
                    }
                });
    }
列印結果依次:

12-26 03:57:34.811 11395-11395/com.rxandroid.test1 D/----->: ------->onNext執行緒:main
12-26 03:57:34.811 11395-11395/com.rxandroid.test1 D/----->: ------->call執行緒:main


12-26 03:57:37.180 11395-11565/com.rxandroid.test1 D/----->: ------->onNext執行緒:RxNewThreadScheduler-1
12-26 03:57:37.180 11395-11565/com.rxandroid.test1 D/----->: ------->call執行緒:RxNewThreadScheduler-1


12-26 03:57:39.810 11395-11582/com.rxandroid.test1 D/----->: ------->onNext執行緒:RxCachedThreadScheduler-1
12-26 03:57:39.812 11395-11582/com.rxandroid.test1 D/----->: ------->call執行緒:RxCachedThreadScheduler-1


12-26 03:57:42.112 11395-11601/com.rxandroid.test1 D/----->: ------->onNext執行緒:RxComputationThreadPool-1
12-26 03:57:42.113 11395-11601/com.rxandroid.test1 D/----->: ------->call執行緒:RxComputationThreadPool-1


12-26 03:57:44.373 11395-11395/com.rxandroid.test1 D/----->: ------->onNext執行緒:main
12-26 03:57:44.374 11395-11395/com.rxandroid.test1 D/----->: ------->call執行緒:main

瞭解了這幾個Scheduler,我們就可以使用subscribeOn()和observeOb()兩個方法來對執行緒精選控制了

subscribeOn(): 是指subscribe()所發生的執行緒,即Observable.OnSubscibe被啟用時所處的執行緒,或者事件產生的的執行緒

observeOn() 指定Subscriber所執行的執行緒,或者叫做事件消費的執行緒

例項:常見模型 後臺處理 前臺展示

public void ObserverMainThreadSubscribeIo() {
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("sss");
                LogUtils.d("------->call執行緒:" + Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                LogUtils.d("------->onNext執行緒:" + Thread.currentThread().getName());
            }
        });
    }

列印結果:

12-26 04:10:22.691 17917-17997/com.rxandroid.test1 D/----->: ------->call執行緒:RxCachedThreadScheduler-1
12-26 04:10:22.712 17917-17917/com.rxandroid.test1 D/----->: ------->onNext執行緒:main


比較好的一個例子:載入圖片

 /**
     * 載入圖片將會發生在 IO 執行緒,而設定圖片則被設定在了主執行緒。這就意味著,即使載入圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫介面的卡頓。
     */
    private void loadIamge() {
        Observable.create(new Observable.OnSubscribe<Drawable>() {
            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getResources().getDrawable(R.drawable.ic_launcher);
                subscriber.onNext(drawable);
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Drawable>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Drawable drawable) {
                imgView.setImageDrawable(drawable);
            }
        });
    }