1. 程式人生 > >[spark streaming] DStream 和 DStreamGraph 解析

[spark streaming] DStream 和 DStreamGraph 解析

看 spark streaming 原始碼解析之前最好先了解spark core的內容。

前言

Spark Streaming 是基於Spark Core將流式計算分解成一系列的小批處理任務來執行。

在Spark Streaming裡,總體負責任務的動態排程是JobScheduler,而JobScheduler有兩個很重要的成員:JobGeneratorReceiverTrackerJobGenerator 負責將每個 batch 生成具體的 RDD DAG ,而ReceiverTracker負責資料的來源。

Spark Streaming裡的DStream可以看成是Spark Core裡的RDD的模板,DStreamGraph

是RDD DAG的模板。

跟著例子看流程

DStream 也和 RDD 一樣有著轉換(transformation)和 輸出(output)操作,通過 transformation 操作會產生新的DStream,典型的transformation 操作有map(), filter(), reduce(), join()等。RDD的輸出操作會觸發action,而DStream的輸出操作也會新建一個ForeachDStream,用一個函式func來記錄所需要做的操作。

下面看一個例子:

val conf = new SparkConf().setMaster("local[2]")
                          .setAppName
("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination
()

在建立 StreamingContext 的時候實建立了 graph: DStreamGraph:

private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }

checkpoint 可用,會優先從 checkpoint 恢復 graph,否則新建一個。graph用來動態的建立RDD DAG,DStreamGraph有兩個重要的成員:inputStreamsoutputStreams

private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

Spark Streaming記錄DStream DAG 的方式就是通過DStreamGraph例項記錄所有的outputStreams ,因為outputStream會通過依賴
dependencies 來和parent DStream形成依賴鏈,通過outputStreams 向前追溯遍歷就可以得到所有上游的DStream,另外,DStreamGraph 還會記錄所有的inputStreams ,避免每次為查詢 input stream 而對 output steam 進行 BFS 的消耗。

繼續回到例子,這裡通過ssc.socketTextStream 建立了一個ReceiverInputDStream,在其父類 InputDStream 中會將該ReceiverInputDStream新增到inputStream裡。

接著呼叫了flatMap方法:

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

--------------------------------------------------------------------

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

建立了一個 FlatMappedDStream ,而該類的compute方法是在父 DStream(ReceiverInputDStream) 在對應batch時間的RDD上呼叫了flatMap方法,也就是構造了 rdd.flatMap(func)這樣的程式碼,後面的操作類似,隨後形成的是rdd.flatMap(func1).map(func2).reduceByKey(func3).take(),這不就是我們spark core裡的東西嗎。另外其dependencies是直接指向了其構造引數parent,也就是剛才的ReceiverInputDStream,每個新建的DStream的dependencies都是指向了其父DStream,這樣就構成了一個依賴鏈,也就是形成了DStream DAG。

這裡我們再看看最後的 print() 操作:

----
def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }
----
private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }
----
#ForEachDStream
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

在print() 方法裡構建了一個foreachFunc方法:對一個rdd進行了take操作並列印(spark core中的action操作)。隨後建立了ForEachDStream例項並呼叫了register()方法:

 private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

將 OutputStream 新增到DStreamGraphoutputStreams 裡。可以看到剛才構建的 foreachFunc 方法最終用在了ForEachDStream例項的generateJob方法裡,並建立了一個Streaming 中的Job,在job中的run方法中會呼叫這個方法,也就是會觸發action操作。

注意這裡Spark Streaming的Job和Spark Core裡的Job是不一樣的,Streaming的Job執行的是前面構造的方法,方法裡面是Core裡的Job,方法可以定義多個core裡的Job,也可以一個core裡的job都沒有。