RxJava2小白手冊(2)- 執行緒管理和流程淺析
介紹
承接上文,結合使用場景,討論一下如何告別AsyncTask,就是因為RxJava的強大執行緒管理功能。
舉個栗子
認識RxJava之前,我們處理非同步任務的方式主要有兩種:
1. AsyncTask
2. Thread + Runnable.
涉及的程式碼量相比較RxJava而言大太多,針對Handler處理不好,可能存在記憶體洩漏的風險。不贅述,看看如何使用RxJava處理非同步任務。
1. 非同步處理
1.1 程式碼示例
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Logger("Emit 111");
e.onNext(111);
Logger("Emit 222");
e.onNext(222);
Logger("Emit onComplete");
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Logger("onSubscribe");
}
@Override
public void onNext(Integer integer) {
Logger("onNext integer = " + integer);
}
@Override
public void onError(Throwable e) {
Logger("onError e = " + e.getMessage());
}
@Override
public void onComplete() {
Logger("onComplete");
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
1.2 執行結果
可以清楚的看到,Observer下的操作是在主執行緒下完成的,而Observable下發射器的發射動作卻是在一個新的執行緒中完成的。通過這種操作,我們可以在subscribe方法中執行耗時操作,然後結果通過onNext()方法返回給主執行緒,實現非同步處理的目的。
常用的場景:訪問資料庫,網路請求資料,後臺計算操作等等。
1.3 Schedulers 和AndroidSchedulers
AndroidSchedulers是RxAndroid中的執行緒排程器,主要用途如上所示,AndroidSchedulers.mainThread,代表Android中的主執行緒(UI執行緒)。
方法 | 解釋 |
---|---|
Schedulers.computation() | 用於計算任務 |
Schedulers.from(Executor executor) | 使用指定的Executor作為排程器 |
Schedulers.io() | 用於IO密集型任務,如非同步阻塞IO操作,這個排程器的執行緒池會根據需要增長 |
Scheduler.newThread() | 為每個任務建立一個新執行緒 |
Scheduler.shutdown() | 停止排程器 |
Scheduler.single() | 單獨執行緒 |
Scheduler.start() | 啟動排程器 |
Scheduler.trampoline() | 在當前執行緒中,但是會等到當前執行緒任務執行完畢之後再去執行 |
AndroidScheduler.mainThread() | 主執行緒 |
看看原始碼
1. Observable.create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
先執行非空檢查,然後通過ObservableCreate來建立Observable。而ObservableCreate繼承Observable。看下程式碼
//ObservableCreate繼承Observable
public final class ObservableCreate<T> extends Observable<T> {
//ObservableOnSubscribe介面只有一個subscribe方法
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//賦值,結合上面Observable的create方法,這個source應該是我們new出來的ObservableOnSubscribe
this.source = source;
}
//這個方法在Observable執行subscribe(Observer)的時候使用到
@Override
protected void subscribeActual(Observer<? super T> observer) {
//建立CreateEmitter,傳入observer,內部使用
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//執行observer的onSubscribe方法,parent是CreateEmitter,實現了Disposable,和我們建立Observer時實現的onSubscribe方法一致,沒毛病
observer.onSubscribe(parent);
try {
//執行subscribe方法,source為ObservableOnSubscribe物件,parent為CreateEmitter,而CreateEmitter實現ObservableEmitter介面,沒毛病
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//CreateEmitter執行onError方法
parent.onError(ex);
}
}
//CreateEmitter類繼承AtomicReference<Disposable>實現ObservableEmitter和Disposable 介面
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
//建立過程中傳入的Observer
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
//OnNext方法
@Override
public void onNext(T t) {
//非空檢查,onNext在2.0之後不允許傳入null值作為引數
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//這個對應上我們的上一篇部落格,一次性水管,如果isDisposed為true,則發射器發出的事件,將不會被觀察者執行
if (!isDisposed()) {
observer.onNext(t);
}
}
//onError方法,當tryOnErro返回false的時候,執行RxJavaPlugins.onError(t),何時tryOnError會返回false呢?看下面
@Override
public void onError(Throwable t) {
//當isDisposed()為true後,會執行RxJavaPlugins.onError(t)操作,也就是說如果在isDisposed()為true的情況下,發射器還發出onError()事件,會導致程式崩潰。具體看下面的執行示例。
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//也是不允許傳入null
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
//如果isDisposed為false,執行觀察者的onError方法,然後執行dispose()操作,也就是觀察者不處理後面發射器傳送的事件了。估計onComplete()方法中也會有類似的操作流程
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
//只有當isDisposed為true的時候回返回false,也就是上一個方法回執行RxJavaPlugins.onError(t);操作
return false;
}
@Override
public void onComplete() {
//和上面onError()操作類似,不同的是沒有非空檢查,因為onComplete沒有引數。
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
……
}
……
//這部分介紹的是SerializedEmitter,暫無涉及
}
舉例說明,isDisposed()為true時,發射器繼續傳送onError事件會導致程式崩潰。
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Logger("Emit 111");
e.onNext(111);
Logger("Emit 222");
e.onNext(222);
Logger("Emit 333");
e.onNext(333);
Logger("Emit onError");
e.onError(new Throwable("Test Disposable onError"));
}
});
Observer<Integer> observer = new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Logger("onSubscribe");
mDisposable = d;
}
@Override
public void onNext(Integer integer) {
Logger("onNext integer = " + integer);
if(mDisposable!=null && !mDisposable.isDisposed() && integer == 222) {
mDisposable.dispose();
}
}
@Override
public void onError(Throwable e) {
Logger("onError e = " + e.getMessage());
}
@Override
public void onComplete() {
Logger("onComplete");
}
};
observable.subscribe(observer);
執行結果:
2. Observer
建立Observer,無甚特殊,注意onSubscribe,onNext,onError傳入的引數不能為空以及Disposable 的使用。
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
3. observable.subscribe(observer);
分析一下subscribe方法
public final void subscribe(Observer<? super T> observer) {
//observer非空檢查
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//關聯observable和observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//這個方法在Observable中是個抽象方法,但是結合上面Observerable的create過程,可以知道這裡實際上呼叫的是ObservableCreate的subscribeActual方法,也就是上面我們分析的過程,沒毛病
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
一路看下來,我們就可以很快的和我們做的測試對應上,先呼叫onSubscribe方法,然後執行subscribe方法中的發射器操作,根據發射器的操作Observer作出對應的處理。
4. subscribeOn
subscribeOn用來指定Observable在哪個執行緒執行自己的subscribe方法。
public final Observable<T> subscribeOn(Scheduler scheduler) {
//scheduler非空檢查
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
建立了一個ObservableSubscribeOn,這個類千萬別和上面建立Observable過程中使用的ObservableOnSubscribe介面弄混淆,結合當前操作為subscribeOn來記住這個類名。
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
Observerable繼承自ObservableSource,所以建立ObservableSubscribeOn的時候Observable和scheduler傳遞過來。
ObservableSubscribeOn繼承
AbstractObservableWithUpstream,而後者又繼承Observable。所以實際上經過subscribeOn操作之後,後續操作的物件從Observerable變成了ObservableSubscribeOn,所以,當後面執行subscribe時執行的subscribeActual方法為ObservableSubscribeOn重的對應方法
@Override
public void subscribeActual(final Observer<? super T> s) {
//封裝Observer,實際Observer由其內部actual維護
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//Observer執行onSubscribe方法,SubscribeOnObserver實現Disposable介面,所以上面的例子中onSubscribe傳遞的是Disposable型別
s.onSubscribe(parent);
//這裡有3個操作:
//1. 建立SubscribeTask,傳入封裝後的Observer
//2. 排程器執行scheduleDirect操作
//3. 封裝後的Observer執行setDisposable操作
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
關注scheduler.scheduleDirect(new SubscribeTask(parent)),先看下
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
實現Runnable,終於看到一個有點熟悉的東西。傳入的parent為封裝後的Observer。而source則是建立ObservableSubscribeOn過程中傳入的Observable。
在看scheduleDirect方法之前,我們得先弄清楚這個scheduler是個什麼東西?在Schedulers類中,不同的Scheduler已經初始化完成。
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
就看下NEW_THREAD 這個,首先new NewThreadTask()
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
看到了ThreadFactory,RxThreadFactory,而RxThreadFactory實現了ThreadFactory介面,所以最後還是執行緒的使用,只是RxJava對這些基礎的東西做了深度的封裝和流程上的優化,讓我們更方便的使用。
回溯到上面的scheduleDirect方法,
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//建立工作執行緒,以NewThreadScheduler為例,是建立NewThreadWorker
final Worker w = createWorker();
//
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//封裝出一個帶Dispose的Task,方便控制
DisposeTask task = new DisposeTask(decoratedRun, w);
//以NewThreadScheduler為例,這裡執行的是NewThreadWorker中的schedule方法
w.schedule(task, delay, unit);
return task;
}
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
忽略disposed的影響,最後執行到scheduleActual
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//直接提交或者進入佇列
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
可以看到熟悉的ScheduledExecutorService和Future。
所以,綜上所述,其內部也是新建立一個執行緒,結合Runnable。
5. observeOn
原理我覺得和subscribeOn沒有太大的差別。不做贅述。
最後
RxJava使用起來方便,其中包含著很多技巧,也不甚瞭解,只能是慢慢用,慢慢理。
【碼道長】公眾號,最近開始運營,有興趣的朋友歡迎來訪。