
A Deep Dive into Spark Streaming

1. The Two Kinds of DStream Operations

  Internally, a DStream is really a sequence of RDDs, and every DStream operation is ultimately translated into RDD operations. By reading the source code, we can see how this translation takes place.

  A DStream has a few basic properties that mirror those of an RDD:

  • A list of other DStreams it depends on.
  • The time interval at which it generates RDDs.
  • A compute function that generates RDDs, analogous to RDD's compute.

  DStream operations fall into two categories. The first is Transformations, which correspond to RDD Transformations. Take flatMap as an example: DStream's flatMap does nothing more than return FlatMappedDStream, a new DStream subclass, which closely parallels RDD's flatMap. DStream's flatMap is defined as follows:

  /**
   * Return a new DStream by applying a function to all elements of this DStream,
   * and then flattening the results
   */
  def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }
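
  To make the laziness concrete, here is a minimal sketch (shell-style; the socket host and port are placeholders, and any input DStream behaves the same way) showing that flatMap merely builds up the DStream graph:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("lineage-sketch")
val ssc  = new StreamingContext(conf, Seconds(5))

// Placeholder source: a text stream read from a local socket.
val lines = ssc.socketTextStream("localhost", 9999)

// flatMap only wraps `lines` in a FlatMappedDStream whose dependencies
// list is List(lines); no RDD is computed until an output operation is
// registered and the context is started.
val words = lines.flatMap(_.split(" "))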

  The implementation of FlatMappedDStream is equally simple; its main job is to maintain the chain of computation dependencies, just as an RDD does. The complete definition:

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

  Here, compute calls getOrCompute on the parent DStream to obtain the parent's RDD for the given batch time: the RDD is either served from the cache of already-generated RDDs or produced by invoking the compute function and then cached.
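
  Below is a simplified, hypothetical model of that caching behavior, with Seq standing in for RDD and Long for Time (the real getOrCompute also handles persistence and checkpointing): the result for each batch time is generated at most once and then served from the cache.

import scala.collection.mutable

// Hypothetical sketch of DStream.getOrCompute's memoization; not Spark's
// actual code.
class CachingStream[U](computeFn: Long => Option[Seq[U]]) {
  private val generatedRDDs = mutable.HashMap.empty[Long, Seq[U]]

  def getOrCompute(validTime: Long): Option[Seq[U]] =
    generatedRDDs.get(validTime).orElse {
      val newRDD = computeFn(validTime)               // fall back to compute
      newRDD.foreach(generatedRDDs.put(validTime, _)) // cache the result
      newRDD
    }
}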

  The other category of DStream operations is Output operations. Only an Output operation triggers actual execution of a DStream, making them very similar to RDD Actions. print, for example, is defined as follows:

  /**
   * Print the first ten elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(): Unit = ssc.withScope {
    print(10)
  }

  /**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }
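
  Continuing the sketch from the flatMap example above, calling print registers `words` as an output stream, and only then does each batch actually get computed:

words.print()          // print the first 10 elements of every batch's RDD
ssc.start()            // scheduling begins; print runs once per batch interval
ssc.awaitTermination()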

  DStream.print calls RDD.take, which is an Action. Does every DStream output operation ultimately invoke an RDD Action? Consider saveAsTextFiles and saveAsObjectFiles: neither calls an RDD Action directly; instead, each is implemented through foreachRDD, and the function passed in calls rdd.saveAsTextFile (or rdd.saveAsObjectFile), which is itself an Action. saveAsTextFiles is defined as follows:

  /**
   * Save each RDD in this DStream as a text file, using string representation
   * of elements. The file name at each batch interval is generated based on
   * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc, displayInnerRDDOps = false)
  }
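
  In the same sketch, each batch would then be written to its own directory, named from the prefix and the batch time exactly as the docstring describes (the output path here is a placeholder):

// Produces one directory per batch, e.g. /tmp/words-1621234500000.txt
words.saveAsTextFiles("/tmp/words", "txt")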

  By contrast, foreachRDD, the most flexible Output operation, relies entirely on the user-supplied function for its behavior, so any use of foreachRDD should include at least one RDD Action call. Spark Streaming's scheduling is driven by the Output methods: once per batch interval, every registered Output method is invoked, and it is the RDD Action inside that actually performs the computation. Without such an Action, the program merely receives data and then discards it without computing anything.
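
  As a final sketch, again continuing the example pipeline, a foreachRDD call only does useful work if the supplied function itself invokes an RDD Action; here foreachPartition plays that role, with the sink logic left as a placeholder:

words.foreachRDD { rdd =>
  // foreachPartition is the RDD Action that actually triggers the job;
  // without it (or collect, count, save, ...) the batch is never computed.
  rdd.foreachPartition { partition =>
    partition.foreach(record => println(record)) // placeholder sink
  }
}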