Android自助餐之RxJava手冊
阿新 • • 發佈:2018-12-05
Android自助餐之RxJava手冊
下載完整原始碼
觀察者
- Observer
- onNext()
- onCompleted();
- onError();
- Subscriber
- 繼承Observer
- onStart();在開始傳送事件前
- subscriber.unsubscribe();取消訂閱
被觀察物件
Observable.create(Observable.OnSubscribe());
Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("a"); subscriber.onNext("aa"
- Observable.just();
Observable.just("a","aa","aaa");
- Observable.from();
ArrayList<String> strings = new ArrayList<>();
Observable.from(strings);
方法封裝
FuncX
封裝帶有x個引數的帶返回值方法。ActionX
封裝帶有x個引數的無返回值方法。public static void demoOfAction() { Observable.from(DataManager.getInstance().getData()) .subscribe(new Action1<String>() { @Override public void call(String s) { LogUtil.e("next",s); } });//將事件發給訂閱者 }
型別轉換
map()一對一轉換
public static void demoOfMap(){ Observable.from(DataManager.getInstance().getData()) .map(new Func1<String, Car>() { private int count=0; @Override public Car call(String s) { return new Car(count++,s); } })//將事件內容轉型 .subscribe(new Action1<Car>() { @Override public void call(Car car) { LogUtil.e("next",car.getStyle()); } });//將事件發給訂閱者 }
- flatMap()一對多轉換
public static void demoOfFlatMap() {
//一個人有多輛車,輸出每個人的每個車
Observable.from(DataManager.getInstance().getPserson())
.flatMap(new Func1<Person, Observable<Car>>() {
@Override
public Observable<Car> call(Person person) {
return Observable.from(person.getCars());
}
})
.subscribe(new Action1<Car>() {
@Override
public void call(Car car) {
LogUtil.e("car",car.getStyle());
}
});
}
執行緒控制
- subscribeOn()指定被觀察物件執行緒
- observeOn()指定觀察者執行緒
- Schedulers.immediate();當前執行緒
- Schedulers.newThread();新開一個執行緒
- Schedulers.io();與newThread()差不多,但io()中內建無數量上限的執行緒池。注意不要把計算操作放在這裡,避免建立多餘的執行緒。
- Schedulers.computation();與io()的區別在於其執行緒池大小固定,大小為CPU核心數。注意不要把I/O操作放在這裡,避免浪費CPU。
- AndroidSchedulers.mainThread();UI執行緒。
public static void demoOfScheduler() {
//建立觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {
LogUtil.e("next-thread",Thread.currentThread().getId()+"<->"+Thread.currentThread().getName());
}
};
//建立被觀察物件
Observable.OnSubscribe<String> observable = new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
LogUtil.e("send-thread",Thread.currentThread().getId()+"<->"+Thread.currentThread().getName());
subscriber.onNext("gfsdh");
subscriber.onNext("dfshs");
subscriber.onNext("fdsa");
subscriber.onCompleted();
}
};
//開始搞起
Observable.create(observable)
.subscribeOn(Schedulers.io())//指定事件發生執行緒
.observeOn(Schedulers.newThread())//指定下一個事件處理執行緒
.map(new Func1<String, Car>() {
private int count=0;
@Override
public Car call(String s) {
LogUtil.e("to-car-thread",Thread.currentThread().getId()+"<->"+Thread.currentThread().getName());
return new Car(count++,s);
}
})//將事件內容轉型
.observeOn(Schedulers.newThread())//指定一下一個事件處理執行緒
.map(new Func1<Car, Person>() {
private int count=0;
@Override
public Person call(Car car) {
LogUtil.e("to-person-thread",Thread.currentThread().getId()+"<->"+Thread.currentThread().getName());
return new Person(car.getStyle(),count++);
}
})//將事件內容轉型
.observeOn(Schedulers.computation())//指定一下一個事件處理執行緒
.map(new Func1<Person, String>() {
@Override
public String call(Person person) {
LogUtil.e("to-string-thread",Thread.currentThread().getId()+"<->"+Thread.currentThread().getName());
return person.getName();
}
})//將事件內容轉型
.observeOn(AndroidSchedulers.mainThread())//指定一下一個事件處理執行緒
.subscribe(subscriber);//將事件發給訂閱者
}
E/send-thread: 6412<->RxCachedThreadScheduler-1
E/to-car-thread: 6410<->RxNewThreadScheduler-2
E/to-car-thread: 6410<->RxNewThreadScheduler-2
E/to-person-thread: 6409<->RxNewThreadScheduler-1
E/to-car-thread: 6410<->RxNewThreadScheduler-2
E/to-person-thread: 6409<->RxNewThreadScheduler-1
E/to-person-thread: 6409<->RxNewThreadScheduler-1
E/to-string-thread: 6408<->RxComputationThreadPool-3
E/to-string-thread: 6408<->RxComputationThreadPool-3
E/to-string-thread: 6408<->RxComputationThreadPool-3
E/next-thread: 1<->main
E/next-thread: 1<->main
E/next-thread: 1<->main