RxJava 原始碼解讀分析 observeOn
Observable.observeOn()方法有點繞,我們一點一點看。
從上圖中,我們可以看出,observeOn主要作的工作是:
1,通過指定的scheduler來切換執行緒,用來emit資料,這個資料就是onNext(data)方法的引數。
2,emit出來的資料,先非同步的快取到一個buffer,實際上是快取到了一個Queue中,注意,每呼叫一下observeOn()方法,就會new一個queue,以此保證事件序列特性。
3,通過指定的scheduler來非同步的消費Queue中的資料
再來看看OperatorObserveOn這個類,對原始的Subscriber進行了一下非同步類包裝,如下圖:
從上圖中,可以看出,這個被包裝過的onNext方法,在當前執行緒中做了一件事重的事,將onNext(data)的引數資料,直接放到queue的尾部,最後呼叫了schedule()方法如下圖。
從圖中可以看出,emit出的資料被同步放入到queue中,緊接者,非同步的從queue中poll()出佇列頭資料,再執行使用者定義的onNext(data)方法。
再回過頭來看,lift()方法,如下圖
再來看看OnSubscribeLift類中有什麼,如下圖。
從上圖中,可以看出,call()方法中呼叫了OperatorObserveOn.call(),而OperatorObserveOn.call()是用來產生一個代理的Subscriber(指定scheduler來實現非同步操作)。
OK,我們再回頭來,理一下,整個observeOn()的呼叫流程。
OnSubscribeLift.call()
ObserveOnSubscriber subscriber= OperatorObserveOn.call()
subscriber.onStart()
onSubscribe.call(subscriber)
subscriber.onNext(data)//這個類已經被包裝了,onNext()只是在當前執行緒中,將data pull 到queue佇列尾
scheduler()
recursiveScheduler.schedule(ObserveOnSubscriber);
ObserveOnSubscriber.call()//這裡通過指定的執行緒,非同步的從queue中獲取資料
localChild.onNext(data);//localChild就是最原始的使用者定義的subscriber
結合以上說明,來看一下類圖,主要操作集中在OperatorSubscribeOn和ObserveOnSubscriber中。