StreamingListener記錄(spark-2.2.0)
阿新 • • 發佈:2018-01-31
let report bmi ssi -s completed listen exe called
記錄spark streaming 中監聽器StreamingListener的相關信息
概述
StreamingListener 是針對spark streaming的各個階段的事件監聽機制
在用法上跟SparkListener很類似,但是有些細節區別
代碼記錄
//需要監聽spark streaming中各個階段的事件只需實現這個特質中對應的事件函數即可 //本身既有註釋說明 trait StreamingListener { /** Called when the streaming has been started */ /** streaming 啟動的事件 */ def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { } /** Called when a receiver has been started */ /** 接收啟動事件 */ def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } /** Called when a receiver has reported an error */ def onReceiverError(receiverError: StreamingListenerReceiverError) { } /** Called when a receiver has been stopped */ def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { } /** Called when a batch of jobs has been submitted for processing. */ /** 每個批次提交的事件 */ def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } /** Called when processing of a batch of jobs has started. */ /** 每個批次啟動的事件 */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } /** Called when processing of a batch of jobs has completed. */ /** 每個批次完成的事件 */ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } /** Called when processing of a job of a batch has started. */ def onOutputOperationStarted( outputOperationStarted: StreamingListenerOutputOperationStarted) { } /** Called when processing of a job of a batch has completed. */ def onOutputOperationCompleted( outputOperationCompleted: StreamingListenerOutputOperationCompleted) { } }
示例代碼
package z.cloud.test.listener class MyTestStreamingListener(ssc : StreamingContext) extends StreamingListener with Logging{ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { val batchInfo = batchCompleted.batchInfo val execTime = batchInfo.processingDelay.getOrElse(0L) val schedulingTime = batchInfo.schedulingDelay.getOrElse(0L) logInfo(s"執行時間: $execTime 調度延時 : $schedulingTime") } }
示例代碼應用
//streamingListener不需要在配置中設置,可以直接添加到streamingContext中 object My{ def main(args : Array[String]) : Unit = { val sparkConf = new SparkConf() val ssc = new StreamingContext(sparkConf,Seconds(20)) ssc.addStreamingListener(new MyTestStreamingListener(ssc)) .... } }
StreamingListener記錄(spark-2.2.0)