Spark Source Code Analysis: Starting Receivers in Spark Streaming
In this post we analyze how Receivers are started in Spark Streaming. As we all know, StreamingContext is the main entry point of a Spark Streaming program, so let's start by looking at part of its source code:
class StreamingContext private[streaming] (
sc_ : SparkContext,
cp_ : Checkpoint,
batchDur_ : Duration
) extends Logging {
//The DStreamGraph stores the DStreams and the dependencies between them
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
cp_.graph.setContext(this)
cp_.graph.restoreCheckpointData()
cp_.graph
} else {
require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(batchDur_)
newGraph
}
}
private [streaming] val scheduler = new JobScheduler(this)
}
Here we can see that when a StreamingContext object is initialized, a DStreamGraph and a JobScheduler are created; the DStreamGraph mainly stores the DStreams and the dependencies between them.
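For context, this is what a typical driver program looks like when it uses these components (a minimal sketch; the host, port and batch interval are arbitrary):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

//Constructing the StreamingContext creates the DStreamGraph and JobScheduler described above
val conf = new SparkConf().setAppName("ReceiverStartupDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

//Registering an input DStream adds it (and its dependencies) to the DStreamGraph
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

//start() kicks off the JobScheduler, which in turn starts the ReceiverTracker and JobGenerator
ssc.start()
ssc.awaitTermination()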
Now look at the following code in JobScheduler:
//The JobGenerator is created as soon as the JobScheduler is created
private val jobGenerator = new JobGenerator(this)
So right after a JobScheduler is created, a JobGenerator is created as well; we will come back to what it does later.
When starting a StreamingContext, we first need to call the StreamingContext#start method:
def start(): Unit = synchronized {
...
scheduler.start()
...
}
In its start method, the main work is to call the start method of the JobScheduler. The relevant part of JobScheduler#start is:
...
//Create the receiverTracker component, which is responsible for receiving data; create it and start it
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
//Start the JobGenerator
//At this point all of the StreamingContext components we mentioned have been created
//Next, the Receivers associated with the input DStreams are started
//That logic lives in ReceiverTracker's start method
jobGenerator.start()
...
So JobScheduler's start method creates a ReceiverTracker object and calls its start method, which looks like this:
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
//endpoint is a ReceiverTrackerEndpoint
//Set up an endpoint to receive and process the messages sent by the ReceiverTracker and the Receivers
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
//The main thing this start method does is call launchReceivers; the ReceiverTracker's job is to start the Receivers,
//i.e. to distribute the receivers to the executors
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
First it checks whether the receiverInputStreams array is empty: besides ReceiverInputDStream, Spark Streaming also has InputDStreams that need no Receiver at all (for example, reading files from a Hadoop directory), so the receiver-less streams have to be filtered out here. We then see that a ReceiverTrackerEndpoint is initialized; its main job is to handle the messages sent to the ReceiverTracker. Most importantly, a launchReceivers method is executed afterwards (shown after the short example below).
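To make the distinction concrete, here is a small illustrative sketch (the sources and paths are made up): socketTextStream yields a ReceiverInputDStream and therefore needs a Receiver running on an executor, while textFileStream yields a file-based InputDStream that is driven purely by batch jobs and never shows up in receiverInputStreams:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("InputStreamKinds").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

//Receiver-based: a SocketReceiver will be started on an executor and push data into Spark
val socketStream = ssc.socketTextStream("localhost", 9999)

//Receiver-less: each batch simply lists the new files under the directory
val fileStream = ssc.textFileStream("hdfs:///data/streaming/in")

socketStream.count().print()
fileStream.count().print()
Back to the ReceiverTracker: the launchReceivers method it calls looks like this: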
private def launchReceivers(): Unit = {
//Get the receivers from DStreamGraph.inputStreams, i.e. the data receivers
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
runDummySparkJob()
//Send a StartAllReceivers message to the message-handling endpoint; this returns immediately without waiting for the message to be processed
endpoint.send(StartAllReceivers(receivers))
}
It first obtains the receivers corresponding to the receiverInputStreams, then sends a StartAllReceivers message to the ReceiverTrackerEndpoint. When the ReceiverTrackerEndpoint receives this message, it handles it as follows:
case StartAllReceivers(receivers) =>
//Match receivers to executors according to the receiver scheduling policy
val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
//Iterate over every receiver and call startReceiver with the target executors computed above
for (receiver <- receivers) {
val executors = scheduledExecutors(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
//Record the preferred location of the receiver
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
//Start the receiver on the designated executors
startReceiver(receiver, executors)
}
Each receiver is first assigned candidate executors according to the scheduling policy (a rough sketch of what such an assignment does follows below), and then startReceiver(receiver, executors) is called to start the receiver on the chosen executor.
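Purely as an illustration (this is not Spark's ReceiverSchedulingPolicy, which also balances load across executors and honours each receiver's preferredLocation), a naive round-robin assignment of receivers to executors could look like this:
//Hypothetical round-robin sketch: map each stream id to one executor.
//receiverIds and executors are illustrative inputs; the real
//ReceiverSchedulingPolicy.scheduleReceivers maps each stream id to its candidate executors.
def roundRobinSchedule(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, Seq[String]] = {
  if (executors.isEmpty) {
    receiverIds.map(id => id -> Seq.empty[String]).toMap
  } else {
    receiverIds.zipWithIndex.map { case (id, i) =>
      id -> Seq(executors(i % executors.length))
    }.toMap
  }
}

//For example, three receivers spread over two executors:
//roundRobinSchedule(Seq(0, 1, 2), Seq("host1", "host2"))
//  => Map(0 -> Seq("host1"), 1 -> Seq("host2"), 2 -> Seq("host1"))
The startReceiver method that launches a receiver on its chosen executor looks like this: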
private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
...
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
//This is the function that will be shipped to an executor to start the receiver
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
//Create a ReceiverSupervisorImpl object named supervisor
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
//Call its start method to start the receiver
supervisor.start()
//Call awaitTermination to block this thread so the receiver task keeps running
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledExecutors to run the receiver in a Spark job
//Wrap the Receiver and its target executors into an RDD
val receiverRDD: RDD[Receiver[_]] =
if (scheduledExecutors.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
//Wrap the RDD (receiver plus its executors) and the function to run over it into a job and submit it,
//so that the receiver is actually started on the executor
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")
}
Here we see one of the most ingenious pieces of Spark Streaming's design. The receiver and its target executor are first turned into an RDD; then that RDD, together with the function startReceiverFunc to run over it, is wrapped into a job and submitted. When the job runs, the executor invokes this function on the RDD partition handed to it, and that is what actually runs the receiver on the cluster (a small sketch of the general trick follows below).
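As a stand-alone sketch of that trick (nothing here is ReceiverTracker code; the object name, hostname and payload are made up): an arbitrary long-lived function can be run on a chosen executor by wrapping a payload in a one-partition RDD whose preferred location is that executor, and submitting a job over it:
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object RunOnChosenExecutor {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RunOnChosenExecutor"))

    //makeRDD(Seq(elem -> Seq(host))) creates a one-partition RDD with a location preference,
    //just like the receiverRDD above; "worker-host-1" is a made-up hostname
    val rdd = sc.makeRDD(Seq("some work item" -> Seq("worker-host-1")))

    //This function plays the role of startReceiverFunc: it runs inside a task on the chosen executor
    val func: Iterator[String] => Unit = (it: Iterator[String]) => {
      val item = it.next()
      println(s"Running '$item' in task attempt ${TaskContext.get().attemptNumber()}")
    }

    //submitJob returns a future; ReceiverTracker uses its onComplete callback
    //to decide whether the receiver job should be restarted
    val future = sc.submitJob[String, Unit, Unit](rdd, func, Seq(0), (_, _) => (), ())
    Await.ready(future, Duration.Inf)
    sc.stop()
  }
}
Back in startReceiverFunc, we saw that a ReceiverSupervisorImpl is created and supervisor#start is called: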
def start() {
onStart()
startReceiver()
}
The start method above is defined in the parent class ReceiverSupervisor; it then goes on to call the parent class's startReceiver method:
def startReceiver(): Unit = synchronized {
try {
//First call ReceiverSupervisorImpl's onReceiverStart method to register the receiver with the driver
//If the registration succeeds, continue with starting the Receiver
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
//receiver.onStart is where the actual data receiving happens
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
//If the ReceiverTracker on the driver side refuses the registration or it fails, stop the receiver
//and send a DeregisterReceiver message
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
In this method, the receiver's onStart method is called to start receiving data. onStart usually just spawns a thread; for example, this is the source of SocketReceiver:
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
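To close the loop, here is a minimal sketch of a custom receiver that follows exactly the same pattern as SocketReceiver above (the CounterReceiver name and its fake data source are purely illustrative): onStart spawns a daemon thread, the thread pushes records to Spark with store(), and restart() is used on errors:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    //Start the thread that generates data, just like SocketReceiver does
    new Thread("Counter Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    //Nothing to clean up: the thread checks isStopped() and exits on its own
  }

  private def receive() {
    var i = 0L
    try {
      while (!isStopped()) {
        //store hands the record to the ReceiverSupervisor, which turns it into blocks
        store(s"record-$i")
        i += 1
        Thread.sleep(100)
      }
    } catch {
      case e: Throwable => restart("Error generating data", e)
    }
  }
}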
This concludes our analysis of how Receivers are started in Spark Streaming.