Spark Study Notes (18): Handling Empty RDDs in Spark Streaming
By 阿新 • Published 2019-01-11
Topics covered in this installment:
1 Handling empty RDDs in Spark Streaming
2 Stopping a Spark Streaming application

1 Handling empty RDDs in Spark Streaming
In a Spark Streaming application, no matter which DStream you use, the underlying operations are ultimately performed on RDDs. Let's start the analysis from an application fragment:

...
val lines = ssc.socketTextStream("Master", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach { record =>
      // Note: building SQL by string concatenation like this is vulnerable to
      // SQL injection; a PreparedStatement would be safer in real code
      val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
      val stmt = connection.createStatement()
      stmt.executeUpdate(sql)
    }
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
...
This code has a problem: inside wordCounts.foreachRDD, the rdd is processed without first checking whether it is empty. When the rdd is empty, the job still claims CPU cores and other compute resources and runs the computation inside, which is clearly wasteful. Although Spark defines an EmptyRDD (whose compute method simply throws an exception), Spark applications never actually receive an EmptyRDD here. So before processing each rdd, we should check whether it is empty.
Now look at RDD.isEmpty:
def isEmpty(): Boolean = withScope {
partitions.length == 0 || take(1).length == 0
}
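The check short-circuits: an RDD with no partitions is empty immediately, and otherwise take(1) fetches at most one element, which is far cheaper than counting everything. Below is a rough Python sketch of the same logic; the FakeRDD class is purely illustrative and not a Spark API:

```python
class FakeRDD:
    """Illustrative stand-in for an RDD: a list of partitions, each a list of records."""

    def __init__(self, partitions):
        self.partitions = partitions

    def take(self, n):
        # Like RDD.take: scan partitions only until n records are collected
        out = []
        for part in self.partitions:
            for record in part:
                out.append(record)
                if len(out) == n:
                    return out
        return out

    def is_empty(self):
        # Mirrors RDD.isEmpty: no partitions at all, or no first element
        return len(self.partitions) == 0 or len(self.take(1)) == 0

print(FakeRDD([]).is_empty())                     # no partitions -> True
print(FakeRDD([[], []]).is_empty())               # partitions but no data -> True
print(FakeRDD([[], [("spark", 1)]]).is_empty())   # has data -> False
```

Note that an RDD can have partitions yet still be empty (for example, a batch interval in which no data arrived), which is why the take(1) probe is needed in addition to the partition count.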
So the application code above only needs one extra line:
wordCounts.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.foreachPartition { partitionOfRecords =>
      ...
    }
  }
}
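The combination of the emptiness guard and the per-partition connection reuse can be sketched in plain Python. Everything here (the pool, the Conn class, the in-memory sink) is an illustrative stand-in, not a Spark or JDBC API:

```python
# Illustrative stand-ins: a "pool" of reusable connections and an in-memory sink
pool = []
sink = []

class Conn:
    def execute(self, item, count):
        sink.append((item, count))

def get_connection():
    # Reuse a pooled connection if one exists, otherwise create one
    return pool.pop() if pool else Conn()

def return_connection(conn):
    pool.append(conn)

def write_partitions(partitions):
    # Analogue of foreachRDD plus the isEmpty guard: skip empty "RDDs" entirely,
    # so no connection is ever fetched for a batch with no data
    if not any(partitions):
        return
    for part in partitions:              # analogue of foreachPartition
        conn = get_connection()          # one connection per partition, not per record
        for item, count in part:
            conn.execute(item, count)
        return_connection(conn)

write_partitions([])                                  # empty: nothing written
write_partitions([[("spark", 2)], [("rdd", 1)]])      # two partitions share one pooled Conn
```

The point of the guard is visible here: without it, an empty batch would still walk every partition and check out a connection for each, doing real work to produce nothing.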
2 Stopping a Spark Streaming application

First look at StreamingContext.stop:

def stop(
    stopSparkContext: Boolean = conf.getBoolean(
      "spark.streaming.stopSparkContextByDefault", true)
  ): Unit = synchronized {
  stop(stopSparkContext, false)
}

To stop a Spark Streaming application properly, the other stop overload should be used:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
  var shutdownHookRefToRemove: AnyRef = null
  if (AsynchronousListenerBus.withinListenerThread.value) {
    throw new SparkException("Cannot stop StreamingContext within listener thread of" +
      " AsynchronousListenerBus")
  }
  synchronized {
    try {
      state match {
        case INITIALIZED =>
          logWarning("StreamingContext has not been started yet")
        case STOPPED =>
          logWarning("StreamingContext has already been stopped")
        case ACTIVE =>
          scheduler.stop(stopGracefully)
          // Removing the streamingSource to de-register the metrics on stop()
          env.metricsSystem.removeSource(streamingSource)
          uiTab.foreach(_.detach())
          StreamingContext.setActiveContext(null)
          waiter.notifyStop()
          if (shutdownHookRef != null) {
            shutdownHookRefToRemove = shutdownHookRef
            shutdownHookRef = null
          }
          logInfo("StreamingContext stopped successfully")
      }
    } finally {
      // The state should always be Stopped after calling `stop()`, even if we haven't started yet
      state = STOPPED
    }
  }
  if (shutdownHookRefToRemove != null) {
    ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
  }
  // Even if we have already stopped, we still need to attempt to stop the SparkContext because
  // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
  if (stopSparkContext) sc.stop()
}

The stopGracefully parameter defaults to false; in production it should be true. Concretely, set spark.streaming.stopGracefullyOnShutdown to true in the configuration. This guarantees that batches already running finish before the application stops, preserving the completeness of data processing. How does a Spark Streaming application achieve this? StreamingContext.stopOnShutdown calls the stop above.

StreamingContext.stopOnShutdown:

private def stopOnShutdown(): Unit = {
  val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
  // Do not stop SparkContext, let its own shutdown hook stop it
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}

In StreamingContext.start, a shutdown hook is registered that invokes stopOnShutdown.

StreamingContext.start:
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

So the StreamingContext registers the hook at startup, ensuring that at shutdown the stop overload that takes the stopGracefully parameter is always the one invoked.
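The hook mechanism can be illustrated outside Spark, with Python's atexit module standing in for ShutdownHookManager; the worker thread and queue below are purely illustrative, not Spark internals:

```python
import atexit
import queue
import threading

work = queue.Queue()        # stand-in for batches waiting to run
processed = []              # stand-in for completed output
stop_requested = threading.Event()

def worker():
    # Keep draining the queue; after a graceful stop is requested,
    # finish everything already queued before exiting
    while not (stop_requested.is_set() and work.empty()):
        try:
            item = work.get(timeout=0.1)
        except queue.Empty:
            continue
        processed.append(item * 2)  # stand-in for processing one batch

t = threading.Thread(target=worker)
t.start()

def stop_gracefully():
    # Analogue of stopOnShutdown: request the stop, then wait until
    # in-flight work has drained rather than killing it mid-batch
    stop_requested.set()
    t.join()

# Analogue of ShutdownHookManager.addShutdownHook(...)(stopOnShutdown)
atexit.register(stop_gracefully)

for i in range(5):
    work.put(i)
```

When the process exits (or stop_gracefully is called directly), the queue is drained before the worker stops; killing the thread instead would lose the queued batches, which is exactly the data loss that stopGracefully = true avoids.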