EventBus 訊息的執行緒切換模型與實現原理
一. 序
EventBus 是一個基於觀察者模式的事件訂閱/釋出框架,利用 EventBus 可以在不同模組之間,實現低耦合的訊息通訊。
EventBus 因為其使用簡單且穩定,被廣泛應用在一些生產專案中。
通常我們就是使用 EventBus 分發一些訊息給訊息的訂閱者,除此之外我們還可以通過 EventBus 將訊息傳遞到不同的執行緒中去執行,處理訊息。這其中還涉及到一些執行緒切換問題、執行緒池的問題,在使用的過程中,還有一些配置的選擇,此時我們需要根據不同的業務場景,來選擇不同的執行緒切換方式。
本文就 EventBus 的幾種執行緒切換方式,以及內部的實現原來,來分析如何使用 EventBus 來切換訊息執行緒。
二. EventBus 的執行緒切換
2.1 EventBus 切換執行緒
EventBus 是一個基於觀察者模式的事件訂閱/釋出框架。利用 EventBus 可以在不同模組之間,實現低耦合的訊息通訊。
EventBus 誕生以來這麼多年,在很多生產專案中都可以看到它的身影。而從更新日誌可以看到,除了體積小,它還很穩定,這兩年就沒更新過,最後一次更新也只是因為支援所有的 JVM,讓其使用範圍不僅僅侷限在 Android 上。
可謂是非常的穩定,穩定到讓人有一種感覺,要是你使用 EventBus 出現了什麼問題,那一定是你使用的方式不對。
EventBus 的使用方式,對於 Android 老司機來說,必然是不陌生的,相關資料太多,這裡就不再贅述了。
在 Android 下,執行緒的切換是一個很常用而且很必須的操作,EventBus 除了可以訂閱和傳送訊息之外,它還可以指定接受訊息處理訊息的執行緒。
也就是說,無論你 post()
訊息時處在什麼執行緒中,EventBus 都可以將訊息分發到你指定的執行緒上去,聽上去就感覺非常的方便。
不過無論怎麼切換,無外乎幾種情況:
- UI 執行緒切子執行緒。
- 子執行緒切 UI 執行緒。
- 子執行緒切其他子執行緒。
在我們使用 EventBus 註冊訊息的時候,可以通過 @Subscribe
註解來完成註冊事件, @Subscribe
中可以通過引數 threadMode
來指定使用那個執行緒來接收訊息。
@Subscribe(threadMode = ThreadMode.MAIN)
fun onEventTest(event:TestEvent){
// 處理事件
}
threadMode
是一個 enum,有多種模式可供選擇:
- POSTING,預設值,那個執行緒發就是那個執行緒收。
- MAIN,切換至主執行緒接收事件。
- MAIN_ORDERED,v3.1.1 中新增的屬性,也是切換至主執行緒接收事件,但是和 MAIN 有些許區別,後面詳細講。
- BACKGROUND,確保在子執行緒中接收事件。細節就是,如果是主執行緒傳送的訊息,會切換到子執行緒接收,而如果事件本身就是由子執行緒發出,會直接使用傳送事件訊息的執行緒處理訊息。
- ASYNC,確保在子執行緒中接收事件,但是和 BACKGROUND 的區別在於,它不會區分發送執行緒是否是子執行緒,而是每次都在不同的執行緒中接收事件。
EventBus 的執行緒切換,主要涉及的方法就是 EventBus 的 postToSubscription()
方法。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
可以看到,在 postToSubscription()
方法中,對我們配置的 threadMode 值進行了處理。
這端程式碼邏輯非常的簡單,接下來我們看看它們執行的細節。
2.2 切換至主執行緒接收事件
想在主執行緒接收訊息,需要配置 threadMode 為 MAIN。
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
這一段的邏輯很清晰,判斷是主執行緒就直接處理事件,如果是非主執行緒,就是用 mainThreadPoster 處理事件。
追蹤 mainThreadPoster
的程式碼,具體的邏輯程式碼都在 HandlerPoster 類中,它實現了 Poster 介面,這就是一個普通的 Handler,只是它的 Looper 使用的是主執行緒的 「Main Looper」,可以將訊息分發到主執行緒中。
為了提高效率,EventBus 在這裡還做了一些小優化,值得我們借鑑學習。
為了避免頻繁的向主執行緒 sendMessage()
,EventBus 的做法是在一個訊息裡儘可能多的處理更多的訊息事件,所以使用了 while 迴圈,持續從訊息佇列 queue 中獲取訊息。
同時為了避免長期佔有主執行緒,間隔 10ms (maxMillisInsideHandleMessage = 10ms)會重新發送 sendMessage()
,用於讓出主執行緒的執行權,避免造成 UI 卡頓和 ANR。
MAIN
可以確保事件的接收,在主執行緒中,需要注意的是,如果事件就是在主執行緒中傳送的,則使用 MAIN
會直接執行。為了讓開發和可配置的成都更高,在 EventBus v3.1.1 新增了 MAIN_ORDERED
,它不會區分當前執行緒,而是通通使用 mainThreadPoster
來處理,也就是必然會走一遍 Handler 的訊息分發。
當事件需要在主執行緒中處理的時候,要求不能執行耗時操作,這沒什麼好說的,另外對於 MAIN
或者 MAIN_ORDERED
的選擇,就看具體的業務要求了。
2.3 切換至子執行緒執行
想要讓訊息在子執行緒中處理,可以配置 threadMode 為 BACKGROUND
或者 AYSNC
,他們都可以實現,但是也有一些區別。
先來看看 BACKGROUND
,通過 postToSubscription()
中的邏輯可以看到,BACKGROUND
會區分當前發生事件的執行緒,是否是主執行緒,非主執行緒這直接分發事件,如果是主執行緒,則 backgroundPoster
來分發事件。
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
BackgroundPoster 也實現了 Poster 介面,其中也維護了一個用連結串列實現的訊息佇列 PendingPostQueue,
在一些編碼規範裡就提到,不要直接建立執行緒,而是需要使用執行緒池。EventBus 也遵循這個規範,在 BackgroundPoster 中,就使用了 EventBus 的 executorService
執行緒池物件去執行。
為了提高效率,EventBus 在處理 BackgroundPoster 時,也有一些小技巧值得我們學習。
可以看到,在 BackgroundPoster 中,處理主執行緒丟擲的事件時,同一時刻只會存在一個執行緒,去迴圈從佇列中,獲取事件處理事件。
通過 synchronized 同步鎖來保證佇列資料的執行緒安全,同時利用 volatile 標識的 executorRunning
來保證不同執行緒下看到的執行狀態是可見的。
既然 BACKGROUND
在處理任務的時候,只會使用一個執行緒,但是 EventBus 卻用到了執行緒池,看似有點浪費。但是再繼續瞭解 ASYNC
的實現,才知道怎麼樣是對執行緒池的充分利用。
和前面介紹的 threadMode 一樣,大多數都對應了一個 Poster,而 ASYNC
對應的 Poster 是 AsyncPoster,其中並沒有做任何特殊的處理,所有的事件,都是無腦的拋給 EventBus 的 executorService
這個執行緒池去處理,這也就保證了,無論如何發生事件的執行緒,和接收事件的執行緒,必然是不同的,也保證了一定會在子執行緒中處理事件。
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
到這裡應該就理解了 BACKGROUND
和 ASYNC
,雖然都可以保證在子執行緒中接收處理事件,但是內部實現是不同的。
BACKGROUND
同一時間,只會利用一個子執行緒,來迴圈從事件佇列中獲取事件並進行處理,也就是前面的事件的執行效率,會影響後續事件的執行。例如你分發了一個事件,使用的是 BACKGROUND
但是佇列前面還有一個耗時操作,那你分發的這個事件,也必須等待佇列前面的事件都處理完成才可以繼續執行。所以如果你追求執行的效率,立刻馬上就要執行的事件,可以使用 ASYNC
。
那是不是都用 ASYNC
就好了?當然這種一攬子的決定都不會好,具體問題具體分析,ASYNC
也有它自己的問題。
ASYNC
會無腦的向執行緒池 executorService 傳送任務,而這個執行緒池,如果你不配置的話,預設情況下使用的是 Executors 的 newCachedThreadPool()
建立的。
這裡我又要說到編碼規範了,不推薦使用 Executors 直接建立執行緒,之所以這樣,其中一個原因在於執行緒池對任務的拒絕策略。 newCachedThreadPool
則會建立一個無界佇列,來存放執行緒池暫時無法處理的任務,說到無界佇列,拍腦袋就能想到,當任務(事件)過多時,會出現的 OOM。
這也確實是 EventBus 在使用 ASYNC
時,真實存在的問題。
但是其實這裡讓開發者自己去配置,也很難配置一個合理的執行緒池的拒絕策略,拒絕時必然會放棄一些任務,也就是會放棄掉一些事件,任何放棄策略都是不合適的,這在 EventBus 的使用中,表現出來就是出現邏輯錯誤,該收到的事件,收不到了。所以你看,這裡無界佇列不合適,但是不用它呢也不合適,唯一的辦法就是合理的使用 ASYNC
,只在必要且合理的情況下,才去使用它。
三. 小結時刻
到這裡基本上 EventBus 在分發事件時的執行緒切換,就講清除了,很多資料裡其實都寫了他們可以切換執行緒,但是對於一些使用的細節,描述的並不清除,正好藉此文,把 EventBus 的執行緒切換的直接講清除。
EventBus 也是簡歷上比較常見的高頻詞,我在面試的過程中,也經常會問面試者,關於它是如何做到執行緒切換的問題。但是正因為它簡單易用,其實很多時候我們都忽略了它的實現細節。
今天就到這裡,小結一下:
1. EventBus 可以通過 threadMode 來配置接收事件的執行緒。
2. MAIN 和 MAIN_ORDERED 都會在主執行緒接收事件,區別在於是否區分,發生事件的執行緒是否是主執行緒。
3. BACKGROUND 確保在子執行緒中接收執行緒,它會通過執行緒池,使用一個執行緒迴圈處理所有的事件。所以事件的執行時機,會受到事件佇列前面的事件處理效率的影響。
4. ASYNC 確保在子執行緒中接收事件,區別於 BACKGROUND,ASYNC 會每次向執行緒池中傳送任務,通過執行緒池的排程去執行。但是因為執行緒池採用的是無界佇列,會導致 ASYNC 待處理的事件太多時,會導致 OOM。
本文就到這裡,本文對你有幫助嗎?留言、轉發、收藏是最大的支援,謝謝!
本文首發自公眾號「承香墨影」,歡迎關注獲取最新的原創文章。公眾號後臺回覆成長『成長』,將會得到我準備的學習資料,。
相關推薦
EventBus 訊息的執行緒切換模型與實現原理
一. 序 EventBus 是一個基於觀察者模式的事件訂閱/釋出框架,利用 EventBus 可以在不同模組之間,實現低耦合的訊息通訊。 EventBus 因為其使用簡單且穩定,被廣泛應用在一些生產專案中。 通常我們就是使用 EventBus 分發一些訊息給訊息的訂閱者,除此之外我們還可以通過 EventBu
程序與執行緒(三)——執行緒的概念與實現
一 什麼是執行緒? 程序中的一條執行流程。 有了執行緒,程序發生了一系列的變化。首先是(1)資源管理,包括地址空間(程式碼段,資料段):程序就是由來管理資源的:地址空間,開啟的檔案,訪問的網路。(2)執行緒把程序的另一部分功能給拆出來了。 程序的執行功能,程序的
Java執行緒詳解(3)-執行緒棧模型與執行緒的變數
要理解執行緒排程的原理,以及執行緒執行過程,必須理解執行緒棧模型。 執行緒棧是指某時刻時記憶體中執行緒排程的棧資訊,當前呼叫的方法總是位於棧頂。執行緒棧的內容是隨著程式的執行動態變化的,因此研究執行緒棧必須選擇一個執行的時刻(實際上指程式碼執行到什麼地方)。
唯品會多執行緒Redis設計與實現
宣告:本文來自京東張開濤的微信公眾號(kaitao-1234567),授權CSDN轉載,如需轉載請聯絡作者。 作者:申政,開源愛好者,唯品會高階DBA,主要負責Redis相關領域的原始碼研究和研發工作。 責編:錢曙光,關注架構和演算法領域,尋求報道或者投稿
根據Linux 執行緒掛起與喚醒原理,實現Sleep的暫停與繼續
在呼叫pthread_cond_wait()前必須由本執行緒加鎖(pthread_mutex_lock()),而在更新條件等待佇列以前,mutex保持鎖定狀態,並在執行緒掛起進入等待前解鎖。在條件滿足從而離開pthread_cond_wait()之前,mutex將被重新加鎖,以與進入pthread_cond_
Java併發(4)深入分析java執行緒池框架及實現原理(一)
先說說我個人對執行緒池的理解:執行緒池顧名思義是一個裝有很多執行緒的池子,這個池子維護著從執行緒建立到銷燬的怎個生命週期以及執行緒的分配,使用者只需要把任務提交給這個執行緒池而不用去關心執行緒池如何建立執行緒,執行緒池會自己給這些任務分配執行緒資源來完成任務。 java的E
硬核乾貨:4W字從原始碼上分析JUC執行緒池ThreadPoolExecutor的實現原理
![](https://throwable-blog-1256189093.cos.ap-guangzhou.myqcloud.com/202008/j-u-c-t-p-e-logo.png) ## 前提 很早之前就打算看一次JUC執行緒池`ThreadPoolExecutor`的原始碼實現,由於近段時間
netty原始碼解解析(4.0)-6 執行緒模型-IO執行緒EventLoopGroup和NIO實現(一)
介面定義 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 說明
netty原始碼解解析(4.0)-7 執行緒模型-IO執行緒EventLoopGroup和NIO實現(二)
把NIO事件轉換成對channel unsafe的呼叫或NioTask的呼叫 processSelectedKeys()方法是處理NIO事件的入口: private void processSelectedKeys() { if (selectedKeys != null) {
Java多執行緒的實現(程序與執行緒的概念、Java繼承Thread類實現多執行緒、Java實現Runnable介面實現多執行緒、Thread與Runnable的區別、實現Callable介面實現多執行緒)
1 程序與執行緒 1.1 程序與執行緒的概念 什麼是程序? 程序: 作業系統中一個程式的執行週期。(比如我們想要在電腦上登入QQ,從雙擊qq按鈕---->關閉qq這個過程就是一個程序) 多程序: 同一時刻跑多個程式。 在DOS(磁碟作業系統時
Linux利用訊號量實現執行緒的同步與互斥
執行緒使用互斥鎖可以實現執行緒間的互斥,而互斥鎖本身就是對資源的一種標識狀態,當可以申請到鎖時說明此時資源可以使用,當申請鎖失敗時說明資源此時被其他執行緒所佔用不可使用,我們可以使用訊號量來代替互斥鎖實現。 訊號量用來表示資源數目,當一個執行緒要去訪問資源時,必須先去申請
RxJava的訊息訂閱和執行緒切換原理
本文由玉剛說寫作平臺提供寫作贊助,版權歸玉剛說微信公眾號所有 原作者:四月葡萄 版權宣告:未經玉剛說許可,不得以任何形式轉載 1.前言 本文主要是對RxJava的訊息訂閱和執行緒切換進行原始碼分析,相關的使用方式等不作詳細介紹。 本文原始碼基於
Linux中執行緒的同步與互斥、生產者消費模型和讀者寫者問題、死鎖問題
執行緒的同步與互斥 執行緒是一個存在程序中的一個執行控制流,因為執行緒沒有程序的獨立性,在程序內部執行緒的大部分資源資料都是共享的,所以在使用的過程中就需要考慮到執行緒的安全和資料的可靠。不能因為執行緒之間資源的競爭而導致資料發生錯亂,也不能因為有些執行緒因為
【執行緒的同步與互斥 (互斥量 條件變數 訊號量)】生產者與消費者模型
執行緒 執行緒是程序中的一個獨立的執行流,由環境(包括暫存器集和程式計數器)和一系列要執行的置零組成。所有程序至少有一個執行緒組成,多執行緒的程序包括多個執行緒,所有執行緒共享為程序分配的公共地址空間,所以文字段(Text Segment)和資料段(Datan
ThreadPoolExecutor執行緒池解析與BlockingQueue的三種實現
目的 主要介紹ThreadPoolExecutor的用法,和較淺顯的認識,場景的使用方案等等,比較忙碌,如果有錯誤還請大家指出 ThreadPoolExecutor介紹 ThreadPoolExecutor的完整構造方法的簽名如下 ThreadP
javaSE高階開發多執行緒——1 程序與執行緒 and 2 多執行緒的實現
一、程序與執行緒 1.程序的概念 執行緒隸屬於某個程序,程序是一個程式的執行週期,但是我們的執行緒是執行程序中的某個任務 所以如果程序不存在的話,那麼執行緒自然也就不會存在了。 我們應該時刻將執行緒和任務對等起來,執行一個程式啟動一個程序。這樣就可以提升沃恩程式的執行
程序切換與執行緒切換的代價比較
程序切換分兩步:1.切換頁目錄以使用新的地址空間2.切換核心棧和硬體上下文對於linux來說,執行緒和程序的最大區別就在於地址空間,對於執行緒切換,第1步是不需要做的,第2是程序和執行緒切換都要做的。切換的效能消耗:1、執行緒上下文切換和程序上下問切換一個最主要的區別是執行緒
Rxjava 子執行緒 主執行緒 切換 簡單實現
package com.zgt.demo01.rxjava; import com.zgt.demo01.os2.Handler; import com.zgt.demo01.os2.Message; public abstract class MObse
使用Rxjava替代Handler實現單純的執行緒切換
標題中所謂單純的執行緒切換是指不攜帶任何資料實現執行緒切換,當然要攜帶資料的話使用Rxjava更加常見,相比Handler也簡單得多。一般使用Rxjava都會有用create、just、from進行傳參的操作,但有時候我不需要傳參只是想切換一下執行緒而已。例如使
Java的多執行緒程式設計模型5--從AtomicInteger開始(自增長實現)
AtomicInteger,一個提供原子操作的Integer的類。在Java語言中,++i和i++操作並不是執行緒安全的,在使用的時候,不可避免的會用到synchronized關鍵字。而AtomicInteger則通過一種執行緒安全的加減操作介面。 來看AtomicIn