1. 程式人生 > >RxJava2.0初學總結筆記

RxJava2.0初學總結筆記

(個人知識總結,學習RxJava的時候主要是參照作者Carson_Ho的簡書)

一.RxJava的作用

RxJava主要是基於事件流,實現非同步操作的一個庫,鏈式呼叫。且RxJava採用的是Java中很出名的“觀察者模式”。(在學習了RxJava之後,大家除了什麼“單例模式”之外,又懂一種新的模式了---觀察者模式。)

類似於Handler,AsyncTask。

二.使用RxJava的優點

個人體會還是主要是使用RxJava會使邏輯清晰。

三.RxJava主要的幾個類

1.Observable(被觀察者)

2.Observe(觀察者)

3.Suscribe(訂閱)

三個類的事件傳遞方向為:Observable(產生事件

)---subscribe(將被觀察者和觀察者進行繫結)--->Observe(接收處理事件

通俗可以理解為:(大哥)Observe,(小弟)Observable(),被觀察者(小弟Observable)一有事,就要用電話(subscribe)給大哥Observe說。大哥接收到小弟(Observable)發出來的訊息,然後開始處理小弟(Observable所發的資訊。(個人理解,若比喻有錯誤,請指正)

既然通俗的理解了,在專案中大概哪些地方才能用到它呢?

1.很多使用是用RxJava配合Retrofit來搭建網路框架。(也是很多開發者最主要使用的方式)

2.代替Handler,AsyncTask。(對於技術不上不下的人來說,用原生AsyncTask來處理非同步操作真的要命)。

其他我還沒發現到。

四.如何初步的使用RxJava

想要使用首先肯定是要建立好我們的---小弟(Observable),大哥(Observe),還有我們的電話(Suscribe),然後再把大哥的心,小弟的心,手機的心串一串~串一個鏈式呼叫~(玩笑一下)

1.建立小弟-->Observable

create方法建立是一種很基礎簡單的方式

Observable<String> observableDemo=Observable.create(new ObservableOnSubscribe<String>() {
    @Override
public void subscribe
(@NonNull ObservableEmitter<String> observableEmitter) throws Exception { observableEmitter.onNext("1"); observableEmitter.onNext("2"); observableEmitter.onError(new Exception("發生錯誤了")); observableEmitter.onComplete(); } });

除此之外,還有一種用just()快速建立被觀察者,由程式碼可以看出這種方式很明瞭簡潔。

Observable observable2 = Observable.just("A", "B", "C");

fromXXX();//在2.0以前可以直接用from來呼叫,但是在2.0之後,我們需要確定你的資料型別,指明你的資料型別是fromArray還是fromIterator...

String[] testFrom = {"1", "2"};
//被觀察者
Observable observable = Observable.fromArray(testFrom);

from的型別一般如下所示


2.建立大哥-->Observer

這種是最基本的建立方式

//觀察者
Observer<String> observer = new Observer<String>() {
    @Override
public void onSubscribe(Disposable disposable) {
    }

    @Override
public void onNext(String s) {
    }

    @Override
public void onError(Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
};
還有一種是由Subscriber來建立
Subscriber subscriber=new Subscriber() {
    @Override
public void onSubscribe(Subscription s) {
    }

    @Override
public void onNext(Object o) {
    }

    @Override
public void onError(Throwable t) {
    }

    @Override
public void onComplete() {
    }
};

相對於最基本的建立方式來說。subscriber新增了onStart(),unsubscribe()方法。

onStart():未響應事件前呼叫,可以用來做一些準備工作

unsubscribe();取消訂閱,當呼叫這個方法之後,觀察者將不再接收和響應事件

*在呼叫unsubscribe()方法之前,需要用isunsubscribe()判斷狀態,判斷被觀察者(Observable)是否還持有觀察者(Observer)Subscriber的引用,如果引用不能及時釋放,就會出現記憶體洩漏。

3.訂閱(subscribe)---->電話

observable.subscribe(observer);//被觀察者,訂閱,觀察者

4.鏈式建立

一開始就說RxJava是基於事件流的鏈式呼叫,現在把它們連結起來看一下。

Observable.create(new ObservableOnSubscribe<Integer>() {//建立被觀察者
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> observableEmitter) throws Exception {
        observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onComplete();
}
}).subscribe(//繫結
new Observer<Integer>() {//建立觀察者
@Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Integer integer) {
        Log.i("t","被觀察者所發出的值="+integer);
}

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

這種鏈式的好處就是使邏輯上變得簡單清晰。這也是RxJava的一個優點。

5.切斷小弟(Observable)和大哥(Observer)的連結---->Disposable.dispose();

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();emitter.onNext(4);
}
}).subscribe(new Observer<Integer>() {
    private Disposable mDisposable;
    private int i;
@Override
public void onSubscribe(Disposable d) {mDisposable = d;
}

    @Override
public void onNext(Integer value) {
        Log.i("t", "onNext: " + value);
i++;
        if (i == 2) {
            Log.i("t", "dispose");
mDisposable.dispose();
Log.i("t", "isDisposed : " + mDisposable.isDisposed());//若已切斷,isDisposed=true;
}
    }

    @Override
public void onError(Throwable e) {}

    @Override
public void onComplete() {}
});

五.一些常見常用操作符介紹

just();//快速建立被觀察者物件。最多隻能傳送10個事件。

Observable observable2 = Observable.just("A", "B", "C");

類似於:onNext("A");onNext("B");onNext("C");

fromArray();//快速建立被觀察者物件。傳送10個以上的事件。陣列形式

fromIterable();//快速建立被觀察者物件。傳送10個以上的事件。集合形式

觀察者接收到的資料為遍歷資料。

empty();//僅傳送complete事件,直接通知完成

error();//僅傳送error事件,直接通知異常

never();//不傳送任何事件

-----------------------------------延遲操作符---------------------------------------

defer();//快速建立被觀察者物件。此方法作用:直到有觀察者訂閱才動態建立被觀察者物件,併發送事件。每次訂閱後都會得到一個新的被觀察者(Observable),可以確保Observable的資料是最新的。

//延時操作.有觀察者訂閱才能動態建立被觀察者
Observable<Integer> mobservable=Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
public ObservableSource call() throws Exception {
        return Observable.just(i);
}
});
i=15;
//觀察者被訂閱,開始動態建立被觀察者
mobservable.subscribe(new Observer<Integer>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Integer integer) {
         //i=15
    }

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

timer();//快速建立被觀察者物件。方法作用:延遲指定時間後,傳送一個onNext(0);的事件。(Long型別)

*timer預設執行在新執行緒,可用scheduler來指定執行緒。

//timer(延遲時間,時間單位)
Observable.timer(5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Long aLong) {
    }

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

interval();//快速建立被觀察者物件。每隔指定時間就傳送事件。  

//1第一次延遲的時間5s  2執行事件,每隔1s傳送一次。  3計數單位為秒
Observable.interval(5,1,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Long aLong) {
    }

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

傳送事件序列:從0開始,五險遞增1的整數序列。

intervalRange();//快速建立被觀察者物件。每隔一段時間就傳送事件,可指定事件數量

//1.起始資料 2.事件數量 3.第一次延遲時間 4.執行的間隔秒數 5.計數單位
Observable.intervalRange(2,5,3,1,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {

    }

    @Override
public void onNext(@NonNull Long aLong) {
         //事件資料=2,3,4,5,6
    }

    @Override
public void onError(@NonNull Throwable throwable) {

    }

    @Override
public void onComplete() {

    }
});

range();//快速建立被觀察者物件。類似於intervalRange(),但是range()無延遲傳送事件。

//1.事件序列起點=0  2.指定事件數量=3
Observable.range(0,3).subscribe(new Observer<Integer>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Integer integer) {
        //收到的事件=0,1,2
}

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

rangeLong();//快速建立被觀察者物件。類似於range(),區別是,rangeLong()支援Long類資料。

------------------------------------變換操作符--------------------------------------

map();//將被觀察者傳送的事件轉換為任意的型別的事件

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
        //事件傳遞為string="1"
observableEmitter.onNext("1");
}
    //map操作Function來對被觀察者所發出的事件型別進行變換。string->integer
}).map(new Function<String, Integer>() {
    @Override
public Integer apply(@NonNull String s) throws Exception {
        return null;
}
}).subscribe(new Consumer<Integer>() {
    @Override
public void accept(Integer integer) throws Exception {
        //接收到的integer=1
}
});

FlatMap();//將被觀察者傳送的事件序列進行拆分轉換,再合併為一個新的事件序列,最後再發送。(類似於打亂事件順序)

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
public void subscribe(@NonNull ObservableEmitter<Integer> observableEmitter) throws Exception {
        observableEmitter.onNext(100);
observableEmitter.onNext(101);
observableEmitter.onNext(102);
}
    //floatMap把原來的事件生產序列進行拆分,再將每個事件拆分轉換成新的,傳送三個string最後合併,再發送給被觀察者
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
        List<String> list=new ArrayList<String>();
        for(int i=0;i<3;i++){
            list.add("原事件="+integer+"拆分的新事件="+i);
}
        return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
    @Override
public void accept(String s) throws Exception {

    }
});

*新事件的合成順序與舊事件的順序無關。

concatMap();//類似於flatmap(),用法和flatmap差不多.區別是新事件的順序=被觀察者舊序列生產的順序。有序的變換事件序列。

Buffer();//定期從被觀察者中獲取一定數量的事件,並放到快取區中。然後傳送。用於快取被觀察者中的事件。

//buffer->1.設定快取區的大小 2.設定每次獲取新事件的數量
//例如第一次是3,4,5被存入,第二次是2,3,4(新事物增加為1,向指標移動一位),一直到nullnull1終止儲存
Observable.just(1,2,3,4,5).buffer(3,1).subscribe(new Observer<List<Integer>>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {

    }

    @Override
public void onNext(@NonNull List<Integer> integers) {

    }

    @Override
public void onError(@NonNull Throwable throwable) {

    }

    @Override
public void onComplete() {

    }
});
----------------------------------組合/合併操作符-----------------------------------

concat:組合多個被觀察者一起傳送資料,合併後按傳送順序串執行。組合被觀察者數量<=4

concatArray:組合多個被觀察者一起傳送資料,合併後按傳送順序串執行組合被觀察者數量>4

concat和concatArray的用法類似,只是數量不一樣而已。

Observable.concat(Observable.just(1,2,3),
Observable.just(10,11,12)).subscribe(new Observer<Integer>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Integer integer) {
        //接收到的事件=1,2,3,10,11,12
}

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

merge()/mergeArray():組合多個被觀察者一起傳送資料,合併後按時間線並行執行

兩者的區別:merge組合被觀察者數量<=4      mergeArray>4(兩者用法相似,只是數量不同)

//1.0開始傳送,共發2個數據,第一次事件延遲2s,間隔3s
//2.1開始傳送,共傳送3個數據,第一次事件延遲2s,間隔3s
Observable.merge(Observable.intervalRange(0,2,2,3,TimeUnit.SECONDS),
Observable.intervalRange(1,3,2,3,TimeUnit.SECONDS)).subscribe(new Observer<Long>() {
            @Override
public void onSubscribe(@NonNull Disposable disposable) {

            }

            @Override
public void onNext(@NonNull Long aLong) {
                //接收到的資料0,1--->1,2--->3
}

            @Override
public void onError(@NonNull Throwable throwable) {

            }

            @Override
public void onComplete() {

            }
        });

concatDelayError()/mergeDelayError();//多個被觀察者中的某一個丟擲onError,正常執行是其他的被觀察者就不會執行了。但是concatDelay和mergeDelayError則可以讓所有的被觀察者都發送事件結束之後再觸發onError。(兩者使用相似,用相應的方式即可)

Observable.concatArrayDelayError(
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
// 傳送Error事件,因為使用了concatDelayError,所以第2Observable將會發送事件,等傳送完畢後,再發送錯誤事件
emitter.onError(new NullPointerException());
emitter.onComplete();
}
        }),
Observable.just(4, 5, 6))
        .subscribe(new Observer<Integer>() {
            @Override
public void onSubscribe(Disposable d) {
            }
            @Override
public void onNext(Integer value) {
            }

            @Override
public void onError(Throwable e) {
            }

            @Override
public void onComplete() {
            }
        });

Zip();//合併多個被觀察者傳送的事件,生成一個新的事件序列,然後傳送。

*最終合併事件數量=多個被觀察者中數量最少的數、

Observable.zip(Observable.just(1, 2, 3,9), Observable.just(10, 11, 12),
//1.被觀察者物件1的型別  2.被觀察者2的型別  3合併的型別
//Observable9最終也會被髮送出來,但是如果在1,2被觀察者事件序列之後complete,則不會發送9
new BiFunction<Integer, Integer, String>() {
            @Override
public String apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                return integer+integer2+"";
}
        }).subscribe(new Observer<String>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull String s) {
        //最終接收到的=110,211,312
}

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

一般應用於,展示的資訊從多個地方獲取並且統一結合後展示。

網路請求傳送和結果的統一顯示。(Retrofit+RxJava)

combineLatest();//按事件合併,在同一個事件點上合併  兩個被觀察者物件中,任何一個先發送事件的最後一個數據和另外一個被觀察者物件的每一個數據結合。然後傳送。

Observable.combineLatest(Observable.just(1L, 2L, 3L),
//0開始,3個數,第一次延遲1s,間隔1s
Observable.intervalRange(5,3,1,1,TimeUnit.SECONDS),
        new BiFunction<Long, Long, Long>() {
    @Override
public Long apply(@NonNull Long integer, @NonNull Long integer2) throws Exception {
       // integer3 integer2=5;  integer=3 integer2=6;  integer=3 integer2=7;
return integer+integer2;
}
}).subscribe(new Observer<Long>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Long integer) {
        //結果=8,9,10
}

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {
    }
});

combineLatestDelaError();//和concatDelay(),mergeDelay();用法意義都是一樣的。

reduce();//把被觀察者中的事件聚合成一個事件,併發送。

Observable.just(1,2,3,4).reduce(new BiFunction<Integer, Integer, Integer>() {
    @Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
        //第一次是1+2=3,返回數字3,第二次,3+3=6,用返回數加下一個數,第三次6+4=10用返回數加下一個數4.
return integer+integer2;
}
}).subscribe(new Consumer<Integer>() {
    @Override
public void accept(Integer integer) throws Exception {
        //integer=3,6,10
}
});

collect();//把被觀察者的事件資料收集到一個數據結構裡。

Observable.just(1,2,3,4).collect(
        //建立資料結構(容器),用於收集被觀察者傳送的資料
new Callable<ArrayList<Integer>>() {
    @Override
public ArrayList<Integer> call() throws Exception {
        return new ArrayList<>();
}
    //對傳送的資料進行收集
}, new BiConsumer<ArrayList<Integer>, Integer>() {
    @Override
public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
        integers.add(integer);//資料收集
}
}).subscribe(new Consumer<ArrayList<Integer>>() {
    @Override
public void accept(ArrayList<Integer> integers) throws Exception {
        //接收到的陣列integers[1,2,3,4]
}
});

startWith()/startWithArray();//傳送事件前,追加發生事件。

在被觀察者在傳送事件前,追加發送一個(多個)資料/一個新的Observable(被觀察者)

Observable.just(1,2,3)
        .startWith(7)//追加一個數據
.startWithArray(8,9,10)//追加多個數據
.startWith(Observable.just(11,12))//追加被觀察者
.subscribe(new Observer<Integer>() {
    @Override
public void onSubscribe(@NonNull Disposable disposable) {
    }

    @Override
public void onNext(@NonNull Integer integer) {
    }

    @Override
public void onError(@NonNull Throwable throwable) {
    }

    @Override
public void onComplete() {

    }
});

count();//統計被觀察者傳送事件的數量

Observable.just(1,2,3,4,5,6).count().subscribe(new Consumer<Long>() {
    @Override
public void accept(Long aLong) throws Exception {
        //傳送事件數量along=6
}
});
為了清晰自己的認知,盜用一張Carson_Ho大神的講解圖,超級全面!強推他的RxJava講解,https://www.jianshu.com/p/cd984dd5aae8


此為個人學習筆記,主要參照Carson_Ho大神的簡書文章來總結出來自己習慣看的模式的樣子。要是我理解有誤的地方,麻煩各