1. 程式人生 > >RxJava學習筆記---簡單使用

RxJava學習筆記---簡單使用

如果覺得一篇文章寫得好,不要放到收藏夾裡面,馬上把它看完,如果兩天內還沒開始看,那就可以刪掉了
如果覺得一樣技術很好,那就馬上去學,不要拖延,不要找藉口。如果你一週內還沒開始行動,還不如坦蕩點放棄
恰如克林克茲所說:
與其感嘆路難行,不如馬上出發
去年就在看RxJava,每次看一點點,遇到障礙,感嘆要理解的東西太麻煩就放棄了。今年決心要搞定它,學習的過程註定孤獨單調,恰好現在開始寫Blog,就順便記錄下學習筆記,以此自勉。在後面的2-4周計劃:

  1. 明白RxJava的基本用法,API中常用類、函式、變換
  2. 學習使用RxJava寫的開源App,提升熟練度
  3. 研究RxJava內部實現,研究他為什麼要如此設計API,如此好用
  4. 用RxJava寫一個App,開源

RxJava是什麼

Rx是什麼

Rx全稱Reactive Extensions,譯為響應式拓展。

微軟最先提出這個概念,借用MSDN上的定義:Reactive Extensions(Rx)是一個類庫,它集成了非同步、基於可觀察(observable)序列的事件驅動程式設計和LINQ-style的查詢操作,使用Rx,開發人員

  • 可以用observable物件描述非同步資料流
  • 使用LINQ操作符非同步查詢資料
  • 使用Schedulers控制非同步過程中的併發
    簡而言之
Rx = Observables + LINQ + Schedulers

Rx已經滲透到各個語言中,於是有了:
RxJava、RxJS、Rx.NET、UniRx、RxScala、RxCpp、 RxSwift、RxPHP、Ruby: Rx.rb、Python: RxPY等等
基於frameworks上還有
RxNetty、RxAndroid、RxCocoa

RxJava的定義

RxJava是Rx在Java語言上的擴充套件。
RxJava
– Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.


譯為:一個在 Java VM 上使用可觀測的序列來組成非同步的、基於事件的程式的庫

RxJava怎麼用

build.gradle—Module檔案中匯入

    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'io.reactivex:rxjava:1.1.6'

RxJava觀察者模式

RxJava觀察者有4個基本概念組成:

  1. Observer 觀察者
  2. Observable 被觀察者
  3. subscribe() 訂閱
  4. 事件

建立Observer

Observer譯為觀察者,直接建立實現

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

對比常用的給控制元件註冊一個監聽事件

        class MyListener extends View.OnClickListener {
            @Override
            public void onClick(View v) {
                Log.d(TAG, "onClick: ");
            }
        }

這裡的RxJava的onNext()方法就對應MyListener的onClick()方法,不過比起後者Observer還多了onCompleted()onError()

  1. onNext()事件響應
  2. onCompleted()事件佇列結束。不再有新的onNext()發出時,需要觸發其作為結束。
  3. onError() 事件佇列異常。事件佇列處理中若出現異常,onError觸發,佇列終止。
    一個事件佇列中,ErrorCompleted有且只有一個,並且是事件佇列的最後一個。

建立Subscriber

 Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: ");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

基本使用的話,SubscriberObserver使用完全一致,實際上在RxJava的subscribe過程中,Observer也常常轉換為Subscriber再使用。但是SubscriberOberver的功能升級
檢視其實現程式碼

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    ....
}
  1. 我們知道,Subscriber是一個抽象類,所以我們剛才實現的時候需要實現onCompletedonErroronNext三個方法
  2. 實現Observer<T>介面
  3. 實現Subscription介面
    Observer<T>介面的實現表明Observer能實現的東西他都可以實現
public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

那麼Subscription介面實現呢

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

顧名思義,unsubscribe()表示反註冊;isUnsubscribed()表示是否已經註冊,反註冊和是否已經註冊出現了,哪註冊呢,在這裡

    private final SubscriptionList subscriptions;

    public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

        @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

add()就是註冊。通過SubscriptionList subscriptions方法轉發實現,我們看看SubscriptionList是什麼

public final class SubscriptionList implements Subscription {

    private LinkedList<Subscription> subscriptions;
    private volatile boolean unsubscribed;

    public SubscriptionList(Subscription s) {
        this.subscriptions = new LinkedList<Subscription>();
        this.subscriptions.add(s);
    }

    @Override
    public boolean isUnsubscribed() {
        return unsubscribed;
    }

    public void add(final Subscription s) {
        if (s.isUnsubscribed()) {
            return;
        }
        if (!unsubscribed) {
            synchronized (this) {
                if (!unsubscribed) {
                    LinkedList<Subscription> subs = subscriptions;
                    if (subs == null) {
                        subs = new LinkedList<Subscription>();
                        subscriptions = subs;
                    }
                    subs.add(s);
                    return;
                }
            }
        }
        s.unsubscribe();
    }


    @Override
    public void unsubscribe() {
        if (!unsubscribed) {
            List<Subscription> list;
            synchronized (this) {
                if (unsubscribed) {
                    return;
                }
                unsubscribed = true;
                list = subscriptions;
                subscriptions = null;
            }
            unsubscribeFromAll(list);
        }
    }

    private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
        if (subscriptions == null) {
            return;
        }
        List<Throwable> es = null;
        for (Subscription s : subscriptions) {
            try {
                s.unsubscribe();
            } catch (Throwable e) {
                if (es == null) {
                    es = new ArrayList<Throwable>();
                }
                es.add(e);
            }
        }
        Exceptions.throwIfAny(es);
    }

看到開始定義的

    private LinkedList<Subscription> subscriptions;
    private volatile boolean unsubscribed;

大概就能猜出來,就是通過連結串列來操作的,具體不再深究。回到主題比起Observer來說Subsriber大概有以下幾個優點

  1. 可以使用unsubscribe()取消訂閱。有什麼好處呢?其實和我們使用廣播的時候,註冊和反註冊是一樣的道理,防止記憶體洩漏。
  2. 可以通過isUnsubscribed()獲取當前事件是否註冊,封裝好的方法可以更容易去判斷當前的事件註冊與否的狀態。
  3. onStart()可以在Subscriber剛開始的時候做一些準備工作。我們看到他是一個空實現
    public void onStart() {
        // do nothing by default
    }

就如同在使用Volley或者okhttp的時候也有這種同名方法,做一些初始化操作在請求網路最開始的部分。

建立Observable

Observable.create

  Observable observable = Observable.create(new Observable.OnSubscribe() {
        @Override
        public void call(Object o) {
            subscriber.onNext("1");
            subscriber.onNext("2");
            subscriber.onNext("3");
            subscriber.onCompleted();
        }
    });

儲存一個OnSubscribe物件作為計劃表,一旦Observable被訂閱,call()方法就會被呼叫,然後按照對onNext()onCompleted()設定的順序依次執行。

Observable.just

上面的例子可以等價於

    Observable observableJust = Observable.just("1", "2", "3");

just—將引數依次發出去

Observable.from

也等價於這種寫法

    String[] tests = {"1", "2", "3"};
    Observable observableFrom = Observable.from(tests);

from—顧名思義從什麼地方來

執行

使用

        observable.subscribe(subscriber);
        observable.subscribe(observer);

事件鏈就可以跑起來了。
這裡subscribe()方法通過下面的方法轉發

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        //...省略部分程式碼
        //執行onStart初始方法
        subscriber.onStart();

        //執行Subscriber預設的call()方法
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

        //返回傳入的subscriber
        return hook.onSubscribeReturn(subscriber);
    }

列印陣列

把陣列內的值依次打印出來

    private static final Integer[] INT_ARRAYS = {1, 2, 3, 4, 5};

    Observable.from(INT_ARRAYS).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d(TAG, "call: " + integer);
            }
        });

顯示圖片

把mipmap檔案裡的圖片顯示到介面

  private ImageView ivShow;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        ivShow = (ImageView) findViewById(R.id.ivShow);
        Observable.create(new Observable.OnSubscribe<Drawable>() {
            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        }).subscribe(new Observer<Drawable>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(Drawable drawable) {
                ivShow.setImageDrawable(drawable);
            }
        });
    }

但是上面的實現沒有任何意義,因為在同一個執行緒裡面使用還不如直接呼叫。RxJava最大的特性是前臺回撥,後臺處理,是伴隨著多執行緒應運而生的。

Scheduler

        //直接執行在當前執行緒
        Schedulers.immediate();

        //開啟一個新執行緒執行任務

        //內部有一個沒有上限的執行緒池,可以重用空閒執行緒
        //相當於    ExecutorService executorService = Executors.newCachedThreadPool();
        Schedulers.io();

        //有一個固定執行緒池,大小為CPU核心數+1
        //等價於  Executors.newFixedThreadPool(nCore+1)
        Schedulers.computation();

剛才的例子:
被建立的陣列值在IO執行緒發出,在主執行緒處理列印

        Observable.from(INT_ARRAYS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d(TAG, "call: " + integer);
                    }
                });

在IO執行緒載入圖片,在主執行緒顯示圖片

        Observable
                .create(new Observable.OnSubscribe<Drawable>() {
                    @Override
                    public void call(Subscriber<? super Drawable> subscriber) {
                        Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
                        subscriber.onNext(drawable);
                        subscriber.onCompleted();
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Drawable>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onNext(Drawable drawable) {
                        ivShow.setImageDrawable(drawable);
                    }
                });