1. 程式人生 > >Android RxJava的基本介紹

Android RxJava的基本介紹

RxJava 到底是什麼

RxJava 是一個響應式程式設計框架,採用觀察者設計模式。RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫)。其實, RxJava 的本質可以壓縮為非同步這一個詞。說到根上,它就是一個實現非同步操作的庫。它的優點就是在於讓非同步的程式碼變得非常簡潔,完全可以代替Android的AsyncTask和Handler來實現非同步。

觀察者模式

我們之前說到RxJava的非同步實現是採用觀察者模式實現的,所以先來了解一下觀察者模式。

定義物件間的一種一對多的依賴關係,當一個物件的狀態傳送改變時,所以依賴於它的 物件都得到通知並被自動更新 。觀察者模式有個重要的作用就是解耦,將被觀察者和觀察者解耦,降低依賴,只依賴於Observer和Observable的抽象。比如A 物件(觀察者)對 B 物件(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。觀察者模式並不需要時刻盯著被觀察者(例如 A 不需要每過 2ms 就檢查一次 B 的狀態),而是採用訂閱(Subscribe)的方式,告訴被觀察者:我需要你的某某狀態,你要在它變化的時候通知我。Android 開發中一個比較典型的例子是點選監聽器 OnClickListener 。對設定 OnClickListener 來說, View 是被觀察者, OnClickListener 是觀察者,二者通過 setOnClickListener() 方法達成訂閱關係。訂閱之後使用者點選按鈕的瞬間,Android Framework 就會將點選事件傳送給已經註冊的 OnClickListener 。採取這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也能夠得到最高的反饋速度。

ListView 的觀察者模式

我們熟悉的觀察者模式應該就是ListView的adapter中的notifyDataSetChanged方法,說說大致的實現過程,當然瞭解的可以直接跳過。當我們呼叫adapter的notifyDataSetChanged方法時,實際上就是呼叫BaseAdapter中的DataSetObservable的notifyChanged方法,DataSetObservable也就是一個被觀察者,在這個被觀察者的方法中會遍歷所以的觀察者並且呼叫它們的onChanged方法,從而告知了觀察者發生了變化。所以現在有了被觀察者,那麼觀察者又是在哪裡呢?看原始碼可以發現在呼叫setAdapter的方法中AdapterDataSetObserver也就是我們的觀察者,然後把這個觀察者註冊到了Adapter中,實際上也就是註冊到了DataSetObservable中。所以我們呼叫notifyDataSetChanged方法就會呼叫觀察者的onChanged方法,這個onChanged方法是在AdapterDataSetObserver中實現的,在這裡就是通知ListView重新整理重新佈局介面,從而實現了觀察者模式!!

RxJava 的觀察者模式

RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現訂閱關係,從而 Observable 可以在需要的時候發出事件來通知 Observer。
與傳統觀察者模式不同, RxJava 的事件回撥方法除了普通事件 onNext() (相當於 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件佇列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個佇列。RxJava 規定,當不會再有新的 onNext() 發出時,需要觸發 onCompleted() 方法作為標誌。
onError(): 事件佇列異常。在事件處理過程中出異常時,onError() 會被觸發,同時佇列自動終止,不允許再有事件發出。
在一個正確執行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在佇列中呼叫了其中一個,就不應該再呼叫另一個。

Gradle配置

    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'

Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為,就比如之前說到點選之後會產生怎樣的行為。

Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: ");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: "+s);
            }
        };

Subscriber

這是一個實現了Observer介面和Subscription介面的抽象類,所以它也是一個觀察者,也就是說也可以使用方式和Observer一樣,當然也有它的不同之處。Subscriber中新增了onStart方法,這個方法是在事件被訂閱但是還沒有釋出之前呼叫,在這裡面可以做一些初始化的操作,當然如果初始化操作設計一些UI操作需要在主執行緒中那麼可能就不是那麼適用,因為它是在subscribe 所發生的執行緒中被呼叫,這個執行緒不一定就是主執行緒。

Subscriber中還增加了一個unsubscribe方法,這個方法是用來取消訂閱的,可以避免記憶體洩漏,在呼叫前可以使用isUnsubscribed方法來判斷一下。

實質上Observer在訂閱的過程中也會被轉化為一個Subscriber,所以它們的使用方法是基本相似的。

Observable

Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。下面看看Observable的建立

 rx.Observable observable = rx.Observable.create(new rx.Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("你好啊1");
                subscriber.onNext("你好啊2");
                subscriber.onNext("你好啊3");
                subscriber.onCompleted();
            }
        });
這裡通過create()方法來建立Observable物件,create方法傳入一個OnSubscribe物件。當Observable被訂閱的時候就會呼叫這個OnSubscribe物件裡面的call方法,在這個方法中引數得到了一個subscriber觀察者,這裡就可以呼叫觀察者的方法,於是將會觸發觀察者對應的行為。實現了觀察者模式,實現了被觀察者向觀察者的事件傳遞。

當然建立Observable的方式也不只有這一種,create是最基本的一種,RxJava還提供了一些快捷的建立方式:

Observable.just()

Observable<String> observable = Observable.just("hi1", "hi2", "hi3");
這樣就如上面一樣會依次呼叫subscriber.onNext("hi1“),subscriber.onNext("hi2“),subscriber.onNext("hi3“),subscriber.onCompleted()。

Observable.from()

        String[] strings = {"難過", "快樂", "微笑"};
        Observable<String> observable = Observable.from(strings);
和上面的呼叫是一樣的,不過這裡是傳遞的一個數組

Subscribe (訂閱)

當我們寫好了Observable和Observer之後,還需要一個東西把它們關聯起來,也就是訂閱

 observable.subscribe(observer);
好了,這樣寫就可以完整的工作運行了,subscribe()方法其實主要做了以下幾件事

1.呼叫subscriber的onStart方法,之前說過Observer都會被轉化為Subscriber。

2.呼叫OnSubscribe的call()方法,OnSubscribe是在建立Observable傳入的物件

3.如果發生異常呼叫Subscriptions的unsubscribed()取消訂閱

4.最後把傳入的subscriber以Subscriptions返回,以便用於取消訂閱

其實在呼叫subscribe方法時你可以發現這裡接收的引數不只是Observer和Subscriber,還有以下三個方法

subscribe(final Action1<? super T> onNext)

subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)

subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete)

所以我們可以採用Action1來定義onNext和onError,用Action0定義onComplete,當然也可以定義不完整的回撥,不需要onError和onComplete。可以以以下方式呼叫

Action1<String> onNext = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "onNext");
            }
        };
        Action1<Throwable> onError = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Log.d(TAG, "onError");
            }
        };
        Action0 onComplete = new Action0() {
            @Override
            public void call() {
                Log.d(TAG, "onComplete");
            }
        };
        Observable.just("hello", "hi").subscribe(onNext);
        Observable.just("hello", "hi").subscribe(onNext, onError);
        Observable.just("hello", "hi").subscribe(onNext, onError, onComplete);
其中Action0和Action1是RxJava的兩個介面,都只是包含一個call方法,Action0是不帶引數和返回值的,Action1是帶一個引數的。

Scheduler執行緒控制

之前程式碼實現出來的都是同步觀察者,然而非同步才是RxJava的關鍵,所以我們需要引入Scheduler,Scheduler用於來切換執行緒。在預設情況下,是執行緒不變的,在哪個執行緒呼叫subscribe(),那麼就在哪個執行緒生產事件,在哪個執行緒生產事件那麼就在哪個執行緒消費事件。首先看看下面幾個常用的API

Schedulers.immediate()直接在當前執行緒執行,相當於不指定執行緒。這是預設的 Scheduler。

Schedulers.io() I/O 操作(讀寫檔案、讀寫資料庫、網路資訊互動等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的執行緒池,可以重用空閒的執行緒,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免建立不必要的執行緒。

Schedulers.newThread()總是啟用新執行緒,並在新執行緒執行操作。

Schedulers.computation()   計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的執行緒池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

AndroidSchedulers.mainThread()指定的操作將在 Android 主執行緒執行。

subscribeOn()指定subscribe()所發生的執行緒,也就是Observable.OnSubscribe 被啟用時所處的執行緒

observeOn()指定 Subscriber 所執行在的執行緒。

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 執行緒
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主執行緒
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
上面這段程式碼中,由於 subscribeOn(Schedulers.io()) 的指定,被建立的事件的內容 1、2、3、4 將會在 IO 執行緒發出;而由於 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數字的列印將發生在主執行緒 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用於多數的 『後臺執行緒取資料,主執行緒顯示』的程式策略。


變換

是指對事件序列的變換,也是RxJava的核心功能之一,就是將事件序列中的物件或整個序列進行加工處理,轉換成不同的事件或事件序列。RxJava不僅支援事件物件的變換,也可以實現事件序列的變換。

map()

使用map()可以實現事件物件的變換,這是最常用的變換之一,它把Observable的物件1轉化為物件2傳送給Subscriber,看看下面的用法:

 Observable.just(1)
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return String.valueOf(integer) + "--轉化後";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d(TAG, "call: " + s);
                    }
                });
這裡map接收的引數是一個Func1,和之前的Action1類似,也是一個介面,裡面只有一個call方法,但是Func1是帶返回值的。從上面的程式碼可以看到,是把一個Integer型別轉化為了String型別返回,然後觀察者接收到這個String型別的值。

flatMap()

flatMap()也是一個把一個物件轉化為另外一個物件,但是比之map()更加的難懂一些,下面以一個具體的例子來說明:

比如,有這樣一個需求,有一個物件是Student,一個Student對應著有多個課程,現在要打印出每一個Student的每一個課程。現在該怎麼做?如果還是用map()的話,顯然做不到,map只是一對一的轉換,而現在需要的是一對多的轉換,所以這時就需要用到flatMap了,先看看具體的程式碼

public class Student {
    private List<String> course;

    public List<String> getCourse() {
        return course;
    }

    public void setCourse(List<String> course) {
        this.course = course;
    }
}
//初始化資料
        Student[] students = new Student[3];
        for (int i = 0; i < 3; i++) {
            List<String> list = new ArrayList<>();
            for (int j = 0; j < 2; j++) {
                list.add("學生:" + i + "的課程:" + j);
            }
            Student student = new Student();
            student.setCourse(list);
            students[i] = student;
        }

        Observable.from(students)
                .flatMap(new Func1<Student, Observable<String>>() {
                    @Override
                    public Observable<String> call(Student student) {
                        return Observable.from(student.getCourse());
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, "call: " + s);
                    }
                });
可以從上面看到在flatMap中的call方法返回是一個Observable物件,這個Observable物件是由傳入的事件物件(Student)建立的。這裡返回的Observable物件也不是直接傳送給Subscriber,而是把它啟用,開始傳送事件,這裡傳送的事件就是Subscriber所接受到的事件,所以在最後打印出來的就是每個學生的課程。所以總結來說,就是以下幾個步驟:

1.利用傳入的物件建立一個Observable

2.並不直接傳送這個Observable,而是啟用它,讓它開始傳送事件

3.這個中間產生的Observable傳送的事件就交給了Subscriber

這個過程中就相當於中間添加了一個輔助的Observable,就像把一個物件裡面包含的多個物件平鋪出來,實現一對一的去傳送,這就是所謂的flat。