從零學習RxJava2.0-簡單入門
阿新 • • 發佈:2018-12-31
前言
- 函數語言程式設計:函數語言程式設計是種程式設計方式,它將電腦運算視為函式的計算。函式程式語言最重要的基礎是λ演算(lambda calculus),而且λ演算的函式可以接受函式當作輸入(引數)和輸出(返回值),和指令式程式設計相比,函數語言程式設計強調函式的計算比指令的執行重要。和過程化程式設計相比,函數語言程式設計裡函式的計算可隨時呼叫。
RXJava
- 當我們的非同步網路請求用的越來越多的時候,rxjava是一種能依舊讓我們的邏輯保持清晰的操作,他的原理就是建立一個Observable物件來幹活,然後使用各種操作符建立起來的鏈式操作,就如同流水線一樣,把你的資料一步一步加工,最終達到想要的效果。
- rxjava的非同步操作是通過擴充套件的觀察者模式來實現的,rxjava有四個角色,Observable,Observer,Subscriber和Subject,其中Observable和Observer通過Subscriber方法實現訂閱關係,Observable就可以在需要的時候通知Observer。
- 其實RxJava是通過擴充套件的觀察者模式來實現的,不瞭解觀察者模式的童鞋可以移駕這裡
- emmmmm,還是等會程式碼解釋
使用前新增依賴
implementation "io.reactivex.rxjava2:rxjava:2.x.y"
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
- 具體版本直接上git官方檢視就ok
- 其中RxAndroid是RxJava在android平臺上的擴充套件,它包含了一些能夠簡化Android開發的工具,比如特殊的排程器
一.使用入門
- 基本用法分三步
1.建立Observer(觀察者)
- 他決定事件觸發時的行為,先看看怎麼建立
Observer<String> mObserver = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: 我是張三,他給我發的訊息是 message = " +s );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
- 先不說他的具體執行步驟,我們先往下看
2.建立被觀察者(Observable)
Observable<String> mObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//執行一些其他操作
//.............
//執行完畢,觸發回撥,通知觀察者
emitter.onNext("這是第一條訊息");
emitter.onNext("我來發射資料");
emitter.onComplete();
}
});
- 再往下看,實現訂閱
3.訂閱(subscribe)
mObservable.subscribe(mObserver);
- 訂閱的方式很簡單,就是用被觀察者呼叫subscribe方法來訂閱觀察者
4.運作方式
- 在我們訂閱的時候,實際上就是呼叫的被觀察者在建立的時候我們實現的ObservableOnSubscribe介面的匿名類的subscriber方法
- 不過此時並不是立即呼叫我們實現的subscribe方法裡面的邏輯,先呼叫的是觀察者的onSubscribe方法
- 然後再去執行Subscribe方法裡面的邏輯
- 而Subscribe方法的引數emitter實際上就是我們在訂閱的時候傳入的觀察者例項
- 所以我們在被觀察者裡面呼叫的emitter.onNext(“這是第一條訊息”);實際上就是回撥的是觀察者的onNext方法
- 當執行異常的時候,觀察者的onError方法就會被呼叫,同時事件佇列自動終止,不允許再有事件發出
- complete方法是在事件完畢之後執行,不過在這種方式之下我們需要手動去呼叫,就像我上面寫的
5.其他的建立被觀察者的方式
(1).Just方式
mObservable = Observable.just("這是just方式建立的被觀察者傳送的事件");
- 這裡的just引數將直接被作為Onserver的onNext方法的引數傳入
- 所以在mObservable.subscribe(mObserver);之後,log打印出來的資訊就是
onSubscribe:
onNext: 我是張三,他給我發的訊息是 message = 這是just方式建立的被觀察者傳送的事件
onComplete:
- 這裡不清楚為什麼這裡的complete方法得到了執行,百思不得其解
- 這裡的just方式是過載了十個方法,分別對應一個引數,兩個引數,…一直到十個引數
(2).formLiterable方式
List<String> list = new ArrayList<>();
for(int i = 0 ; i < 10 ;i++){
list.add("我是formIterable方式建立的第 " + i+ " 條訊息");
}
mObservable = Observable.fromIterable((Iterable<String>)list);
- 訂閱之後,得到的log為
onSubscribe:
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 0 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 1 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 2 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 3 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 4 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 5 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 6 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 7 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 8 條訊息
onNext: 我是張三,他給我發的訊息是 message = 我是formIterable方式建立的第 9 條訊息
onComplete:
- 所以說,這裡的list集合其實就相當於我們上面在just裡面傳入的多條引數
- 而且, Collection介面是Iterable介面的子介面,所以所有Collection介面的實現類都可以作為Iterable物件直接傳入fromIterable()方法。
(3).defer方式
mObservable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("我是defer方式建立的被觀察者","我也是defer方式建立的被觀察者");
}
});
- 打印出的log資訊為
onSubscribe:
onNext: 我是張三,他給我發的訊息是 message = 我是defer方式建立的被觀察者
onNext: 我是張三,他給我發的訊息是 message = 我也是defer方式建立的被觀察者
onComplete:
(4).interval方式
Observable<Long> o = Observable.interval(4, TimeUnit.SECONDS);
o.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "interval方式建立 onSubscribe: ");
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "interval方式建立 onNext: 他給我發的數字是 " + aLong);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "interval方式建立 onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "interval方式建立 onComplete: ");
}
});
- 這種方式的作用時就像定時器一樣,按照我們在interval裡面設定的時長間隔傳送從0開始的整數序列
- 因為他所需要的泛型是長整型,所以這裡我重新建立了一個被觀察者,並且在訂閱的時候重新建立了一個觀察者,由於我在interval裡面寫的是4,TimeUnit.SECONDS,所以他將每隔四秒傳送一個整型值,所以打出的log為
interval方式建立 onSubscribe:
interval方式建立 onNext: 他給我發的數字是 0
interval方式建立 onNext: 他給我發的數字是 1
interval方式建立 onNext: 他給我發的數字是 2
interval方式建立 onNext: 他給我發的數字是 3
interval方式建立 onNext: 他給我發的數字是 4
interval方式建立 onNext: 他給我發的數字是 5
interval方式建立 onNext: 他給我發的數字是 6
- 這個方法正常情況是不會主動執行complete方法的,因為他會一直髮送下去
(5).range方式
Observable<Integer> observable = Observable.range(1,5);
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: 這次他傳送的數字是 " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
- 還是因為他只接受整型或長整型泛型的Observer,所以我重新建立了觀察者
- 其中range的第一個引數表示開始傳送的數字,第二個引數表示傳送的個數,看一下log的資訊
onSubscribe:
onNext: 這次他傳送的數字是 1
onNext: 這次他傳送的數字是 2
onNext: 這次他傳送的數字是 3
onNext: 這次他傳送的數字是 4
onNext: 這次他傳送的數字是 5
onComplete:
- 其中,第二個引數不可以為負值,當為0時就不列印
(7).Timer方式
Observable<Long> observable1 = Observable.timer(2, TimeUnit.SECONDS);
observable1.subscribe(mLongObserver);
- 啊,這裡我沒再去建立一個Long泛型的Observer,而是在外面建立了一個私有變數,啊,這些不管
- 這種方式的timer的引數為延遲第一個引數為第二個引數為單位的時間,傳送一個東西(額,我也不知道發什麼東西,反正就是延遲這麼長時間,呼叫觀察者的onNext方法)
- 看一下他的log資訊
onSubscribe:
onNext: 0
onComplete:
(8).repeat方式
- 這種方式就是將以上幾種方式一直重複,比如說我重複TImer
Observable<Long> observable2 = Observable.timer(3, TimeUnit.SECONDS).repeat();
observable2.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Long integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
- 打印出的log為
onSubscribe:
onNext: 0
onNext: 0
onNext: 0
onNext: 0
onNext: 0
onNext: 0
- 那麼他就會一直重複Timer這個動作,至於其他的也一樣,只需要給後面加一個repeat就可以
6.簡單部分補充
- 我們在使用訂閱方法的時候,常常看到有好幾個過載的subscriber方法
public final Disposable subscribe() {}
表示觀察者不對被觀察者傳送的事件作出任何響應(但被觀察者還是可以繼續傳送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
表示觀察者只對被觀察者傳送的Next事件作出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
表示觀察者只對被觀察者傳送的Next事件 & Error事件作出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
表示觀察者只對被觀察者傳送的Next事件、Error事件 & Complete事件作出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
表示觀察者只對被觀察者傳送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出響應
public final void subscribe(Observer<? super T> observer) {}
表示觀察者對被觀察者傳送的任何事件都作出響應
- 我們可以看到,最後一種是我們基本使用的那種
- 第一種表示當呼叫之後,只調用被觀察者實現的subscribe方法中除過事件之外的其他語句
- 比方說
mObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//執行一些其他操作
//.............
//執行完畢,觸發回撥,通知觀察者
Log.d(TAG, "subscribe: 我要傳送第一條訊息啦");
emitter.onNext("這是第一條訊息");
Log.d(TAG, "subscribe: 我要傳送第二條訊息啦");
emitter.onNext("我來發射資料");
}
});
- 如果我們的被觀察者這樣寫,那麼當mObservable.subscriber()之後,只會執行兩個log語句,而不會指向其他的兩個語句
- 第二個到第四個方法,代表我們只實現簡單的觀察者,這裡我寫全的最後一個方法的呼叫方式為
Observable<String> observable = Observable.just("我是訊息");
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "accept: Error");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "run: complete");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, "accept: Subscriber");
}
});
- 其中,subscriber方法第一第二第四個引數分別實現Consumer介面,第三個引數實現Action介面
- 當我按照上面所寫的話,打印出來的log為
accept: Subscriber
accept: 我是訊息
run: complete
- 可以看出,第一個引數表示我們觀察者的onNext方法呼叫,第二個相當於onError方法呼叫,第三個引數代表complete方法呼叫,第四個引數相當於subscriber方法呼叫
- 由四個不同的過載方法比較得出,我們可以只實現第一個引數的方法,那麼他相當於只執行觀察者的onNext方法,以此類推
7.中斷觀察者與被觀察者的連線
- 通過這種方式我們可以中斷觀察者與被觀察者的連線,也就是說被觀察者你愛發不發,我都可以通過這種方式來不去接收你的訊息
Observable<String> observable = Observable.just("我是訊息1","我是訊息2","我是訊息3","我是訊息4");
observable.subscribe(new Observer<String>() {
//這是攔截器
private Disposable mDisposable;
private int cnt = 0;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: 攔截器賦值");
mDisposable = d;
}
@Override
public void onNext(String s) {
cnt++;
Log.d(TAG, "onNext: 我收到了第 "+ cnt +" 條訊息 == "+s);
if(cnt > 2){
Log.d(TAG, "onNext: 好了,我已經中斷連線了");
mDisposable.dispose();
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
- 打印出來的log資訊為
onSubscribe: 攔截器賦值
onNext: 我收到了第 1 條訊息 == 我是訊息1
onNext: 我收到了第 2 條訊息 == 我是訊息2
onNext: 我收到了第 3 條訊息 == 我是訊息3
onNext: 好了,我已經中斷連線了
- 在中斷之後就不會執行complete方法了
8.簡單小結
- 可以看到,RxJava的整個運作流程大概是這樣的
- 被觀察者 (Observable) 通過 訂閱(Subscribe) 按順序傳送事件 給觀察者 (Observer), 觀察者(Observer) 按順序接收事件 & 作出對應的響應動作
- 同時我們也可以分開分別實現帶四個引數的subscriber方法,將我們的觀察者的四個方法分開
- 同時,在合適的地方採取合適的中斷連線等等