1. 程式人生 > >Spark叢集啟動流程-Master啟動-原始碼分析

Spark叢集啟動流程-Master啟動-原始碼分析

Spark叢集啟動流程-Master啟動-原始碼分析

總結:

1.初始化一些用於啟動Master的引數
2.建立ActorSystem物件,並啟動Actor
3.呼叫工具類AkkaUtils工具類來建立actorSystem(用來建立Actor的物件)
4.建立屬於Master的actor,在建立actor的同時,會初始化Master
5.生命週期方法(preStart)是在構造器之後,receive方法之前執行,只會執行一次
6.啟動一個定時器,定時檢查超時的Worker
7.獲取到超時的Worker然後把他移除掉(刪除記憶體和磁盤裡的workerinfo)
8.啟動receive方法,會不斷的執行,用於接收actor傳送過來的請求

詳解:

我們先來看看啟動的命令指令碼,進入到\spark\bin\下,cat start-all.sh,可以看到,它其實內部呼叫了很多其他的指令碼

在這裡插入圖片描述

先啟動了start-master.sh,再次cat start-master.sh
在這裡插入圖片描述

可以看到,它將CLASS 1這個變數傳給了sbin目錄下的soark-daemon.sh,往上翻會看到CLASS 1的具體指的是“org.apache.spark.deploy.master.Master”,即傳的值是Master的全類名,我們cat一下soark-daemon.sh

在這裡插入圖片描述

它最終呼叫一個類來啟動服務,再cat一下start-slaves.sh

在這裡插入圖片描述

它傳入的是spark的url,這裡就是啟動了worker了,可以用start-slaves.sh單獨啟動Master和Worker

從上述中我們可以看出他們啟動的都是一個類,所以下面,我們只分析Master類和Worker類,首先將安裝包下的core包匯入到IDEA中我們來進行逐步分析(匯入略)

首先我們搜尋找到Master
在這裡插入圖片描述

這是Master類,既然要啟動就一定有main方法,一般他是放在一個伴生物件裡,我們在搜尋框裡輸入object找到伴生物件,其中有main方法

在這裡插入圖片描述

在main方法中new了一個MasterArguments並將conf和一些重要的啟動引數argStrings傳入其中,並將args傳入到startSystemAndActor這個方法中,我們先點開startSystemAndActor方法看看

它呼叫了AkkaUtils工具類下的createActorSystem方法,最後一行返回了四個值,後面三個值就是賦值給前面用_佔的位

在這裡插入圖片描述

呼叫工具類AkkaUtils來建立ActorSystem,ActorSystem就是建立Actor的方法,我們點進去createActorSystem方法看一下

在這裡插入圖片描述

這裡定義了一個startService的函式,函式裡面呼叫了doCreateActorSystem方法,就在下面,裡面就是一些初始化成<k,v>的配置資訊,翻到此方法的最下方有:
在這裡插入圖片描述

建立了ActorSystem,直接類名加小括號裡面傳參,說明是通過呼叫了ActorSystem裡的apply方法來建立了這個actorSystem例項,當然這裡可以用ActorSystem.create方法來建立。我們返回到Master裡的startSystemAndActor中,現在ActorSystem已經建立完成並且返回給了val (actorSystem,boundPort)中的actorSystem,接下來我們就用actorSystem這個例項來建立actor了,呼叫的就是actorOf方法

在這裡插入圖片描述

然後通過classOf反射的方式拿到Master這個類來啟動Master的actor,在建立actor的同時會初始化Master類。

接下來呢,將會執行一個叫preStart的生命週期方法,這個方法,是在構造器之後,receive方法之前執行,而且只執行一次。

在這裡插入圖片描述

那麼接下來就要實現這個方法裡面的內容了,那麼,根據之前的學習,我們知道,這時候呢應該要啟動一個定時器:

在這裡插入圖片描述

//第一個引數,是從什麼時候開始,第二個引數是從什麼時候結束點 // 進去會發現它預設的是60秒檢查一次worker //第三個引數是指的是自己給自己傳送檢查能否正常執行,第五個引數,是檢查 //超時的worker的邏輯,點進去則是一個模式匹配,再次點進去,會發現,其實他 就執行了一個方法timeOutDeadWorkers()

在這裡插入圖片描述

我們再次點進去timeOutDeadWorkers方法中。首先,它呼叫了系統的時間,然後,呼叫了一個變數workers,裡面的型別是new HashSet[WorkerInfo],WorkerInfo再點進去,裡面其實儲存的都是傳送過來的一些連線資訊,用HashSet,那麼他有個去重的功能。這裡不光有workers變數,還有idToWorker,這裡用HashMap來存,那麼key是id,value是這個id對應的WorkerInfo,還有addressToWorker,則是這個地址對應的WorkerInfo

在這裡插入圖片描述

在這裡插入圖片描述

那麼拿到這些資料,我們進行過濾,就是過濾超時的worker,它每次拿到一個worker,進行心跳時間的判斷,lastHeartbeat是最後一次心跳時間,它裡面是一個var的int型別,並用_來初始化。那麼當前的時間我們獲取到了減去超時的閾值60,即最後一次心跳時間 < 當前時間 - 超時時間閾值60,通過這樣過濾,那麼,當60秒沒有心跳,我們就過濾出來toArray之後賦給了toRemove。

接下來執行一個for迴圈,我們把每一次超時的worker拿出來,if進行判斷,當.state不等於DEAD時候,執行了remove,把WorkerInfo資訊移除掉

點進去這個removeWorker方法中

在這裡插入圖片描述

它將idToWorker和addressToWorker 移除掉了,這裡最後它還將磁盤裡的一份WorkerInfo也刪除了。

那麼這是preStart方法啟動完成及裡面一些重要的功能,到這裡全部啟動了

接下來,啟動一個receive方法,這裡叫receiveWithLogging,這個方法他會不但的迴圈執行,來接受actor發過來的請求。那麼到此,這個Master就啟動了

在這裡插入圖片描述

其實在啟動的過程中,Master和Worker一起啟動的,所以下一篇文章將介紹worker的啟動 https://blog.csdn.net/weixin_43637653/article/details/84099110