Spark Customization Course, Lesson 9: Spark Streaming Source Code Walkthrough — A Thorough Study of the Receiver's Ingenious Driver-Side Implementation and Its Full Lifecycle
阿新 • Published: 2019-02-18
This session covers:
1. Design considerations for starting Receivers
2. A thorough analysis of the Receiver startup source code
1. Design considerations for starting Receivers
Spark Streaming is an application that runs on top of Spark Core. The application must both receive data and process it, all on a distributed cluster, so it should launch multiple jobs that divide the work and coordinate with each other. Since a Receiver's job is to receive data, it is naturally implemented as a Spark Core job.
The design of Receiver startup must also solve the following problems:
1. Load imbalance, where several Receivers start on one Executor while other Executors sit idle;
2. A Receiver startup failure bringing down the entire Spark Streaming application.
2. A thorough analysis of the Receiver startup source code
Since a Spark Streaming application processes streaming data, it must be ready to receive data from the very beginning.
When the application code defines DStreams, it defines one or more InputDStreams, and each InputDStream corresponds to exactly one Receiver, as the sketch below illustrates.
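To make this concrete, here is a minimal driver sketch (assuming a socket source on localhost:9999; names like ReceiverDemo are illustrative, not from the original post). Each socketTextStream call registers one ReceiverInputDStream, and therefore one Receiver:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReceiverDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // One InputDStream, backed by exactly one Receiver on some Executor.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()            // the Receiver startup path analyzed below begins here
    ssc.awaitTermination()
  }
}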
The main flow of the Receiver's full startup lifecycle is shown in the flow chart below.
Receiver startup happens inside ssc.start(). Let's dissect StreamingContext.start():
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Started on a child thread: it performs local initialization and avoids
            // blocking the main thread.
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
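The comment above explains the new thread: call sites and job groups are thread-local, so setting them on a fresh thread leaves the caller's values untouched. The sketch below is a simplified stand-in for ThreadUtils.runInNewThread (not the actual Spark utility) showing the idea of running a body on a named thread and rethrowing its failure in the caller:

// Run `body` on a new named thread; propagate its result or exception back
// to the caller. Thread-local state mutated inside `body` stays on that thread.
def runInNewThread[T](threadName: String)(body: => T): T = {
  @volatile var result: Either[Throwable, T] = null
  val thread = new Thread(threadName) {
    override def run(): Unit = {
      result = try Right(body) catch { case t: Throwable => Left(t) }
    }
  }
  thread.setDaemon(true)
  thread.start()
  thread.join()
  result match {
    case Right(v) => v
    case Left(t) => throw t
  }
}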
Inside JobScheduler's start method, ReceiverTracker's start method is called, and that is what starts the Receivers.
JobScheduler.start:
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the receiverTracker
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
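The eventLoop created above decouples the threads that post events from the single thread that processes them. A minimal sketch of the same pattern, using only the JDK (SimpleEventLoop is an illustrative name, not Spark's EventLoop):

import java.util.concurrent.LinkedBlockingQueue

// Events are queued by any thread and handled serially on one daemon thread,
// mirroring the structure of Spark's EventLoop.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit = {
      while (!stopped) {
        try onReceive(queue.take()) catch { case t: Throwable => onError(t) }
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}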
Why does ReceiverTracker's start method set up an RPC message endpoint? Because ReceiverTracker monitors every Receiver in the cluster, and the Receivers in turn report their state, the data they have received, lifecycle events, and so on back to the ReceiverTrackerEndpoint.
ReceiverTracker.start:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  // Receivers are launched only if there are receiver input streams.
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

The concrete Receiver instances are obtained from the ReceiverInputDStreams (on the Driver side) and are then distributed to the Worker nodes. One ReceiverInputDStream corresponds to exactly one Receiver.
ReceiverTracker.launchReceivers:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // One input source (ReceiverInputDStream) corresponds to exactly one Receiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // endpoint here is the ReceiverTrackerEndpoint constructed above in ReceiverTracker.start
  endpoint.send(StartAllReceivers(receivers))
}

First look at runDummySparkJob(). It ensures that all nodes are alive, and it prevents all the receivers from being concentrated on a single node.
ReceiverTracker.runDummySparkJob():
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
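The TODO in the comment hints at an alternative: poll the number of registered executors instead of forcing registration with a dummy job. A hedged sketch of that idea, assuming SparkContext.getExecutorMemoryStatus as the probe (in non-local mode it has one entry per executor plus one for the driver):

import org.apache.spark.streaming.StreamingContext

// Wait until at least `minExecutors` executors have registered, or time out.
// This replaces the dummy job with polling, as the TODO suggests.
def waitForExecutors(ssc: StreamingContext, minExecutors: Int, timeoutMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (ssc.sparkContext.getExecutorMemoryStatus.size < minExecutors + 1 &&
         System.currentTimeMillis() < deadline) {
    Thread.sleep(100) // re-check every 100 ms
  }
}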
Now go back to the getReceiver() call in ReceiverTracker.launchReceivers().
ReceiverInputDStream.getReceiver():

/**
 * Gets the receiver object that will be sent to the worker nodes
 * to receive data. This method needs to be defined by any specific implementation
 * of a ReceiverInputDStream.
 */
def getReceiver(): Receiver[T] // returns a Receiver object

ReceiverInputDStream's getReceiver() method returns a Receiver object, but the implementation is left to ReceiverInputDStream's subclasses. Accordingly, every subclass of ReceiverInputDStream must implement getReceiver(), and must also define its own Receiver subclass, because getReceiver() is where an object of that Receiver subclass is created. Following the inheritance hierarchy, let's look at the getReceiver method of SocketInputDStream, a subclass of ReceiverInputDStream.
SocketInputDStream.getReceiver:
def getReceiver(): Receiver[T] = {
  new SocketReceiver(host, port, bytesToObjects, storageLevel)
}

SocketInputDStream also defines the corresponding Receiver subclass, SocketReceiver. A Receiver subclass such as SocketReceiver must define an onStart method, which launches a background thread that in turn calls the receive method.
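Any user-defined receiver follows the same onStart contract. Here is a minimal sketch of a custom receiver (LineReceiver is a hypothetical class, modeled on the pattern that SocketReceiver and Spark's custom-receiver guide use):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // onStart must not block: spawn a background thread that does the receiving.
  def onStart(): Unit = {
    new Thread("Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Nothing to do here: isStopped becoming true ends the receiving loop.
  def onStop(): Unit = {}

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // hand each record to the ReceiverSupervisor
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}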
For reference, SocketReceiver itself is declared as follows:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

Now return to the last line of ReceiverTracker.launchReceivers(): endpoint.send(StartAllReceivers(receivers)). This sends a StartAllReceivers message to the ReceiverTrackerEndpoint object, which handles it in ReceiverTrackerEndpoint.receive.
ReceiverTracker.ReceiverTrackerEndpoint.receive:

/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

  private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

  @volatile private var active: Boolean = true

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      // schedulingPolicy is the scheduling policy
      // receivers are the receivers to be started
      // getExecutors returns the list of Executors in the cluster
      // scheduleReceivers decides which Executors each receiver may run on
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        // scheduledLocations, looked up by receiver id, gives the Executors that can run this Receiver
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        // At this point the Receivers to start and the Executors they run on are both settled.
        // Loop over the receivers, passing in one receiver at a time.
        startReceiver(receiver, executors)
      }
    // Handles the RestartReceiver message, which restarts a Receiver.
    case RestartReceiver(receiver) =>
      // Old scheduled executors minus the ones that are not active any more
      // If the Receiver failed, subtract its executor from the candidate list.
      // Scheduling earlier produced a list of candidate Executors for each Receiver.
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // Re-acquire Executors
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
          // Try global scheduling again
          oldScheduledExecutors
        } else {
          // If the candidate Executors are used up, rescheduleReceiver is run to obtain new ones.
          val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
          // Clear "scheduledLocations" to indicate we are going to do local scheduling
          val newReceiverInfo = oldReceiverInfo.copy(
            state = ReceiverState.INACTIVE, scheduledLocations = None)
          receiverTrackingInfos(receiver.streamId) = newReceiverInfo
          schedulingPolicy.rescheduleReceiver(
            receiver.streamId,
            receiver.preferredLocation,
            receiverTrackingInfos,
            getExecutors)
        }
      // Assume there is one receiver restarting at one time, so we don't need to update
      // receiverTrackingInfos
      // Call startReceiver again
      startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    // Remote messages
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
  }

As the comments show, Spark Streaming itself decides which Executors the receivers run on; the placement is not driven by Spark Core's Task scheduling.
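How does schedulingPolicy.scheduleReceivers balance the receivers? At heart it spreads them evenly across the live executors (while also honoring each receiver's preferredLocation). A much-simplified sketch of just the balancing idea, ignoring preferred locations:

// Simplified round-robin placement: spread receiver ids evenly over executors.
// The real ReceiverSchedulingPolicy additionally weighs preferredLocation
// and partially-assigned executors.
def scheduleReceiversRoundRobin(
    receiverIds: Seq[Int],
    executors: Seq[String]): Map[Int, Seq[String]] = {
  require(executors.nonEmpty, "no executors registered")
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> Seq(executors(i % executors.length))
  }.toMap
}

// scheduleReceiversRoundRobin(Seq(0, 1, 2), Seq("host1", "host2"))
//   => Map(0 -> Seq("host1"), 1 -> Seq("host2"), 2 -> Seq("host1"))

This directly addresses the load-imbalance concern raised at the start: no single executor ends up hosting all the receivers.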
Spark starts Receivers via submitJob, and while the application runs there may be many Receivers. So does one job start a single Receiver, or does a single job start all of them? In ReceiverTrackerEndpoint's receive method, the first argument of startReceiver is one receiver, and the for loop takes the receivers out one at a time, calling startReceiver for each. From this we can conclude that one job starts exactly one Receiver. If a Receiver fails to start, this is not treated as a failure of the job; instead a message is sent back to the ReceiverTrackerEndpoint to restart the Receiver. This guarantees that the Receivers will definitely be started, free from the retry-count limit that would apply if Receivers were started by Tasks.
ReceiverTracker.startReceiver:

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    // scheduledLocations specifies the concrete physical machines to run on.
    scheduledLocations: Seq[TaskLocation]): Unit = {
  // Check whether the tracker state still allows starting a Receiver.
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    // If the Receiver should not be started, call onReceiverJobFinish
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // startReceiverFunc encapsulates the action of starting the receiver on the worker.
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // ReceiverSupervisorImpl supervises the Receiver and also handles writing the data.
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // Restarting a receiver must redo the scheduling above, not rely on Task retry.
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  // As receiverId suggests, the RDD holds a single receiver
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // Each Receiver's startup triggers its own job; one job's Tasks do not start all Receivers.
  // An application generally has many Receivers.
  // SparkContext.submitJob is called so that starting the Receiver launches a Spark job.
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  // Jobs are submitted on a thread pool so that Receivers can be started concurrently.
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
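The receiverRDD above uses makeRDD's location-aware overload, in which each element is paired with its preferred hosts; the TaskScheduler treats these as locality hints, which is how a receiver lands on its scheduled executor. A small standalone illustration (assuming sc is an active SparkContext):

// makeRDD(Seq(elem -> locations)) builds a one-partition RDD that prefers
// to be scheduled on the given hosts.
val data: Seq[(String, Seq[String])] = Seq("receiver-0" -> Seq("host1", "host2"))
val rdd = sc.makeRDD(data)
println(rdd.preferredLocations(rdd.partitions(0))) // typically: List(host1, host2)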
When a Receiver fails to start, the ReceiverTrackerEndpoint is triggered to launch a new Spark job to start the Receiver:

/**
 * This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.
 */
private[streaming] case class RestartReceiver(receiver: Receiver[_])
  extends ReceiverTrackerLocalMessage

When a Receiver is shut down, however, its Spark job does not need to be restarted:

/**
 * Call when a receiver is terminated. It means we won't restart its Spark job.
 */
private def onReceiverJobFinish(receiverId: Int): Unit = {
  receiverJobExitLatch.countDown()
  // Use foreach to remove the receiver from receiverTrackingInfos.
  receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
    if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
      logWarning(s"Receiver $receiverId exited but didn't deregister")
    }
  }
}

Now look back at the supervisor.start() call in ReceiverTracker.startReceiver. The subclass ReceiverSupervisorImpl has no start method of its own, so the parent class ReceiverSupervisor's start method is the one invoked.
ReceiverSupervisor.start:
/** Start the supervisor */
def start() {
  onStart() // the concrete onStart is provided by the subclass
  startReceiver()
}
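This is a textbook template method: the parent class fixes the start sequence, while the subclass supplies the onStart hook (in Spark, ReceiverSupervisorImpl's onStart starts the registered BlockGenerators). A schematic sketch of the relationship (simplified names and bodies, not the real classes):

// The parent fixes the order: prepare resources first, then start the receiver.
abstract class SupervisorSketch {
  def start(): Unit = {
    onStart()       // subclass hook: make sure data can be stored before it arrives
    startReceiver() // common logic: actually start the receiver
  }
  protected def onStart(): Unit
  protected def startReceiver(): Unit = { /* shared start logic */ }
}

class SupervisorImplSketch extends SupervisorSketch {
  // Stand-in for ReceiverSupervisorImpl.onStart, which starts the BlockGenerators.
  override protected def onStart(): Unit = println("starting block generators")
}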