1. 程式人生 > >【Spark核心原始碼】事件匯流排ListenerBus

【Spark核心原始碼】事件匯流排ListenerBus

目錄

訊息匯流排ListenerBus

非同步事件處理LiveListenerBus

增加事件

listenerThread處理事件


訊息匯流排ListenerBus

org.apache.spark.util.ListenerBus處理來自DAGScheduler、SparkContext、BlockManagerMasterEndpoint等元件的事件,是整個Spark框架體系內事件處理匯流排。

ListenerBus是訊息匯流排特質(不懂特質的看這裡scala特質 ),提供了增加、刪除方法以及事件執行監聽器的方法。監聽列表listeners是執行緒安全的,但是postToAll事件執行監聽器的方法卻是執行緒不安全的,所以該方法要在同一執行緒中執行。

ListenerBus特質原始碼

 既然是特質,那麼就一定會有實現類,ListenerBus的繼承結構如下:

ListenerBus繼承關係

 

非同步事件處理LiveListenerBus

org.apache.spark.scheduler.LiveListenerBus是一個核心的具體實現,LiveListenerBus實現了非同步事件處理,內建事件阻塞佇列,可通過“spark.scheduler.listenerbus.eventqueue.size”設定,預設1000。

事件佇列的設定

增加事件

既然有阻塞佇列,那麼就需要有將事件增加到佇列中的方法,post(event: SparkListenerEvent)就是幹這事的。該方法中傳遞一個SparkListenerEvent作為引數,在將事件增加到佇列中之前,需要先判斷LinveListenerBus是否還活著,如果stop了,則報錯並返回。如果不處於stop狀態,則增加到阻塞佇列中,增加成功了釋放訊號,告訴別人我增加好了。如果由於佇列已經滿了增加失敗,則會移除事件,並在刪除事件計數器中進行自增記錄,如果上一次刪除失敗事件和本次刪除失敗事件的時間間隔在60秒以上,就輸出一次錯誤報告。流程如下:

增加event到佇列的流程

程式碼如下:

/**
    * 將事件增加到eventQueue事件佇列中去
    * */
  def post(event: SparkListenerEvent): Unit = {
    // 判斷LiveListenerBus是否停止,停止就報錯並返回
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    // 向佇列中增加事件
    val eventAdded = eventQueue.offer(event)
    /**
      * 增加成功,釋放訊號量,加快listenerThread工作,
      * 在listenerTread中eventLock.acquire()一直在獲取訊號量
      *
      * 增加失敗,則移除事件,並對刪除事件計數器droppedEventsCounter自增
      * */
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }

    // 如果有失敗的時間,60秒列印一次日誌
    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      // Don't log too frequently
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        // There may be multiple threads trying to decrease droppedEventsCounter.
        // Use "compareAndSet" to make sure only one thread can win.
        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
        // then that thread will update it.
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }

上面提到增加成功之後釋放訊號量,既然有釋放訊號量,那麼就一定會有接收訊號量,在listenerThread中接收訊號量並處理新增加的也就是未處理的事件。增加成功後立刻釋放訊號量這個操作的好處就在於,只要增加成功就立刻處理,不需要管後面的操作。

listenerThread處理事件

下面就看一下listenerThread是如何處理事件的,listenerThread程式碼如下:

private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire()// 獲取訊號量
          self.synchronized {
            processingEvent = true
          }
          try {
            // 從佇列中獲取事件
            val event = eventQueue.poll 
            if (event == null) {
              // Get out of the while loop and shutdown the daemon thread
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            postToAll(event) // 處理事件
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

Utils.tryOrStopSparkContext(sparkContext)方法會保證線上程中丟擲異常後,會有啟動一個新的執行緒來停止SparkContext。

執行緒會不斷獲取訊號量,獲取到訊號量,說明還有事件未處理,並同步設定processingEvent狀態,表示我正在執行任務。然後從事件佇列中獲取事件,並執行事件監聽器,處理事件postToAll(event)的方法是LiveListenerBus的超類ListenerBus中的方法。執行過後同步設定processingEvent狀態,表示我執行完任務了。