Rxjava - 操作符,線程操作的簡單使用
目錄
- 創建操作符
- 10種常用的操作符定義
- 下面做幾個操作符的demo演示
- create
- from
- repeat
- defer
- interval
- Scheduler
- 什麽是Scheduler?
- 如何使用Scheduler
操作符是用來幹什麽的?Rxjava中的每一個操作符基本都是用來創建Observable,也就是被訂閱者。RxJava中常用的操作符包括:創建操作符,連接操作符,工具操作符,變換操作符,過濾操作符,條件操作符,布爾操作符,合並操作符。本次著重了解創建操作符的用法。
創建操作符
10種常用的操作符定義
摘自《RxJava實戰》
just:將一個或多個對象轉換成發射這個或這些對象的一個Observable; from:將一個Interable,一個Future或者一個數組轉換成一個Observable; create:使用一個函數從頭創建一個Observable; defer:只有當訂閱者訂閱才創建Observable,為每個訂閱創建一個新的Observable; range:創建一個發射指定範圍的整數序列的Observable; interval:創建一個按照給定的時間間隔發射整數序列的Observable; timer:創建一個在給定的延時之後發射單個數據的Observable; empty:創建一個什麽都不做直接通知完成的Observable; error:創建一個什麽都不做直接通知錯誤的Observable; never:創建一個不發射任何數據的Observable。
下面做幾個操作符的demo演示
create
此處模擬create中報錯的場景
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> observableEmitter) throws Exception { try { if (!observableEmitter.isDisposed()) { for (int i = 0; i < 10; i ++) { observableEmitter.onNext(i); String str = null; str.length(); } observableEmitter.onComplete(); } } catch (Exception e) { observableEmitter.onError(e); } } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { System.out.println("Error: " + throwable.getMessage()); } }, new Action() { @Override public void run() throws Exception { System.out.println("Sequence complete. "); } });
運行結果:
just就不寫了,之前寫hello world的時候就用過了。
from
發射Iterable或者數組的每一項數據
Observable.fromArray("hello", "from").subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
運行結果:
repeat
重復的發射原始Observable的數據序列,次數通過repeat(n)指定
Observable.just("hello repeat")
.repeat(3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
運行結果:
defer
這裏我們在observable訂閱前睡眠1秒,我們發現只有當observable被訂閱了,發射的“hello defer”這條消息才被打印出來
Observable observable = Observable.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return Observable.just("hello defer");
}
});
TimeUnit.SECONDS.sleep(1);
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
運行結果:
interval
interval是按照固定的時間發射一個無限遞增的整數序列
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println(aLong);
}
});
TimeUnit.SECONDS.sleep(10);
運行結果:
Scheduler
什麽是RxJava線程?RxJava是一個為異步線程而實現的庫,所以RxJava的特點就是異步,一個通過異步編程合理提高系統處理速度的。在默認情況下,RxJava是單線程的。用Observable發射數據,Observe接受和響應數據,各種操作符來加工處理數據流,都是在同一個線程中運行的,實現出來的就是一個同步的函數響應式。其實在Observer中接受和響應數據會牽涉到多線程來操作RxJava,這些多線程我們通過調度器(Scheduler)來實現。上述總結於《RxJava實戰》這本書中。
什麽是Scheduler?
Scheduler是RxJava對線程控制器的一個抽象,RxJava內置了多個Scheduler的實現。常用的調度器有single,newThread,computation,io,trampoline,Schedulers.from這五個。當然如果自帶的調度器不能滿足需求,我們是可以自己定義Executor來作為調度器的。
如何使用Scheduler
- 切換newThread線程
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("hello");
observableEmitter.onNext("world");
}
}).observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
- 線程調度的兩種方法,也就是線程的切換
通過observeOn或者subscribeOn方法
Observable.just("hello", "world")
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
不同線程調度器的使用場景
- computation()
用於CPU密集型的計算任務,不適合I/O操作
- io()
用於I/O密集型任務,支持異步阻塞I/O操作,這個調度器的線程池會根據需要增長,對於普通的計算任務,還是用Schedulers.computation()
- newThread()
為每個任務創建一個新的線程。
- single()
single擁有一個線程單例,所有的任務都在這一個線程中執行,當此線程中有任務執行時,它的任務將會按照先進先出的順序依次執行。
- 幾種線程測試demo
Map map = new HashMap();
Observable.just("hello world")
.subscribeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
s = s.toUpperCase();
map.put("map1", s);
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
s = s + " leo.";
map.put("map2", s);
return s;
}
})
.subscribeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
s = s + "it is a test";
map.put("map3", s);
return s;
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
map.put("subscribe", s);
System.out.println(s);
}
});
總結:操作符,線程操作的使用,大家可以多看看相關書籍,自己多敲代碼,帶著不懂去看源碼或者相關文檔,作為初學者,現在也只是看著書上的皮毛,自己敲著書上的demo來理解。學習之路慢慢,共勉。。。
Rxjava - 操作符,線程操作的簡單使用