《深入理解Spark-核心思想與原始碼分析》讀書筆記(1)
前兩章
第一章主要是講如何安裝和配置spark,以及如何匯入spark原始碼除錯執行;第二章主要講的是上次那本書《Spark快速大資料分析》的內容,科普一下spark的知識。
第三章 SparkContext的初始化
1. 概述
這章的主要內容就是講解SparkContext的初始化。SparkContext就是所有Spark應用基礎環境而配置Spark任務則是由SparkConf來完成。SparkContext的初始化一共有以下幾步
1)建立 Spark 執行環境 SparkEnv;
2)建立 RDD 清理器 metadataCleaner;
3)建立並初始化 Spark UI;
4)Hadoop 相關配置及 Executor 環境變數的設定;
5)建立任務排程 TaskScheduler;
6)建立和啟動 DAGScheduler;
7)TaskScheduler 的啟動;
8)初始化塊管理器 BlockManager(BlockManager 是儲存體系的主要元件之一,將在第 4章介紹);
9)啟動測量系統 MetricsSystem;
10)建立和啟動 Executor 分配管理器 ExecutorAllocationManager;
11)ContextCleaner 的建立與啟動;
12)Spark 環境更新;
13)建立 DAGSchedulerSource 和 BlockManagerSource;
14)將 SparkContext 標記為啟用。
SparkContext構造器的引數就是SparkConf
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
2. 建立執行環境SparkEnv
SparkEnv包含眾多和Executor(執行器)相關的物件。Executor就是Worker(工作節點)的一個程序。包括以下內容
1)建立安全管理器 SecurityManager;
2)建立基於 Akka 的分散式訊息系統 ActorSystem;
3)建立 Map 任務輸出跟蹤器 mapOutputTracker;
4)例項化 ShuffleManager;
5)建立 ShuffleMemoryManager;
6)建立塊傳輸服務 BlockTransferService;
7)建立 BlockManagerMaster;
8)建立塊管理器 BlockManager;
9)建立廣播管理器 BroadcastManager;
10)建立快取管理器 CacheManager;
11)建立 HTTP 檔案伺服器 HttpFileServer;
12)建立測量系統 MetricsSystem;
13)建立 SparkEnv。
2.1 安全管理器SecurityManager
用來管理系統的口令
//Set our own authenticator鑑別器 to properly專有的 negotiate協商
//userpassword for HTTP connections. This is needed by the HTTP client
//fetching from the HttpServer. Put here so its only set once.
if (authOn) {
Authenticator.setDefault(
new Authenticator() {
override def getPasswordAuthentication(): PasswordAuthentication = {
var passAuth: PasswordAuthentication = null
val userInfo = getRequestingURL().getUserInfo()
if (userInfo != null) {
val parts = userInfo.split(":", 2)
passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
}
return passAuth
}
}
)
}
2.2 基於Akka的分散式訊息系統ActorSystem
ActorSystem是Akka提供的用於建立分散式訊息通訊系統的基礎類。SparkEnv使用了AkkaUtils.createActorSystem方法完成,而createActorSystem實際上使用了doCreaterActorSystem來創造ActorSystem。
2.3 map任務輸出跟蹤器mapOutputTracker
跟蹤map階段任務的輸出狀態,便於reduce階段任務獲取地址和中間輸出結果。所以這個mapOutputTracker就是用來管下面這些map和shuffle的,比如知道map輸出block之類的,讓reduce能找得到map的結果。
下面的程式碼是建立MapOutputTrackerMasterActor的。map任務的狀態是由Executor像持有的MapOutputTrackerMasterActor傳送訊息,講map任務狀態同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses。
2.4 例項化ShuffleManager
這個就是用來管理上面那個圖裡的shuffle的
2.5 塊傳輸服務BlockTransferService
獲取遠端節點上的block的,第四章講
2.6 BlockManagerMaster介紹
這個負責對block進行管理,具體操作藉助BlockManagerMasterActor,在初始化之後,建立BlockManager
2.7 建立廣播管理器BroadcastManager
BroadcastManager是用於配置資訊和序列化後的RDD、Job以及ShuffleDEpendency等資訊在本地儲存。
2.8 建立快取管理器CacheManager
用於快取RDDM某個分割槽計算後的中間結果,第四章解釋。
2.9 HTTP檔案伺服器HttpFileServer
提供對檔案的HTTP訪問。開始時要初始化,建立檔案伺服器的根目錄和臨時目錄。建立jar包及其他檔案的檔案目錄。用start()方法啟動,而這個方法用了doStart方法,doStart方法就是各種配置server物件,然後啟動它。
def initialize() {
baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
jarDir.mkdir()
logInfo("HTTP File server directory is " + baseDir)
httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
httpServer.start()
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
}
2.10 建立測量系統MetricsSystem
MetricsSystem是Spark的測量系統,其作用是定期將資料指標從資料來源(source)拉到資料匯(sink)。
2.11 建立SparkEnv
當所有基礎元件準備好後,使用new Spark(……)來建立執行環境SparkEnv。
3. 建立metadataCleaner
metadataCleaner的功能是清楚過期的持久化RDD。
/**
* Runs a timer task to periodically定期地 clean up metadata (e.g. old files or hashtable entries)
*/
private[spark] class MetadataCleaner(
cleanerType: MetadataCleanerType.MetadataCleanerType,
cleanupFunc: (Long) => Unit,
conf: SparkConf)
extends Logging
{
val name = cleanerType.toString
private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
private val periodSeconds = math.max(10, delaySeconds / 10)
private val timer = new Timer(name + " cleanup timer", true)
private val task = new TimerTask {
override def run() {
try {
//就這,定期清理,用cleanupFunc
cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
logInfo("Ran metadata cleaner for " + name)
} catch {
case e: Exception => logError("Error running cleanup task for " + name, e)
}
}
}
if (delaySeconds > 0) {
logDebug(
"Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
"and period of " + periodSeconds + " secs")
timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
}
def cancel() {
timer.cancel()
}
}
而clean函式如下
private[spark] def cleanup(cleanupTime:Long){ persistentRdds.clearOldValues(cleanupTime)
}
4. SparkUI詳解
DAGScheduler是主要的產生各種Event的源頭,它將各種SparkListenerEvent傳送到listenerBus的時間佇列中,然後BUS把事件和具體的sparklistener匹配,最終由sparkUI展示。
4.1 listenerBus詳解
由三個部分組成。
- 事件阻塞佇列
- 監聽器陣列
- 事件匹配監聽器執行緒
事件阻塞佇列相當於排隊上車的人,而執行緒就是公交車,不停地拉去排事件阻塞佇列裡的事件與監聽器陣列匹配,然後對事件進行操作。
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
private val listenerThread = new Thread("SparkListenerBus") {
setDaemon(true)
override def run(): Unit = Utils.logUncaughtExceptions {
while (true) {
eventLock.acquire()
// Atomically remove and process this event
LiveListenerBus.this.synchronized {
val event = eventQueue.poll
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
}
Option(event).foreach(postToAll)
}
}
}
}
def start() {
if (started) {
throw new IllegalStateException("Listener bus already started!")
}
listenerThread.start()
started = true
}
def post(event: SparkListenerEvent) {
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
logQueueFullErrorMessage()
}
}
def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
def stop() {
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
post(SparkListenerShutdown)
listenerThread.join()
}
4.2 構造JobProgressListener
JobProgressListener用來統計Job的資訊和狀態。並在它們發生變化時進行反應。
4.3 SparkUI的建立與初始化
用create()方法加入一些listener,然後initialize()連線tab。再之後就是利用render()方法對頁面進佈局,實現顯示。
5.Hadoop相關配置及Executor環境變數
獲取Hadoop相關配置資訊和對Executor的環境變數進行配置
6.建立任務排程器TaskScheduler
z這個就是用來負責任務的提交,並且請求叢集管理器對任務排程。可以看做是任務排程的客戶端。首先createTaskScheduler要建立TaskSchedulerImpl。
6.1 建立TaskSchedulerImpl
- 從SparkConf中讀取配置資訊,包括每個任務分配的CPU數,排程模式(分為FAIR和FIFO兩種,預設為FIFO)
- 建立TaskResultGetter,它的作用是通過執行緒池對Worker上的Executor傳送的Task的執行結果進行處理。
排程方式最終落實到介面SchedulerBackend上實現
6.2 TaskSchedulerImpl的初始化
1)使 TaskSchedulerImpl 持有 LocalBackend 的引用。
2)建立 Pool,Pool 中快取了排程佇列、排程演算法及 TaskSetManager 集合等資訊。
3)建立 FIFOSchedulableBuilder,FIFOSchedulableBuilder 用來操作 Pool 中的排程佇列。
7.建立和啟動DAGScheduler
DAGScheduler 主要用於在任務正式交給 TaskSchedulerImpl 提交之前做一些準備工作,包 括: 創 建 Job, 將 DAG 中 的 RDD 劃 分 到 不 同 的 Stage, 提 交 Stage, 等 等。
此節留以後詳細說明
8.TashScheduler的啟動
此節留以後詳細說明
9.啟動測量系統MetricsSystem
這個測量系統有三個概念。
- Instance:指定了誰在使用測量系統
- Source:指定了從哪裡收集測量資料
- Sink:指定了往哪裡輸出測量資料
啟動過程包括 1)註冊Source 2)註冊Sinks 3)給Sinks增加Jetty的ServletContextHandler
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
給Sinks增加Jetty的ServletContextHandler主要是為了和SparkUI同步,用到了getServletHandler方法,最終生成處理 /metrics/json請求的ServletContextHandler。
10.啟動和建立ExecutorAllocationManager
ExecutorAllocationManager用與對已經分配的Executor進行管理。
def start(): Unit = {
listenerBus.addListener(listener)
val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
其中start方法就將listener加入bus中,通過監聽事件,動態新增、刪除Executor。
11.ContextCleaner的建立和啟動
用於清理那些超出應用範圍的RDD、ShuffleDepency和Broadcast物件。
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.map(_.task).foreach { task =>
logDebug("Got cleaning task " + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
12.Spark環境更新
在SparkContext的初始化過程中,可能對其環境造成影響,所以需要更新環境,程式碼如下。
postEnviromentUpdate()
postApplicationStart()
13.建立DAGSchedulerSource和BlockManagerSource
private def initDriveMetrics(){
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriveMetrics()
14.將SparkContext標記為啟用
SparkContext.setActiveContext(this,allowMultipleContexts)