【Spark核心原始碼】事件匯流排ListenerBus
目錄
訊息匯流排ListenerBus
org.apache.spark.util.ListenerBus處理來自DAGScheduler、SparkContext、BlockManagerMasterEndpoint等元件的事件,是整個Spark框架體系內事件處理匯流排。
ListenerBus是訊息匯流排特質(不懂特質的看這裡scala特質 ),提供了增加、刪除方法以及事件執行監聽器的方法。監聽列表listeners是執行緒安全的,但是postToAll事件執行監聽器的方法卻是執行緒不安全的,所以該方法要在同一執行緒中執行。
既然是特質,那麼就一定會有實現類,ListenerBus的繼承結構如下:
非同步事件處理LiveListenerBus
org.apache.spark.scheduler.LiveListenerBus是一個核心的具體實現,LiveListenerBus實現了非同步事件處理,內建事件阻塞佇列,可通過“spark.scheduler.listenerbus.eventqueue.size”設定,預設1000。
增加事件
既然有阻塞佇列,那麼就需要有將事件增加到佇列中的方法,post(event: SparkListenerEvent)就是幹這事的。該方法中傳遞一個SparkListenerEvent作為引數,在將事件增加到佇列中之前,需要先判斷LinveListenerBus是否還活著,如果stop了,則報錯並返回。如果不處於stop狀態,則增加到阻塞佇列中,增加成功了釋放訊號,告訴別人我增加好了。如果由於佇列已經滿了增加失敗,則會移除事件,並在刪除事件計數器中進行自增記錄,如果上一次刪除失敗事件和本次刪除失敗事件的時間間隔在60秒以上,就輸出一次錯誤報告。流程如下:
程式碼如下:
/**
* 將事件增加到eventQueue事件佇列中去
* */
def post(event: SparkListenerEvent): Unit = {
// 判斷LiveListenerBus是否停止,停止就報錯並返回
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
// 向佇列中增加事件
val eventAdded = eventQueue.offer(event)
/**
* 增加成功,釋放訊號量,加快listenerThread工作,
* 在listenerTread中eventLock.acquire()一直在獲取訊號量
*
* 增加失敗,則移除事件,並對刪除事件計數器droppedEventsCounter自增
* */
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}
// 如果有失敗的時間,60秒列印一次日誌
val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
}
}
上面提到增加成功之後釋放訊號量,既然有釋放訊號量,那麼就一定會有接收訊號量,在listenerThread中接收訊號量並處理新增加的也就是未處理的事件。增加成功後立刻釋放訊號量這個操作的好處就在於,只要增加成功就立刻處理,不需要管後面的操作。
listenerThread處理事件
下面就看一下listenerThread是如何處理事件的,listenerThread程式碼如下:
private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
eventLock.acquire()// 獲取訊號量
self.synchronized {
processingEvent = true
}
try {
// 從佇列中獲取事件
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event) // 處理事件
} finally {
self.synchronized {
processingEvent = false
}
}
}
}
}
}
Utils.tryOrStopSparkContext(sparkContext)方法會保證線上程中丟擲異常後,會有啟動一個新的執行緒來停止SparkContext。
執行緒會不斷獲取訊號量,獲取到訊號量,說明還有事件未處理,並同步設定processingEvent狀態,表示我正在執行任務。然後從事件佇列中獲取事件,並執行事件監聽器,處理事件postToAll(event)的方法是LiveListenerBus的超類ListenerBus中的方法。執行過後同步設定processingEvent狀態,表示我執行完任務了。