1. 程式人生 > >Spark叢集啟動流程-Worker啟動-原始碼分析

Spark叢集啟動流程-Worker啟動-原始碼分析

Spark叢集啟動流程-Worker啟動-原始碼分析

上篇文章介紹了Master啟動(Master啟動點選:https://blog.csdn.net/weixin_43637653/article/details/84073849
),接下來,我們在原始碼裡繼續分析Worker的啟動

總結:(和Master幾分相似)

​1.建立ActorSystem物件,並將初始化引數傳入
2.建立了屬於Worker的Actor
3.啟動生命週期方法(preStart),向Master進行註冊,通過Master的Url獲取Master的actor,
4.Worker接收到Master傳送過來的註冊成功資訊,然後更新url
5.呼叫定時器,定時地向Master傳送心跳資訊

全部圖解:
在這裡插入圖片描述
詳解:

在啟動之時,Master和Worker是同時啟動,我們進入到Worker這個類中來進行分析

同樣,我們找到他的伴生物件裡的main方法

在這裡插入圖片描述

與Master裡相似,開始初始化了一些用於啟動actor的引數,接下來封裝到args裡將其傳到startSystemAndActor的方法裡,這個startSystemAndActor方法就在下面,可以看到它裡面同樣呼叫了AkkaUtils工具包,並呼叫了createActorSystem方法,來建立這個actorSystem

在這裡插入圖片描述

那麼這個createActorSystem方法呢,,它裡面定義了一個函式,和呼叫了startServiceOnPort這個方法,這個函式是傳給了startServiceOnPort方法

在這裡插入圖片描述

通過startServiceOnPort方法,我們拿到actorSyatem這個物件

在這裡插入圖片描述

然後回退到開始建立actorSyatem物件的那,這時候我們就有了這個actorSyatem這個物件,往下看,會發現,它同樣呼叫了這個actorOf這個方法,然後建立了屬於Worker的Actor

在這裡插入圖片描述

用反射的方式,new一個Worker,一旦new了這個worker,那麼它裡面的主構造器就開始初始化了,一直往下執行到preStart方法

在這裡插入圖片描述

這個preStart生命週期方法裡,它有一個registerWithMaster方法,這個方法用來向Master來進行註冊的,怎麼註冊的呢?我們點進去看:

在這裡插入圖片描述

它裡面用了一個模式匹配,這裡面最主要的是tryRegisterAllMasters方法,他就是嘗試所有的Master進行註冊

如果嘗試沒有成功,下面有個嘗試的時間的閾值registrationRetryTimer,那麼這裡面有個ReregisterWithMaster方法,其實這個ReregisterWithMaster方法和tryRegisterAllMasters方法裡面是一樣的,我們先進ReregisterWithMaster方法裡看看:

在這裡插入圖片描述

這裡面有個for迴圈,將master的Url遍歷出來,然後通過Master的Url獲取Master的actor,接下來,actor向Master傳送註冊資訊。就是通過一個!的方式非同步傳送資訊,點進去RegisterWorker裡:

在這裡插入圖片描述

這裡我們接收到Worker傳送過來的註冊資訊,然後判斷是否是standby,如果是standby模式,則說明都不做,//idToWorker就是所有註冊過的id,然後將傳進來的id與之比較 //看是否包含這個id,如果包含,說明之前註冊過,如果註冊過,則不需要再註冊,也不需要做任何。else 然後把傳送過來的資訊封裝到WorkerInfo賦給worker,下一步判斷,將封裝的WorkerInfo資訊傳給registerWorker方法裡進行一些過濾處理,保證裡面是一些有效的worker。再往下,判斷記憶體裡儲存了wrker資訊,然後呼叫持久化引擎往磁盤裡儲存一份,目的是為了防止
在這裡插入圖片描述

丟失。可以點開這個addWorker方法,這裡是用persist,裡面封裝有worker的id和worker的資訊。

然後,向Worker響應註冊成功的資訊,併發送MasterUrl,並且開始排程資源。

-我們點進去RegisteredWorker方法,到receiveWithLogging裡

在這裡插入圖片描述

receiveWithLogging裡Worker接收到Master傳送過來的註冊成功的資訊,實際上就是傳送Url。“changeMaster(masterUrl, masterWebUiUrl)”就是把active的Url更新一下,然後又呼叫了一個定時器,定時的向Master傳送心跳資訊,從0 millis開始到15秒(60/4),也就是說,沒15秒傳送一次心跳,那麼傳送心跳的邏輯,就是在SendHeartbeat裡,裡面是判斷connected,connected的預設值為false。其實發送資訊,就是往master傳了一個workerId
在這裡插入圖片描述
在這裡插入圖片描述

那麼Master接收到Worker傳送過來的資訊後,通過ge方法將workerid進行模式匹配,在some裡,拿到workerInfo資訊,把最後一次心跳時間更改為系統當前時間,至此,這是一次心跳時間,這個每隔15秒進行檢查更新,保證時間最新,一旦長時間未做響應,那麼就是這兩個節點網路斷開了或者worker死掉了,這樣就會被那個檢查超時的定時器監測到,並進行刪除這個資訊。

那麼至此,叢集啟動完成。