1. 程式人生 > >深入理解Spark之ListenerBus監聽器

深入理解Spark之ListenerBus監聽器

ListenerBus對消費佇列的實現

這裡寫圖片描述
上圖為LiveListenerBus類的申明
self => 這句相當於給this起了一個別名為self

LiveListenerBus負責將SparkListenerEvents非同步傳送過已經註冊過的SparkListeners,在SparkContext中,首先會建立LiveListenerBus例項,這個類的功能是

  • 儲存有訊息佇列負責訊息的快取
  • 儲存有註冊過的listener,負責訊息分發

Listener連結串列儲存在ListenerBus類中,為了保證併發的安全性,這裡使用了CopyOnWriteArrayList類來儲存Listener,關於CopyOnWriteArrayList的更多資訊請點選

CopyOnWriteArrayList

訊息佇列儲存在LiveListenerBus中
這裡寫圖片描述

從圖中可以看到佇列長度可以自己配置,但是spark設定了預設值10000,當快取事件數量達到上限後,新來的事件被丟棄,具體的丟棄函式在LiveListenerBus類中如下所示

這裡寫圖片描述

通過上面的程式碼可以看到,處理的方式輸出日誌,並且使用logDroppedEvent來保證輸出僅為一次

我們繼續看訊息機制的核心,訊息佇列的生產與消費

下面的程式碼是訊息的消費程式碼

listenerThread,來從訊息佇列中取得訊息並進行分發,為了保證生產者和消費者對訊息佇列的併發訪問,在每次需要獲取訊息的時候,呼叫eventLock.acquire()來獲取訊號量, 訊號量的值就是當前佇列中所含有的事件數量.如果正常獲取到事件,就呼叫postToAll將事件分發給所有listener, 繼續下一次迴圈. 如果獲取到null值, 則有下面兩種情況:

  1. stopped值被設定為true,整個應用正常的結束
  2. 系統發生了錯誤,立即終止執行

下面是生產者程式碼
這裡寫圖片描述

該函式用來將事件放入到訊息佇列中,每成功放入一個就呼叫eventLock.release()來增加Semaphore訊號量,供消費者消費,如果佇列滿了,就呼叫onDropEvent來處理。而真正的消費路由是通過SparkListenerBus的doPostEvent來處理

這裡寫圖片描述

doPostEvent會根據不同的訊息型別,呼叫listener對應的方法進行處理

訊息佇列的建立和傳送流程

這裡寫圖片描述

在SparkContext中會

  1. 建立LiveListenerBus類型別的成員變數listenerBus
  2. 建立各種listener,並加入到listenerBus中
  3. post一些事件到listenerBus中
  4. 呼叫listenerBus.start()來啟動事件處理程式

listenerBus.start()呼叫之前,可以向其中post訊息,這些訊息會被快取起來,等start函式啟動之後,消費者執行緒會分發這些快取的訊息,listenerBus.start()是在SparkContext中的setupAndStartListenerBus()函式被呼叫。
這裡寫圖片描述
這裡寫圖片描述
這段程式碼運用反射機制來建立spark.extraListeners引數指定的類,對於被建立類的構造器有如下的要求

  1. 有一個單引數構造器並且引數為SparkConf型別,則該引數構造器被呼叫
  2. 第一種情況不滿足的情況下,如果有無參構造器則將被呼叫
  3. 前兩種情況都不滿足的情況下,程式以丟擲異常結束

這裡寫圖片描述

在extraListener被構造並且被註冊之後,listenerBus.start被呼叫

於此同時,啟動消費者執行緒listenerThread,並且開始進行訊息路由,在程式結束後,會呼叫stop函式
這裡寫圖片描述

從上述stop()程式碼可以看到,在stop函式中呼叫了eventLock.release()增加訊號量,然而並未向訊息佇列中新增訊息所以,消費者執行緒listenerThread讀取佇列時會返回null值,進而達到結束listenerThread執行緒的目的