RxJava執行緒切換原理
1 前言
執行緒的切換都是通過subscribeOn或者observeOn來進行,生產者的執行執行緒只受subscribeOn控制,不受observeOn影響。subscribeOn指定的執行緒環境能一直維持到第一次observeOn出現之前。要講執行緒切換原理之前,我們先來看一下下面的幾個類定義:
- Operator
/**
* Operator function for lifting into an Observable.
*/
publicinterfaceOperator<R, T>extendsFunc1<Subscriber<?super R>,Subscriber<?super T>>{
// cover for generics insanity
}
Operator是Observable中定義的介面,即使用者的邏輯操作,RxJava框架會呼叫lift方法將Operator包裝成為Observable。
- ObserveOnSubseriber
ObserveOnSubseriber是被訂閱者的類,處理使用者資料邏輯,也即生產者,用來產生使用者資料的。
- OperatorObserveOn
OperatorObserveOn是訂閱者的類,接收資料的,也即消費者,消費生產者傳送過來的資料。
- Worker
Worker是執行緒真正執行的地方,也就是單獨新建的一個執行緒或執行緒池中的某個執行緒。
2 原理解析
我們先建立一個Observable:
Observable.just(null)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe
just方法又是呼叫的ScalarSynchronousObservable,然後new一個OnSubscribe作為建構函式的引數,暫且叫做1號OnSubscribe,這個下面會再提到,也是執行緒切換的區別所在:
protectedScalarSynchronousObservable(final T t){
super(newOnSubscribe<T>(){
@Override
publicvoid call(Subscriber<?super T> s){
s.onNext(t);
s.onCompleted();
}
});
this.t = t;
}
執行緒切換的要點在lift()函式裡面,都是基於同一個基礎的變換方法: lift(Operator)
先來看一下它的原始碼:
publicfinal<R>Observable<R> lift(finalOperator<?extends R,?super T>operator){
returnnewObservable<R>(newOnSubscribe<R>(){
@Override
publicvoid call(Subscriber<?super R> o){
try{
Subscriber<?super T> st = hook.onLift(operator).call(o);
try{
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
}catch(Throwable e){
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if(e instanceofOnErrorNotImplementedException){
throw(OnErrorNotImplementedException) e;
}
st.onError(e);
}
}catch(Throwable e){
if(e instanceofOnErrorNotImplementedException){
throw(OnErrorNotImplementedException) e;
}
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
我們可以看到,呼叫lift()方法後(即執行subscribeOn或observeOn),是返回一個新的Observable,而不是呼叫者的Observable,這裡同樣是重新建立了一個OnSubscribe,暫且叫做2號OnSubscribe,我們再回頭看看,這個OnSubscribe與前面提到的just()方法裡面呼叫到的OnSubscribe不是同一個物件。
這裡是執行緒切換的關鍵點。當呼叫鏈來到lift()方法後,使用的是lift()所返回的新的 Observable,也就是它所觸發的onSubscribe.call(subscriber)也是用新的Observable中的新 OnSubscribe,即我們上面命名的2號OnSubscribe。
3 OperatorSubscribeOn
再來看lift()函式的原始碼,它的第二個try方法體裡面有個onSubscribe,這個OnSubscribe就是我們前面定義的1號onSubscribe,它就是我們呼叫just()方法後建立的原始Observable。
那它是怎麼做到切換執行緒的呢?如上面的例子,subscribeOn(Schedulers.io()),它通過下面的程式碼(舉例)產生一個新的Subscriber:
Subscriber<?super T> st = hook.onLift(operator).call(o);//將新的Subscriber物件o傳遞給OperatorSubscribeOn,它裡面的call()方法去建立新的Worker執行緒
//OperatorSubscribeOn的call(o)方法
@Override
publicSubscriber<?superObservable<T>> call(finalSubscriber<?super T> subscriber){
finalWorker inner = scheduler.createWorker();//新建執行緒
subscriber.add(inner);
returnnewSubscriber<Observable<T>>(subscriber){
@Override
publicvoid onCompleted(){
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}
@Override
publicvoid onError(Throwable e){
subscriber.onError(e);
}
@Override
publicvoid onNext(finalObservable<T> o){
inner.schedule(newAction0(){
@Override
publicvoid call(){
finalThread t =Thread.currentThread();
o.unsafeSubscribe(newSubscriber<T>(subscriber){
@Override
publicvoid onCompleted(){
subscriber.onCompleted();
}
@Override
publicvoid onError(Throwable e){
subscriber.onError(e);
}
@Override
publicvoid onNext(T t){
subscriber.onNext(t);
}
@Override
publicvoid setProducer(finalProducer producer){
subscriber.setProducer(newProducer(){
@Override
publicvoid request(finallong n){
if(Thread.currentThread()== t){//如果是當前執行緒,則在當前執行緒執行
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
}else{//不是當前執行緒,將在新建立的Worker執行緒inner中執行
inner.schedule(newAction0(){
@Override
publicvoid call(){
producer.request(n);
}
});
}
}
});
}
});
}
});
}
};
}
然後,通過呼叫1號OnSubscribe的call()方法 onSubscribe.call(st) 將新建立的Subscriber與原始的Observable關聯起來,即新的Subscriber去訂閱原始的Observable。這樣,生產者
通過上面的程式碼可以知道,Scheduler類其實並不負責非同步執行緒處理,它只負責通過createWorker()類創建出一個Worker物件,真正負責任務的延時處理。
4 OperatorObserveOn
observeOn方法內部也是呼叫了lift()方法,然後建立一個operator,
//OperatorObserveOn.java
@Override
publicSubscriber<?super T> call(Subscriber<?super T> child){
if(scheduler instanceofImmediateScheduler){
// avoid overhead, execute directly
return child;
}elseif(scheduler instanceofTrampolineScheduler){
// avoid overhead, execute directly
return child;
}else{
ObserveOnSubscriber<T> parent =newObserveOnSubscriber<T>(scheduler, child);
parent.init();
return parent;
}
}
publicObserveOnSubscriber(Scheduler scheduler,Subscriber<?super T> child){
this.child = child;
this.recursiveScheduler = scheduler.createWorker();//建立新的worker執行緒
if(UnsafeAccess.isUnsafeAvailable()){
queue =newSpscArrayQueue<Object>(RxRingBuffer.SIZE);
}else{
queue =newSynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe =newScheduledUnsubscribe(recursiveScheduler);
}
protectedvoid schedule(){
if(COUNTER_UPDATER.getAndIncrement(this)==0){
recursiveScheduler.schedule(action);//用相應的執行緒進行資料輸出排程
}
}
結合扔物線大大的圖如下:
未完待續......