Deep Dive into Spark Streaming
1. The Two Kinds of DStream Operations
Internally, a DStream is a sequence of RDDs: every DStream operation is ultimately translated into RDD operations. Reading the source code shows exactly how this translation happens.
A DStream has several basic properties that mirror those of an RDD (a simplified Scala sketch follows this list):
- A list of other DStreams it depends on.
- The time interval at which it generates RDDs.
- A compute function that generates RDDs, analogous to RDD's compute.
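These three members correspond to declarations on Spark's DStream abstract class; the FlatMappedDStream shown later overrides all of them. A minimal sketch (simplified; the real class carries many more fields and helpers):

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}

// Simplified sketch of the core members every DStream subclass must provide.
abstract class DStream[T: ClassTag](var ssc: StreamingContext) extends Serializable {

  /** The list of parent DStreams this DStream depends on. */
  def dependencies: List[DStream[_]]

  /** The interval at which this DStream generates an RDD. */
  def slideDuration: Duration

  /** Generates the RDD for the given batch time, if there is data. */
  def compute(validTime: Time): Option[RDD[T]]
}
```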
DStream operations fall into two categories. The first is Transformations, which correspond to RDD Transformations. Take flatMap as an example: DStream's flatMap simply returns a new DStream subclass, FlatMappedDStream, much as RDD's flatMap does. It is defined as follows:
```scala
/**
 * Return a new DStream by applying a function to all elements of this DStream,
 * and then flattening the results
 */
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
```
The implementation of FlatMappedDStream is equally simple. Like an RDD, its main job is to maintain the chain of computation dependencies. Its complete definition:
```scala
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}
```
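To see this chaining from the user's side, consider a hypothetical pipeline (lines stands for any input DStream). Each call only wraps its parent in a new DStream subclass; no data is touched yet:

```scala
// Hypothetical pipeline: each Transformation wraps its parent DStream,
// extending the lineage graph. Nothing runs until an Output operation fires.
val words = lines.flatMap(_.split(" "))   // FlatMappedDStream(parent = lines)
val pairs = words.map(word => (word, 1))  // MappedDStream(parent = words)
```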
Here compute calls DStream's getOrCompute method, which first looks up the RDD for the given batch time in the stream's internal cache of generated RDDs; on a miss, it calls the compute interface to generate the RDD and stores it back.
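A simplified sketch of that logic, assuming the generatedRDDs map and storageLevel field of the real class (checkpointing and time-validity checks elided):

```scala
// Simplified sketch of DStream.getOrCompute: reuse the RDD already generated
// for this batch time, or build it via compute() and memoize it.
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  generatedRDDs.get(time).orElse {
    compute(time).map { newRDD =>
      if (storageLevel != StorageLevel.NONE) {
        newRDD.persist(storageLevel)   // cache if a storage level was requested
      }
      generatedRDDs.put(time, newRDD)  // remember it for output operations
      newRDD
    }
  }
}
```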
The other category of DStream operations is Output operations. Only an Output operation triggers the actual execution of a DStream, playing much the same role as an RDD Action. print, for example, is defined as follows:
```scala
/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(): Unit = ssc.withScope {
  print(10)
}

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println(s"Time: $time")
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
```
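Given those println calls, each batch emits a block like the following (illustrative data; the timestamp is the batch time in milliseconds):

```
-------------------------------------------
Time: 1459875469000 ms
-------------------------------------------
(hello,3)
(world,2)
...
```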
DStream.print calls RDD.take, which is an Action. Does every DStream output operation ultimately invoke an RDD Action? Consider saveAsTextFiles and saveAsObjectFiles: they do not call an RDD Action at the top level; instead they are implemented via foreachRDD, and the function they pass in invokes an RDD Action (rdd.saveAsTextFile). saveAsTextFiles is defined as follows:
```scala
/**
 * Save each RDD in this DStream as at text file, using string representation
 * of elements. The file name at each batch interval is generated based on
 * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
 */
def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
  val saveFunc = (rdd: RDD[T], time: Time) => {
    val file = rddToFileName(prefix, suffix, time)
    rdd.saveAsTextFile(file)
  }
  this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
```
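As a usage note, a call like the following (hypothetical path) writes one directory of output files per batch, named after the prefix-TIME_IN_MS.suffix pattern from the doc comment above:

```scala
// Hypothetical usage: each batch lands in its own directory, e.g.
// hdfs:///tmp/wordcounts-1459875469000.txt
wordCounts.saveAsTextFiles("hdfs:///tmp/wordcounts", "txt")
```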
By contrast, foreachRDD, the most flexible Output operation, relies entirely on the function passed to it, so that function must include at least one RDD Action call. Spark Streaming's scheduling is driven by Output operations: once per batch interval, every registered Output operation is invoked, and the RDD Actions inside them are what finally perform the computation. Without an Action, the program would merely receive data and then discard it, computing nothing.
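A minimal, self-contained example of this (hypothetical socket source, host, and port; count() is the Action that forces each batch to run):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Hypothetical source: lines of text from a local socket.
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { (rdd, time) =>
      // count() is an RDD Action: it is what actually computes this batch.
      println(s"Batch $time: ${rdd.count()} lines")
      // A Transformation alone, e.g. rdd.map(_.length), would compute nothing.
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```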