
Spark Source Code Analysis: How Spark Streaming Starts Receivers

In this post we analyze how Receivers are started in Spark Streaming. As we all know, StreamingContext is the main entry point of a Spark Streaming program, so let's first look at part of its source code:

class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

  //the DStreamGraph stores the DStreams and the dependencies between them
  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

  private[streaming] val scheduler = new JobScheduler(this)
}

Here we can see that when a StreamingContext object is initialized, a DStreamGraph and a JobScheduler are created. The DStreamGraph mainly stores the DStreams and the dependencies between them.
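Before diving deeper, it helps to recall what this looks like from the user's side. Below is a minimal sketch of a driver program (the app name, host, port, and batch interval are placeholder values); constructing the StreamingContext is what creates the DStreamGraph and JobScheduler discussed above, and "local[2]" leaves at least one core free for the receiver:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

//constructing the StreamingContext creates the DStreamGraph and the JobScheduler
val conf = new SparkConf().setAppName("ReceiverStartupDemo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

//each receiver-based input DStream registered here ends up in DStreamGraph.inputStreams
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()            //the StreamingContext#start we trace below
ssc.awaitTermination()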

Let's look at the following code in JobScheduler:

//the JobGenerator is created as soon as the JobScheduler is created
private val jobGenerator = new JobGenerator(this)

So a JobGenerator is created right after the JobScheduler; we will come back to what it does later.

To start a StreamingContext, we first need to call StreamingContext#start:

def start(): Unit = synchronized {
 ...
 scheduler.start()
 ...
}

Its start method essentially just calls the start method of jobScheduler:

...
 //create the receiverTracker component, which is in charge of data reception; create it and start it
 receiverTracker = new ReceiverTracker(ssc)
 inputInfoTracker = new InputInfoTracker(ssc)
 receiverTracker.start()
 //start the JobGenerator
 //at this point, all of the StreamingContext components we mentioned have been created
 // next, the Receivers associated with the input DStreams are started;
 //that logic lives in ReceiverTracker's start method
 jobGenerator.start()
...

In jobScheduler's start method, a ReceiverTracker object is created and its start method is then called:

def start(): Unit = synchronized {
   if (isTrackerStarted) {
     throw new SparkException("ReceiverTracker already started")
   }

   if (!receiverInputStreams.isEmpty) {
      //endpoint is a ReceiverTrackerEndpoint
      //initialize an endpoint that receives and handles messages sent by the ReceiverTracker and the Receivers
     endpoint = ssc.env.rpcEnv.setupEndpoint(
       "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))

      //the main thing this start method does is call launchReceivers; the ReceiverTracker's job is to start the Receivers,
      //i.e. distribute the receivers to the executors
     if (!skipReceiverLaunch) launchReceivers()
     logInfo("ReceiverTracker started")
     trackerState = Started
   }
}

It first checks whether the receiverInputStreams array is empty: besides ReceiverInputDStreams, Spark Streaming also has input streams that need no Receiver at all (for example, reading files from a Hadoop directory), so this filter is necessary. We then see that a ReceiverTrackerEndpoint is initialized; its main job is to handle messages sent to the ReceiverTracker. Most importantly, the launchReceivers method is executed afterwards.
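To make the distinction concrete, here is a small sketch reusing the ssc from the earlier example (the port and directory path are placeholders): socketTextStream returns a ReceiverInputDStream and therefore needs a Receiver, while textFileStream only monitors a directory from the driver and needs none.

//receiver-based input: backed by a ReceiverInputDStream, so a Receiver
//must be launched on an executor to pull the data in
val socketStream = ssc.socketTextStream("localhost", 9999)

//receiver-less input: the file stream just lists new files in the directory
//on the driver at each batch interval; no Receiver is involved
val fileStream = ssc.textFileStream("hdfs:///data/incoming")

Now let's look at launchReceivers: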

private def launchReceivers(): Unit = {
   //get the receivers, i.e. the data receivers, of DStreamGraph.inputStreams
   val receivers = receiverInputStreams.map(nis => {
     val rcvr = nis.getReceiver()
     rcvr.setReceiverId(nis.id)
     rcvr
   })

   runDummySparkJob()

   //send a StartAllReceivers message to the endpoint; this returns immediately without waiting for the message to be processed
   endpoint.send(StartAllReceivers(receivers))
}

It first obtains the receivers corresponding to the receiverInputStreams, then sends a StartAllReceivers message to the ReceiverTrackerEndpoint, which handles the message as follows:

case StartAllReceivers(receivers) =>
  //match each Receiver to executors according to the receiver scheduling policy
  val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)

  //iterate over every receiver and call startReceiver with the target executors obtained above
  for (receiver <- receivers) {
    val executors = scheduledExecutors(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    //record the receiver's preferred location
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    //start the receiver on the designated executors
    startReceiver(receiver, executors)
  }

Each receiver is first assigned executors according to a scheduling policy, and startReceiver(receiver, executors) is then called to start it on those executors.
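The actual placement logic lives in ReceiverSchedulingPolicy and also honours each receiver's preferredLocation; as a rough illustration of the idea only, here is a hypothetical, much simplified round-robin version (the name and signature are made up, not the real implementation):

//hypothetical, simplified sketch of the scheduling step: spread the receivers
//across the available executors round-robin; the real ReceiverSchedulingPolicy
//also takes preferred locations and load balancing into account
def naiveScheduleReceivers(
    receiverIds: Seq[Int],
    executors: Seq[String]): Map[Int, Seq[String]] = {
  require(executors.nonEmpty, "no executors available to place receivers on")
  receiverIds.zipWithIndex.map { case (id, i) =>
    id -> Seq(executors(i % executors.size))
  }.toMap
}

With that intuition in place, here is the real startReceiver method: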

 private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
   ...
   val serializableHadoopConf =
     new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

   // Function to start the receiver on the worker node
    //this is the function that will be shipped to an executor and used to start the receiver
   val startReceiverFunc: Iterator[Receiver[_]] => Unit =
     (iterator: Iterator[Receiver[_]]) => {
       if (!iterator.hasNext) {
         throw new SparkException(
           "Could not start receiver as object not found.")
       }
       if (TaskContext.get().attemptNumber() == 0) {
         val receiver = iterator.next()
         assert(iterator.hasNext == false)

          //create a ReceiverSupervisorImpl object, the supervisor
         val supervisor = new ReceiverSupervisorImpl(
           receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)

          //call its start method to launch the receiver
         supervisor.start()

          //call awaitTermination to block the task thread
         supervisor.awaitTermination()
       } else {
         // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
       }
     }

   // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
    //wrap the Receiver and its scheduled executors into an RDD
   val receiverRDD: RDD[Receiver[_]] =
     if (scheduledExecutors.isEmpty) {
       ssc.sc.makeRDD(Seq(receiver), 1)
     } else {
       ssc.sc.makeRDD(Seq(receiver -> scheduledExecutors))
     }

   receiverRDD.setName(s"Receiver $receiverId")
   ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
   ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

    //the RDD wrapping the receiver and its executors, together with the function to run on it, is assembled into a job and submitted,
    // which is what actually starts the receiver on an executor
   val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
     receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

   // We will keep restarting the receiver job until ReceiverTracker is stopped
   future.onComplete {
     case Success(_) =>
       if (!shouldStartReceiver) {
         onReceiverJobFinish(receiverId)
       } else {
         logInfo(s"Restarting Receiver $receiverId")
         self.send(RestartReceiver(receiver))
       }
     case Failure(e) =>
       if (!shouldStartReceiver) {
         onReceiverJobFinish(receiverId)
       } else {
         logError("Receiver has been stopped. Try to restart it.", e)
         logInfo(s"Restarting Receiver $receiverId")
         self.send(RestartReceiver(receiver))
       }
   }(submitJobThreadPool)
   logInfo(s"Receiver ${receiver.streamId} started")
}

Here we see one of the most ingenious parts of Spark Streaming's design. The receiver and its scheduled executors are first turned into an RDD; then that RDD, together with the function startReceiverFunc to run on it, is assembled into a job and submitted. When the job runs, the executors invoke this function on the RDD's single element, and as we saw in startReceiverFunc, that ends up calling supervisor#start.
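The same trick can be shown in isolation: wrapping a payload in a one-partition RDD and submitting a job over it lets you run arbitrary long-lived code on an executor. The following is only an illustrative sketch of that pattern (the payload and function names are made up), not code from Spark itself:

import org.apache.spark.SparkContext

//illustration of the pattern used by ReceiverTracker: a one-element,
//one-partition RDD plus a function to run over that single partition
def runOnSomeExecutor(sc: SparkContext): Unit = {
  val oneElementRdd = sc.makeRDD(Seq("payload"), 1)

  val longRunningWork: Iterator[String] => Unit = { iter =>
    val payload = iter.next()
    //this body executes on whichever executor gets partition 0,
    //just like startReceiverFunc does for the real Receiver
    println(s"running on an executor with $payload")
  }

  //submit the job and return immediately; the work keeps running on the executor
  sc.submitJob[String, Unit, Unit](
    oneElementRdd, longRunningWork, Seq(0), (_, _) => (), ())
}

Back in the real code, the supervisor.start() call inside startReceiverFunc does the following: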

def start() {
    //onStart() performs subclass-specific setup (ReceiverSupervisorImpl starts its BlockGenerators here)
    onStart()
    //then start the receiver itself
    startReceiver()
}

The method above is defined in the parent class ReceiverSupervisor, and it goes on to call the parent class's startReceiver method:

def startReceiver(): Unit = synchronized {
  try {
     //first call ReceiverSupervisorImpl's onReceiverStart method to register the receiver with the driver;
     //if registration succeeds, continue starting the receiver
     if (onReceiverStart()) {
       logInfo("Starting receiver")
       receiverState = Started
       //receiver.onStart() is where the actual data reception happens
       receiver.onStart()
       logInfo("Called receiver onStart")
     } else {
       // The driver refused us
       //if the ReceiverTracker on the driver refuses the registration or it fails, stop the receiver
       //and send a DeregisterReceiver message to deregister it
       stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
     }
   } catch {
     case NonFatal(t) =>
       stop("Error starting receiver " + streamId, Some(t))
   }
}

This method calls the receiver's onStart method to begin receiving data. onStart usually just spawns a thread; for example, the source of SocketReceiver looks like this:

def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

def receive() {
   var socket: Socket = null
   try {
     logInfo("Connecting to " + host + ":" + port)
     socket = new Socket(host, port)
     logInfo("Connected to " + host + ":" + port)
     val iterator = bytesToObjects(socket.getInputStream())
     while(!isStopped && iterator.hasNext) {
       store(iterator.next)
     }
     if (!isStopped()) {
       restart("Socket data stream had no more data")
     } else {
       logInfo("Stopped receiving")
     }
   } catch {
     case e: java.net.ConnectException =>
       restart("Error connecting to " + host + ":" + port, e)
     case NonFatal(e) =>
       logWarning("Error receiving data", e)
       restart("Error receiving data", e)
   } finally {
     if (socket != null) {
       socket.close()
       logInfo("Closed socket to " + host + ":" + port)
     }
   }
}
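To close the loop from the user's perspective, writing your own receiver only requires implementing the same onStart/onStop contract; below is a minimal sketch of a hypothetical custom receiver (the class name and generated data are made up). Registering it with ssc.receiverStream(new TickReceiver) puts it into receiverInputStreams, after which it is launched along exactly the path traced in this post.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

//hypothetical custom receiver that emits a line of text every second;
//onStart must return quickly, so the work runs on a daemon thread,
//just like SocketReceiver above, and store() hands data to the supervisor
class TickReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("Tick Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        var i = 0
        while (!isStopped()) {
          store(s"tick-$i")
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  //nothing to clean up: the thread exits once isStopped() returns true
  override def onStop(): Unit = {}
}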

This completes our analysis of how Receivers are started in Spark Streaming.