[spark streaming] DStream 和 DStreamGraph 解析
看 spark streaming 原始碼解析之前最好先了解spark core的內容。
前言
Spark Streaming 是基於Spark Core將流式計算分解成一系列的小批處理任務來執行。
在Spark Streaming裡,總體負責任務的動態排程是JobScheduler
,而JobScheduler
有兩個很重要的成員:JobGenerator
和 ReceiverTracker
。JobGenerator
負責將每個 batch 生成具體的 RDD DAG ,而ReceiverTracker
負責資料的來源。
Spark Streaming裡的DStream
可以看成是Spark Core裡的RDD的模板,DStreamGraph
跟著例子看流程
DStream 也和 RDD 一樣有著轉換(transformation)和 輸出(output)操作,通過 transformation
操作會產生新的DStream
,典型的transformation
操作有map(), filter(), reduce(), join()等。RDD的輸出操作會觸發action,而DStream的輸出操作也會新建一個ForeachDStream
,用一個函式func來記錄所需要做的操作。
下面看一個例子:
val conf = new SparkConf().setMaster("local[2]")
.setAppName ("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination ()
在建立 StreamingContext
的時候實建立了 graph: DStreamGraph:
private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
_cp.graph.setContext(this)
_cp.graph.restoreCheckpointData()
_cp.graph
} else {
require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(_batchDur)
newGraph
}
}
若checkpoint
可用,會優先從 checkpoint 恢復 graph,否則新建一個。graph用來動態的建立RDD DAG,DStreamGraph
有兩個重要的成員:inputStreams
和outputStreams
。
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
Spark Streaming記錄DStream DAG 的方式就是通過DStreamGraph
例項記錄所有的outputStreams
,因為outputStream
會通過依賴
dependencies
來和parent DStream形成依賴鏈,通過outputStreams
向前追溯遍歷就可以得到所有上游的DStream,另外,DStreamGraph
還會記錄所有的inputStreams
,避免每次為查詢 input stream 而對 output steam 進行 BFS 的消耗。
繼續回到例子,這裡通過ssc.socketTextStream 建立了一個ReceiverInputDStream
,在其父類 InputDStream 中會將該ReceiverInputDStream
新增到inputStream
裡。
接著呼叫了flatMap方法:
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
--------------------------------------------------------------------
private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}
建立了一個 FlatMappedDStream
,而該類的compute方法是在父 DStream(ReceiverInputDStream) 在對應batch時間的RDD上呼叫了flatMap方法,也就是構造了 rdd.flatMap(func)
這樣的程式碼,後面的操作類似,隨後形成的是rdd.flatMap(func1).map(func2).reduceByKey(func3).take()
,這不就是我們spark core裡的東西嗎。另外其dependencies
是直接指向了其構造引數parent,也就是剛才的ReceiverInputDStream
,每個新建的DStream的dependencies都是指向了其父DStream,這樣就構成了一個依賴鏈,也就是形成了DStream DAG。
這裡我們再看看最後的 print() 操作:
----
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
----
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
----
#ForEachDStream
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
在print() 方法裡構建了一個foreachFunc方法:對一個rdd進行了take操作並列印(spark core中的action操作)。隨後建立了ForEachDStream例項並呼叫了register()方法:
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
將 OutputStream 新增到DStreamGraph
的outputStreams
裡。可以看到剛才構建的 foreachFunc 方法最終用在了ForEachDStream
例項的generateJob
方法裡,並建立了一個Streaming 中的Job,在job中的run方法中會呼叫這個方法,也就是會觸發action操作。
注意這裡Spark Streaming的Job和Spark Core裡的Job是不一樣的,Streaming的Job執行的是前面構造的方法,方法裡面是Core裡的Job,方法可以定義多個core裡的Job,也可以一個core裡的job都沒有。