1. 程式人生 > 其它 >Spark原始碼系列(八)Spark Streaming例項分析

Spark原始碼系列(八)Spark Streaming例項分析

這一章要講Spark Streaming,講之前首先回顧下它的用法,具體用法請參照《Spark Streaming程式設計指南》。

Example程式碼分析

val ssc = new StreamingContext(sparkConf, Seconds(1));
// 獲得一個DStream負責連線 監聽埠:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 對每一行資料執行Split操作
val words = lines.flatMap(_.split(" "));
// 統計word的數量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 輸出結果
wordCounts.print();
ssc.start();             // 開始
ssc.awaitTermination();  // 計算完畢退出

1、首先例項化一個StreamingContext

2、呼叫StreamingContext的socketTextStream

3、對獲得的DStream進行處理

4、呼叫StreamingContext是start方法,然後等待

我們看StreamingContext的socketTextStream方法吧。

  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }

1、StoageLevel是StorageLevel.MEMORY_AND_DISK_SER_2

2、使用SocketReceiver的bytesToLines把輸入流轉換成可遍歷的資料

繼續看socketStream方法,它直接new了一個

new SocketInputDStream[T](this, hostname, port, converter, storageLevel)

繼續深入挖掘SocketInputDStream,追述一下它的繼承關係,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。

具體實現ReceiverInputDStream的類有好幾個,基本上都是從網路端來資料的。

它實現了ReceiverInputDStream的getReceiver方法,例項化了一個SocketReceiver來接收資料。

SocketReceiver的onStart方法裡面呼叫了receive方法,處理程式碼如下:

      socket = new Socket(host, port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }

1、new了一個Socket來接收資料,用bytesToLines方法把InputStream轉換成一行一行的字串。

2、把每一行資料用store方法儲存起來,store方法是從SocketReceiver的父類Receiver繼承而來,內部實現是:

  def store(dataItem: T) {
    executor.pushSingle(dataItem)
  }

executor是ReceiverSupervisor型別,Receiver的操作都是由它來處理。這裡先不深糾,後面我們再說這個pushSingle的實現。

到這裡我們知道lines的型別是SocketInputDStream,然後對它是一頓的轉換,flatMap、map、reduceByKey、print,這些方法都不是RDD的那種方法,而是DStream獨有的。

講到上面這幾個方法,我們開始轉入DStream了,flatMap、map、reduceByKey、print方法都涉及到DStream的轉換,這和RDD的轉換是類似的。我們講一下reduceByKey和print。

reduceByKey方法和RDD一樣,呼叫的combineByKey方法實現的,不一樣的是它直接new了一個ShuffledDStream了,我們接著看一下它的實現吧。

override def compute(validTime: Time): Option[RDD[(K,C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }

在compute階段,對通過Time獲得的rdd進行reduceByKey操作。接下來的print方法也是一個轉換:

new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()

列印前十個,超過10個列印"..."。需要注意register方法。

ssc.graph.addOutputStream(this)

它會把程式碼插入到當前的DStream新增到outputStreams裡面,後面輸出的時候如果沒有outputStream就不會有輸出,這個需要記住哦!

啟動過程分析

前戲結束之後,ssc.start() 高潮開始了。 start方法很小,最核心的一句是JobScheduler的start方法。我們得轉到JobScheduler方法上面去。

下面是start方法的程式碼:

  def start(): Unit = synchronized {
  // 接受到JobSchedulerEvent就處理事件
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobSchedulerEvent => processEvent(event)
      }
    }), "JobScheduler")

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
  }

1、啟動了一個Actor來處理JobScheduler的JobStarted、JobCompleted、ErrorReported事件。

2、啟動StreamingListenerBus作為監聽器。

3、啟動ReceiverTracker。

4、啟動JobGenerator。

我們接下來看看ReceiverTracker的start方法。

  def start() = synchronized {if (!receiverInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
      receiverExecutor.start()
    }
  }

1、首先判斷了一下receiverInputStreams不能為空,那receiverInputStreams是怎麼時候寫入值的呢?答案在SocketInputDStream的父類InputDStream當中,當例項化InputDStream的時候會在DStreamGraph裡面新增InputStream。

abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
  ssc.graph.addInputStream(this)
  //....
}

2、例項化ReceiverTrackerActor,它負責RegisterReceiver(註冊Receiver)、AddBlock、ReportError(報告錯誤)、DeregisterReceiver(登出Receiver)等事件的處理。

3、啟動receiverExecutor(實際類是ReceiverLauncher,這名字起得。。),它主要負責啟動Receiver,start方法裡面呼叫了startReceivers方法吧。

    private def startReceivers() {
     // 對應著上面的那個例子,getReceiver方法獲得是SocketReceiver
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      // 檢視是否所有的receivers都有優先選擇機器,這個需要重寫Receiver的preferredLocation方法,目前只有FlumeReceiver重寫了
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

      // 建立一個並行receiver集合的RDD, 把它們分散到各個worker節點上
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

      // 在worker節點上啟動Receiver的方法,遍歷所有Receiver,然後啟動
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException("Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
        executor.start()
        executor.awaitTermination()
      }
      // 執行這個重複的作業來確保所有的slave都已經註冊了,避免所有的receivers都到一個節點上
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // 把receivers分發出去,啟動
      ssc.sparkContext.runJob(tempRDD, startReceiver)
    }

1、遍歷receiverInputStreams獲取所有的Receiver。

2、檢視這些Receiver是否全都有優先選擇機器。

3、把SparkContext的makeRDD方法把所有Receiver包裝到ParallelCollectionRDD裡面,並行度是Receiver的數量。

4、發個小任務給確保所有的slave節點都已經註冊了(這個小任務有點兒莫名其妙,感覺怪怪的)。

5、提交作業,啟動所有Receiver。

Spark寫得實在是太巧妙了,居然可以把Receiver包裝在RDD裡面,當做是資料來處理!

啟動Receiver的時候,new了一個ReceiverSupervisorImpl,然後調的start方法,主要乾了這麼三件事情,程式碼就不貼了。

1、啟動BlockGenerator。

2、呼叫Receiver的OnStart方法,開始接受資料,並把資料寫入到ReceiverSupervisor。

3、呼叫onReceiverStart方法,傳送RegisterReceiver訊息給driver報告自己啟動了。

儲存接收到的資料

ok,到了這裡,重點落到了BlockGenerator。前面說到SocketReceiver把接受到的資料呼叫ReceiverSupervisor的pushSingle方法儲存。

  // 這是ReceiverSupervisorImpl的方法
  def pushSingle(data: Any) {
    blockGenerator += (data)
  }
  // 這是BlockGenerator的方法
   def += (data: Any): Unit = synchronized {
    currentBuffer += data
  }

我們看一下它的start方法吧。

  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
  }

它啟動了一個定時器RecurringTimer和一個執行緒執行keepPushingBlocks方法。

先看RecurringTimer的實現:

      while (!stopped) {
        clock.waitTillTime(nextTime)
        callback(nextTime)
        prevTime = nextTime
        nextTime += period
      }

每隔一段時間就執行callback函式,callback函式是new的時候傳進來的,是BlockGenerator的updateCurrentBuffer方法。

  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockInterval)
        val newBlock = new Block(blockId, newBlockBuffer)
        blocksForPushing.put(newBlock) 
      }
    } catch {case t: Throwable =>
        reportError("Error in block updating thread", t)
    }
  }

它new了一個Block出來,然後新增到blocksForPushing這個ArrayBlockingQueue隊列當中。

提到這裡,有兩個引數需要大家注意的:

spark.streaming.blockInterval   預設值是200
spark.streaming.blockQueueSize  預設值是10

這是前面提到的間隔時間和佇列的長度,間隔時間預設是200毫秒,佇列是最多能容納10個Block,多了就要阻塞了。

我們接下來看一下BlockGenerator另外啟動的那個執行緒執行的keepPushingBlocks方法到底在幹什麼?

  private def keepPushingBlocks() {
    while(!stopped) {
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
   // ...退出之前把剩下的也輸出去了
  }

它在把blocksForPushing中的block不停的拿出來,呼叫pushBlock方法,這個方法屬於在例項化BlockGenerator的時候,從ReceiverSupervisorImpl傳進來的BlockGeneratorListener的。

  private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }, streamId, env.conf)

1、reportError,通過actor向driver傳送錯誤報告訊息ReportError。

2、呼叫pushArrayBuffer儲存資料。

下面是pushArrayBuffer方法:

  def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
    ) {
    val blockId = optionalBlockId.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
    reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
  }

1、把Block儲存到BlockManager當中,序列化方式為之前提到的StorageLevel.MEMORY_AND_DISK_SER_2(記憶體不夠就寫入到硬碟,並且在2個節點上儲存的方式)。

2、呼叫reportPushedBlock給driver傳送AddBlock訊息,報告新新增的Block,ReceiverTracker收到訊息之後更新內部的receivedBlockInfo對映關係。

處理接收到的資料

前面只講了資料的接收和儲存,那資料是怎麼處理的呢?

之前一直講ReceiverTracker,而忽略了之前的JobScheduler的start方法裡面最後啟動的JobGenerator。

  def start(): Unit = synchronized {
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobGeneratorEvent =>  processEvent(event)
      }
    }), "JobGenerator")
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

1、啟動一個actor處理JobGeneratorEvent事件。

2、如果是已經有CheckPoint了,就接著上次的記錄進行處理,否則就是第一次啟動。

我們先看startFirstTime吧,CheckPoint以後再說吧,有點兒小複雜。

  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
  }

1、timer.getStartTime計算出來下一個週期的到期時間,計算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以當前的時間/除以間隔時間,再用math.floor求出它的上一個整數(即上一個週期的到期時間點),加上1,再乘以週期就等於下一個週期的到期時間。

2、啟動DStreamGraph,啟動時間=startTime - graph.batchDuration。

3、啟動Timer,我們看看它的定義:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")

到這裡就清楚了,DStreamGraph的間隔時間就是timer的間隔時間,啟動時間要設定成比Timer早一個時間間隔,原因再慢慢探究。

可以看出來每隔一段時間,Timer給eventActor傳送GenerateJobs訊息,我們直接去看它的處理方法generateJobs吧,中間忽略了一步,大家自己看。

  private def processEvent(event: JobGeneratorEvent) {
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time) => doCheckpoint(time)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

下面是generateJobs方法。

  private def generateJobs(time: Time) {
    SparkEnv.set(ssc.env)
    Try(graph.generateJobs(time)) match {
      case Success(jobs) =>
        val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
          val streamId = stream.id
          val receivedBlockInfo = stream.getReceivedBlockInfo(time)
          (streamId, receivedBlockInfo)
        }.toMap
        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventActor ! DoCheckpoint(time)
  }

1、DStreamGraph生成jobs。

2、從stream那裡獲取接收到的Block資訊。

3、呼叫submitJobSet方法提交作業。

4、提交完作業之後,做一個CheckPoint。

先看DStreamGraph是怎麼生成的jobs。

  def generateJobs(time: Time): Seq[Job] = {
    val jobs = this.synchronized {
      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    }
    jobs
  }

outputStreams在這個例子裡面是print這個方法裡面新增的,這個在前面說了,我們繼續看DStream的generateJob。

  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }

1、呼叫getOrCompute方法獲得RDD

2、new了一個方法去提交這個作業,缺什麼都不做

為什麼呢?這是直接跳轉的錯誤,呵呵,因為這個outputStream是print方法返回的,它應該是ForEachDStream,所以我們應該看的是它裡面的generateJob方法。

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

這裡請大家千萬要注意,不要在這塊被卡住了。

我們看看它這個RDD是怎麼出來的吧。

  private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    // If RDD was already generated, then retrieve it from HashMap
    generatedRDDs.get(time) match {

      // 這個RDD已經被生成過了,直接用就是了
      case Some(oldRDD) => Some(oldRDD)

      // 還沒生成過,就呼叫compte函式生成一個
      case None => {
        if (isTimeValid(time)) {
          compute(time) match {
            case Some(newRDD) =>
         // 設定儲存的級別
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
              }
         // 如果現在需要,就做CheckPoint
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
              }
         // 新增到generatedRDDs裡面去,可以再次利用
              generatedRDDs.put(time, newRDD)
              Some(newRDD)
            case None =>
              None
          }
        } else {
          None
        }
      }
    }
  }

從上面的方法可以看出來它是通過每個DStream自己實現的compute函式得出來的RDD。我們找到SocketInputDStream,沒有compute函式,在父類ReceiverInputDStream裡面找到了。

  override def compute(validTime: Time): Option[RDD[T]] = {
    // 如果出現了時間比startTime早的話,就返回一個空的RDD,因為這個很可能是master掛了之後的錯誤恢復
    if (validTime >= graph.startTime) {
      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
      receivedBlockInfo(validTime) = blockInfo
      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
      Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
  }

通過DStream的id把receiverTracker當中把接收到的block資訊全部拿出來,記錄到ReceiverInputDStream自身的receivedBlockInfo這個HashMap裡面,就把RDD返回了,RDD裡面實際包含的是Block的id的集合。

現在我們就可以回到之前JobGenerator的generateJobs方法,我們就清楚它這句是提交的什麼了。

jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))

JobSet是記錄Job的完成情況的,直接看submitJobSet方法吧。

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
    } else {
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    }
  }

遍歷jobSet裡面的所有jobs,通過jobExecutor這個執行緒池提交。我們看一下JobHandler就知道了。

  private class JobHandler(job: Job) extends Runnable {
    def run() {
      eventActor ! JobStarted(job)
      job.run()
      eventActor ! JobCompleted(job)
    }
  }

1、通知eventActor處理JobStarted事件。

2、執行job。

3、通知eventActor處理JobCompleted事件。

這裡的重點是job.run,事件處理只是更新相關的job資訊。

  def run() {
    result = Try(func())
  }

在遍歷BlockRDD的時候,在compute函式獲取該Block(詳細請看BlockRDD),然後對這個RDD的結果進行列印。

到這裡就算結束了,最後來個總結吧,圖例在下一章補上,這一章只是過程分析:

1、可以有多個輸入,我們可以通過StreamingContext定義多個輸入,比如我們監聽多個(host,ip),可以給它們定義各自的處理邏輯和輸出,輸出方式不僅限於print方法,還可以有別的方法,saveAsTextFiles和saveAsObjectFiles。這塊的設計是支援共享StreamingContext的。

2、StreamingContext啟動了JobScheduler,JobScheduler啟動ReceiverTracker和JobGenerator。

3、ReceiverTracker是通過把Receiver包裝成RDD的方式,傳送到Executor端執行起來的,Receiver起來之後向ReceiverTracker傳送RegisterReceiver訊息。

3、Receiver把接收到的資料,通過ReceiverSupervisor儲存。

4、ReceiverSupervisorImpl把資料寫入到BlockGenerator的一個ArrayBuffer當中。

5、BlockGenerator內部每個一段時間(預設是200毫秒)就把這個ArrayBuffer構造成Block新增到blocksForPushing當中。

6、BlockGenerator的另外一條執行緒則不斷的把加入到blocksForPushing當中的Block寫入到BlockManager當中,並向ReceiverTracker傳送AddBlock訊息。

7、JobGenerator內部有個定時器,定期生成Job,通過DStream的id,把ReceiverTracker接收到的Block資訊從BlockManager上抓取下來進行處理,這個間隔時間是我們在例項化StreamingContext的時候傳進去的那個時間,在這個例子裡面是Seconds(1)。