1. 程式人生 > >SparkContext的初始化(季篇)——測量系統、ContextCleaner及環境更新

SparkContext的初始化(季篇)——測量系統、ContextCleaner及環境更新

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》

《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》

由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。

本文展現第3章第三部分的內容:

3.9 啟動測量系統MetricsSystem

MetricsSystem使用codahale提供的第三方測量倉庫Metrics,有關Metrics的具體資訊可以參考附錄D。MetricsSystem中有三個概念:

q  Instance:指定了誰在使用測量系統;

q  Source:指定了從哪裡收集測量資料;

q  Sink:指定了往哪裡輸出測量資料。

Spark按照Instance的不同,區分為Master、Worker、Application、Driver和Executor。

Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。

Spark中使用MetricsServlet作為預設的Sink。

MetricsSystem的啟動程式碼如下。

val metricsSystem = env.metricsSystem
  metricsSystem.start()

MetricsSystem的啟動過程包括以下步驟:

1) 註冊Sources;

2) 註冊Sinks;

3) 給Sinks增加Jetty的ServletContextHandler。

MetricsSystem啟動完畢後,會遍歷與Sinks有關的ServletContextHandler,並呼叫attachHandler將它們繫結到SparkUI上。

metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

3.9.1 註冊Sources

  registerSources方法用於註冊Sources,它的實現見程式碼清單3-44。註冊Sources的過程分為以下步驟:

1) 從metricsConfig獲取Driver的Properties,預設為建立MetricsSystem的過程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。

2) 從Driver的Properties中用正則匹配以source.開頭的屬性。然後將屬性中的Source反射得到的例項,加入ArrayBuffer[Source]。

3) 將每個Source的metricRegistry(也是MetricSet的子型別)註冊到ConcurrentMap<String, Metric> metrics。這裡的registerSource方法已在3.8.2節講解過。

程式碼清單3-44         MetricsSystem

private def registerSources() {

    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) 

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Class.forName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

3.9.2 註冊Sinks

  registerSinks方法用於註冊Sinks,它的實現見程式碼清單3-45。註冊Sinks的步驟如下:

1) 從Driver的Properties中用正則匹配以sink.開頭的屬性,如:{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。將其轉換為Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。

2) 將子屬性class對應的類metricsServlet反射得到MetricsServlet例項。如果屬性的key是servlet,將其設定為metricsServlet;如果是Sink,則加入到ArrayBuffer[Sink]中。

程式碼清單3-45         MetricsSystem註冊Sinks的實現

  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv => val classPath = kv._2.getProperty("class") if (null != classPath) { try { val sink = Class.forName(classPath) .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e) } } } }

3.9.3給Sinks增加Jetty的ServletContextHandler

MetricsSystem的getServletHandlers方法,實現如下。

  def getServletHandlers = {
    require(running, "Can only call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers).getOrElse(Array())
  }

可以看到呼叫了metricsServlet的getHandlers,其實現如下。

def getHandlers = Array[ServletContextHandler](
    createServletHandler(servletPath,
      new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
  )

最終生成處理/metrics/json請求的ServletContextHandler,而請求的真正處理由getMetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通過SparkUI的attachHandler方法,也被繫結到SparkUI。createServletHandler與attachHandler方法都已經在3.4.4節詳細闡述。最終我們可以使用以下這些地址來訪問測量資料。

3.10 建立和啟動ExecutorAllocationManager

  ExecutorAllocationManager用於動態分配executor,建立和啟動ExecutorAllocationManager的程式碼如下。

  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =

    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      Some(new ExecutorAllocationManager(this, listenerBus, conf))
    } else {
      None
    }

  executorAllocationManager.foreach(_.start())

預設情況下不會建立ExecutorAllocationManager,可以修改屬性spark.dynamicAllocation.enabled為true來建立。ExecutorAllocationManager可以設定動態分配最小Executor數量、動態分配最大Executor數量、每個Executor可以執行的Task數量等配置資訊,並對配置資訊進行校驗。start方法將ExecutorAllocationListener加入到listenerBus中,ExecutorAllocationListener通過監聽listenerBus裡的事件,動態新增刪除executor。並且通過Thread不斷的新增executor,並且遍歷executor,將超時的executor殺掉並且移除。ExecutorAllocationListener的實現與其他SparkListener類似,不再贅述。ExecutorAllocationManager的關鍵程式碼見程式碼清單3-46。

程式碼清單3-46         ExecutorAllocationManagerr的關鍵程式碼

  private val intervalMillis: Long = 100
  private var clock: Clock = new RealClock
  private val listener = new ExecutorAllocationListener

  def start(): Unit = {
    listenerBus.addListener(listener)
    startPolling()
  }

  private def startPolling(): Unit = {

    val t = new Thread {
      override def run(): Unit = {
        while (true) {
          try {
            schedule()
          } catch {
            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
          }

          Thread.sleep(intervalMillis)
        }
      }
    }

    t.setName("spark-dynamic-executor-allocation")
    t.setDaemon(true)
    t.start()
  }

根據3.4.1節的內容,我們知道listenerBus內建了執行緒listenerThread,此執行緒不斷從eventQueue中拉出事件物件,呼叫監聽器的監聽方法。要啟動此執行緒,需要呼叫listenerBus的start方法,程式碼如下。

  listenerBus.start()

3.11 ContextCleaner的建立與啟動

  由於配置屬性spark.cleaner.referenceTracking預設是true,所以會構造並啟動ContextCleaner,程式碼如下。

  private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  }

  cleaner.foreach(_.start())

ContextCleaner用於清理那些超出應用範圍的RDD、ShuffleDependency和Broadcast物件。ContextCleaner的組成如下:

q  referenceQueue:快取頂級的AnyRef引用;

q  referenceBuffer:快取AnyRef的虛引用;

q  listeners:快取清理工作的監聽器陣列;

q  cleaningThread:用於具體清理工作的執行緒。

ContextCleaner的工作原理和listenerBus一樣,也採用監聽器模式,由執行緒來處理,此執行緒實際只是呼叫keepCleaning方法。keepCleaning的實現見程式碼清單3-47。

程式碼清單3-47         ContextCleaner的實現

  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {

    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])

        // Synchronize here to avoid being interrupted on stop()
synchronized { reference.map(_.task).foreach { task => logDebug("Got cleaning task " + task) referenceBuffer -= reference.get
task match { case CleanRDD(rddId) => doCleanupRDD(rddId, blocking = blockOnCleanupTasks) case CleanShuffle(shuffleId) => doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) case CleanBroadcast(broadcastId) => doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) } } } } catch { case ie: InterruptedException if stopped => // ignore case e: Exception => logError("Error in cleaning thread", e) } } }

3.12 Spark環境更新

  在SparkContext的初始化過程中,可能對其環境造成影響,所以需要更新環境,程式碼如下。

postEnvironmentUpdate()
postApplicationStart()

SparkContext初始化過程中,如果設定了spark.jars屬性, spark.jars指定的jar包將由addJar方法加入到httpFileServer的jarDir變數指定的路徑下。spark.files指定的檔案將由addFile方法加入到httpFileServer的fileDir變數指定的路徑下。見程式碼清單3-48。

程式碼清單3-48         依賴檔案處理

  val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

  val files: Seq[String] =
    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

// Add each JAR given through the constructor
if (jars != null) { jars.foreach(addJar) } if (files != null) { files.foreach(addFile) }

httpFileServer的addFile和addJar方法,見程式碼清單3-49。 

程式碼清單3-49         HttpFileServer提供對依賴檔案的訪問

  def addFile(file: File) : String = {
    addFileToDir(file, fileDir)
    serverUri + "/files/" + file.getName
  }

  def addJar(file: File) : String = {
    addFileToDir(file, jarDir)
    serverUri + "/jars/" + file.getName
  }

  def addFileToDir(file: File, dir: File) : String = {
    if (file.isDirectory) {
      throw new IllegalArgumentException(s"$file cannot be a directory.")
    }

    Files.copy(file, new File(dir, file.getName))
    dir + "/" + file.getName
  }

postEnvironmentUpdate的實現見程式碼清單3-50,其處理步驟如下:

1) 通過呼叫SparkEnv的方法environmentDetails最終影響環境的JVM引數、Spark 屬性、系統屬性、classPath等,參見程式碼清單3-51。

2) 生成事件SparkListenerEnvironmentUpdate,並post到listenerBus,此事件被EnvironmentListener監聽,最終影響EnvironmentPage頁面中的輸出內容。

程式碼清單3-50         SparkContext環境更新

  private def postEnvironmentUpdate() {

    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
      val addedFilePaths = addedFiles.keys.toSeq
      val environmentDetails =
        SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)

      listenerBus.post(environmentUpdate)
    }
  }

程式碼清單3-51         environmentDetails的實現

   val jvmInformation = Seq(
      ("Java Version", s"$javaVersion ($javaVendor)"),
      ("Java Home", javaHome),
      ("Scala Version", versionString)
    ).sorted

    val schedulerMode =
      if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
      } else {
        Seq[(String, String)]()
      }

    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

    // System properties that are not java classpaths
val systemProperties = Utils.getSystemProperties.toSeq val otherProperties = systemProperties.filter { case (k, _) => k != "java.class.path" && !k.startsWith("spark.") }.sorted // Class paths including all added jars and files val classPathEntries = javaClassPath .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVM Information" -> jvmInformation, "Spark Properties" -> sparkProperties, "System Properties" -> otherProperties, "Classpath Entries" -> classPaths) }

postApplicationStart方法很簡單,只是向listenerBus傳送了SparkListenerApplicationStart事件,程式碼如下。

listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
  startTime, sparkUser))

3.13 建立DAGSchedulerSource和BlockManagerSource

  在建立DAGSchedulerSource、BlockManagerSource之前首先呼叫taskScheduler的postStartHook方法,其目的是為了等待backend就緒,見程式碼清單3-52。postStartHook的實現見程式碼清單3-53。

  建立DAGSchedulerSource和BlockManagerSource的過程類似於ExecutorSource,只不過DAGSchedulerSource測量的資訊是stage. failedStages、stage. runningStages、stage. waitingStages、stage. allJobs、stage. activeJobs,BlockManagerSource測量的資訊是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpaceUsed_MB。

程式碼清單3-52         建立DAGSchedulerSource和BlockManagerSource

  taskScheduler.postStartHook()

  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

  private def initDriverMetrics() {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  }

  initDriverMetrics()

程式碼清單3-53         等待backend就緒的實現

override def postStartHook() {
    waitBackendReady()
  }

private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }

    while (!backend.isReady) {
      synchronized {
        this.wait(100)
      }
    }
  }

3.14 將SparkContext標記為啟用

  SparkContext初始化的最後將當前SparkContext的狀態從contextBeingConstructed(正在構建中)改為activeContext(已啟用),程式碼如下。

SparkContext.setActiveContext(this, allowMultipleContexts)

setActiveContext方法的實現如下。

  private[spark] def setActiveContext(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {

    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = None
      activeContext = Some(sc)
    }
}

3.15 小結

  回顧本章, Scala與Akka基於Actor的併發程式設計模型給人帶來深刻的印象,改變了我本人每當需要提升效能時就想到使用多執行緒的傳統觀念,Actor與事件模型有類似之處,通過非同步處理,減少執行緒切換開銷,值得開發人員借鑑。listenerBus對於監聽器模式的經典應用將處理轉化為事件並交給統一的執行緒處理,減少了執行緒阻塞與切換,提升了效能,希望讀者朋友能應用到自己的產品開發中去。此外,使用Netty所提供的非同步網路框架構建的Block傳輸服務,基於Jetty構建的內嵌web服務、HTTP檔案伺服器和SparkUI,基於codahale提供的第三方測量倉庫建立的測量系統,Executor中的心跳實現等內容,都值得借鑑。

後記:自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流程,Spark自身的版本迭代也很快,如今最新已經是1.6.0。目前市面上另外2本原始碼研究的Spark書籍的版本分別是0.9.0版本和1.2.0版本,看來這些書的作者都與我一樣,遇到了這種問題。由於研究和出版都需要時間,所以不能及時跟上Spark的腳步,還請大家見諒。但是Spark核心部分的變化相對還是很少的,如果對版本不是過於追求,依然可以選擇本書。

相關推薦

SparkContext初始——測量系統ContextCleaner環境更新

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。 本文展現第3章第三部分的內

SparkContext初始——TaskScheduler的啟動

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。 本文展現第3章第三部分的內容:

《深入理解SPARK:核心思想與原始碼分析》——SparkContext初始——SparkUI環境變數排程

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。 本文展現第3章第二部分的內容:

《深入理解Spark:核心思想與原始碼分析》——SparkContext初始——執行環境與元資料清理器

《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。本文展現第3章第一部分的內容: 第3章

TCP/IP協議棧初始十一完結-完成IP層與網絡卡的連線

上回ICMP的插曲說完了,把一個ICMP socket的建立流程說完了。對於資料結構關係圖沒有加入什麼新元素。執行的流程是從inet_family_ops到inet_create,raw_prot,這樣的執行順序。此時完成的只是ICMP協議的處理socket。繼

SQL Server 2017 AlwaysOn AG 自動初始十二

class 無法 增加 tle 完整 之前 join 截斷 51cto 何時不使用自動種子設定在某些情況下,自動種子設定可能不是初始化次要副本的最優選擇。 自動種子設定過程中,SQL Server 通過網絡執行備份以進行初始化。 如果數據庫非常大或者次要副本是遠程副本,此過

SQL Server 2017 AlwaysOn AG 自動初始十一

自動 進行 備份 情況下 耗時 server serve 使用 日誌 何時不使用自動種子設定在某些情況下,自動種子設定可能不是初始化次要副本的最優選擇。 自動種子設定過程中,SQL Server 通過網絡執行備份以進行初始化。 如果數據庫非常大或者次要副本是遠程副本,此過程

SQL Server 2017 AlwaysOn AG 自動初始十五

strong 恢復 種子設定 子網 通過 width SQ 對比 備份 性能測試對比分析拿xx庫來做測試,數據文件8G,備份後為600M:測試場景使用時間1通過備份恢復來創建,開啟備份壓縮1分29秒2通過自動種子設定,開啟備份壓縮1分22秒3通過自動種子設定,開啟備份壓縮,

C++11 帶來的新特性 2—— 統一初始Uniform Initialization

1 統一初始化(Uniform Initialization) 在C++ 11之前,所有物件的初始化方式是不同的,經常讓寫程式碼的我們感到困惑。C++ 11努力創造一個統一的初始化方式。 其語法是使用{}和std::initializer_list ,先看示例。 int values[

神經網路之權重初始附程式碼

摘要 神經網路/深度學習模型訓練的過程本質是對權重進行更新,在對一個新的模型進行訓練之前,需要每個引數有相應的初始值。對於多層神經網路/深度學習而言,如何選擇引數初始值便成為一個值得探討的問題。本文從實現啟用值的穩定分佈角度來探討神經網路的效率優化問題 權重在

IOC容器初始原始碼解讀

一、過程(資源定位,Bean的載入,解析,以及註冊) 第一個過程是Resource資源定位。這個Resouce指的是BeanDefinition的資源定位。這個過程就是容器找資料的過程,就像水桶裝水需要先找到水一樣 第二個過程是BeanDefinition的

C++類的靜態成員變數一定要初始分配記憶體

文章轉載自https://my.oschina.net/u/1537391/blog/219432 我們知道C++類的靜態成員變數是需要初始化的,但為什麼要初始化呢。其實這句話“靜態成員變數是需要初始化的”是有一定問題的,應該說“靜態成員變數需要定義”才是準確的,而不是初始化

學習Linux-4.12核心網路協議棧1.7——網路裝置的初始struct net_device

在linux的網路裝置裡,其中一個最關鍵的結構體應該要算net_device了,它由對應的網路裝置驅動進行建立和初始化,服務於核心網路子系統。 1. struct net_device 註釋分析 struct net_device這個結構體比較大,在瞭解它之前,我們先看一下

Java類的載入連結和初始個人筆記

這裡看到一篇比較好的文章:http://www.infoq.com/cn/articles/cf-Java-class-loader 這裡只是針對什麼時候會觸發java類的初始化(注意:這裡不是說的例項化)進行討論: 除了文章中提到的5點: 建立一個Java類的例項。如 MyClass obj =

沒有躲過的坑--map的初始插入資料

最近工作中需要使用map,進行查詢。 首先簡單介紹一點map,也許是教科書裡講授最少的STL知識吧。但是在實際工作中map挺重要的,用於查詢很方便快捷,尤其是以鍵和值的形式存在的! 1、標頭檔案 #include<map> 2、map的功能

Linux核心原始碼分析--系統時間初始kernel_mktime()函式

        從boot檔案中的幾個彙編程式執行後跳轉到init檔案中的main.c程式開始繼續執行,該main.c函式式為系統執行的環境進行初始化的。首先來看系統時間的初始化(因為系統時間的初始化開始程式就在init檔案中),其中主要還是由kernel中的mktime.

css初始淘寶

size tex css img ace ont erl adding -a <style> blockquote, body, button, dd,d

Spring概念:SpringSpringMVCSpringBoot以及SpringCloud的概念關係與區別詳解

Spring與Spring MVC Spring Spring是一個一站式的輕量級的Java開發框架 Spring是一個一站式的輕量級的Java開發框架,核心是控制反轉(IOC)和麵向切面(AOP),針對於開發的WEB層(SpringMVC)、業務層(IOC)、持久層(jdbc Te

詳解遞迴基礎———函式棧階乘Fibonacci數列

一、遞迴的基本概念 遞迴函式:在定義的時候,自己呼叫了自己的函式。 注意:遞迴函式定義的時候一定要明確結束這個函式的條件! 二、函式棧 棧:一種資料結構,它僅允許棧頂進,棧頂出,先進後出,後進先出。我們可以簡單的理解為棧就是一個杯子,這個杯子裡面有很多隔層,每一層都可以放東西,第一個放入的東西就在杯子

解決RxJava記憶體洩漏:RxLifecycle詳解原理分析

隨著RxJava及RxAndroid的逐漸推廣,使用者越來越多,但是有一個問題,RxJava的使用不當極有可能會導致記憶體洩漏。比如,使用RxJava釋出一個訂閱後,當Activity被finish,此時訂閱邏輯還未完成,如果沒有及時取消訂閱,就會導致Activity無法被回