Spark Study Notes (26): Jobs Can Also Be Produced Outside of DStream Action Operations
Posted by 阿新 · 2019-01-10
Topics in this installment:
1. How Spark Streaming produces Jobs
2. Other ways Spark Streaming produces Jobs
1. How Spark Streaming produces Jobs
In a Scala program, a function can be passed as a parameter, because a function is itself an object. Holding a function object does not mean the function executes right away. In Spark Streaming, a thread's run method is often what invokes such a function, and that is what finally causes the function to execute.
In Spark Streaming, the Job object holds a function as a member.
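As a minimal sketch (not Spark's actual Job class), the idea of storing a function as an object member and executing it later from a thread's run looks like this:

class SimpleJob(body: () => Unit) {
  // Constructing a SimpleJob does not execute body; the function is merely stored.
  def run(): Unit = body()
}

object DeferredExecutionDemo extends App {
  val job = new SimpleJob(() => println("function finally executed"))
  // The stored function runs only when some thread's run() invokes it.
  new Thread(new Runnable {
    override def run(): Unit = job.run()
  }).start()
}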
In the NetworkWordCount program, it is DStream.print that causes a Job to be produced.
DStream.print:
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
In a Spark Streaming application, besides print, operations such as saveAsObjectFiles and saveAsTextFiles also end up calling foreachRDD, which generates a ForEachDStream; only then can a Job be produced later.
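An application can also register this kind of output operation directly through the public foreachRDD overload. A minimal runnable sketch (host, port, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDDemo")
  val ssc = new StreamingContext(conf, Seconds(1))
  val lines = ssc.socketTextStream("localhost", 9999)

  // Registers a ForEachDStream, just as print does internally.
  lines.foreachRDD { (rdd, time) =>
    // take is an action, so every batch triggers a Spark core job.
    rdd.take(10).foreach(println)
    println(s"--- batch $time done ---")
  }

  ssc.start()
  ssc.awaitTermination()
}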
DStream.foreachRDD:
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
Through register, the newly created ForEachDStream is added to the outputStreams member of the DStreamGraph.
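For reference, register itself does very little; in the Spark source of this era it essentially amounts to handing the stream to the graph:

private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}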
If the program contains no code like print, count, saveAsObjectFiles, or saveAsTextFiles, then outputStreams in the DStreamGraph is empty. What, then, does DStreamGraph.generateJobs produce?
DStreamGraph.generateJobs:
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

With outputStreams empty, DStreamGraph.generateJobs simply produces an empty sequence of Jobs. (In practice it never gets that far: a StreamingContext refuses to start when no output operations are registered.)
By defining a custom method on DStream (or a subclass), you can arrange for foreachFunc to contain no statement like RDD.take. In that case the foreachFunc passed to foreachRDD does not necessarily give rise to a Spark core job: if foreachFunc contains no action, nothing is triggered when it runs, as the sketch below shows.
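A minimal sketch (illustrative only, reusing the lines stream from the earlier example): the output operation below still registers a ForEachDStream, so generateJobs emits a Job every batch interval, but because the function body contains no action, that Job submits nothing to the cluster when it runs.

lines.foreachRDD { (rdd, time) =>
  // map is a transformation: it is lazy and, with no action after it,
  // this RDD is never actually evaluated.
  val upper = rdd.map(_.toUpperCase)
  // No take/count/collect here, so no Spark core job is triggered.
}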
2. Other ways Spark Streaming produces Jobs

Must there be an action for a Job to arise? No. DStream.transform can also give rise to a Job. transform has two overloads, the first of which calls the second.

DStream.transform:

/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  transform((r: RDD[T], t: Time) => cleanedF(r))
}

/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
    assert(rdds.length == 1)
    cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
  }
  new TransformedDStream[U](Seq(this), realTransformFunc)
}

The function-typed parameter transformFunc takes an RDD and produces a new RDD. Ultimately a TransformedDStream object is created. As mentioned in lesson 8, the compute method of an ordinary DStream subclass merely calls getOrCompute on its parent DStream; TransformedDStream's compute method does not work that way.

TransformedDStream.compute:
override def compute(validTime: Time): Option[RDD[U]] = {
  val parentRDDs = parents.map { parent =>
    parent.getOrCompute(validTime).getOrElse(
      // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
      throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
  }
  val transformedRDD = transformFunc(parentRDDs, validTime)
  if (transformedRDD == null) {
    throw new SparkException("Transform function must not return null. " +
      "Return SparkContext.emptyRDD() instead to represent no element " +
      "as the result of transformation.")
  }
  Some(transformedRDD)
}

Unlike other DStream subclasses, TransformedDStream's compute method also invokes transformFunc, and transformFunc is executed immediately rather than waiting for the JobScheduler to schedule it. If transformFunc contains an action such as count or print, that action triggers a Spark core job right then. This can be regarded as a loophole: the operations discussed so far are lazy, so their results are not available immediately, but because transformFunc is not subject to Spark's unified scheduling, you can inspect a concrete result inside it and decide how to proceed, instead of being blocked by laziness from driving subsequent transforms. The sketch below demonstrates this.
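A minimal sketch (illustrative only, again assuming the lines stream from earlier): the count inside transformFunc is an action, so a Spark core job runs as soon as TransformedDStream.compute invokes the function for a batch, before the JobScheduler runs that batch's output Job.

val filtered = lines.transform { rdd =>
  // Executed eagerly when the batch's RDD is generated:
  val n = rdd.count()                         // action: triggers a job immediately
  // The concrete result can steer what happens next, despite laziness elsewhere.
  if (n > 100) rdd.filter(_.nonEmpty) else rdd
}
filtered.print()  // an output operation is still needed to register the graph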