A "requirement failed" error in a Spark Streaming program deployed in local mode
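I recently deployed a data-filtering Spark Streaming program in local mode. Its main job is to process users' transaction data in real time. The environment is as follows:
spark: 2.1.0, currently deployed in Standalone mode on a single node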
scala:2.10.4
jdk:1.7.0_79
The program starts without any exception, but after running for a while it fails with
java.lang.IllegalArgumentException: requirement failed.
The full exception is shown below:
17/12/23 04:26:47 INFO BlockManager: Removing RDD 13276
17/12/23 04:26:47 INFO MapPartitionsRDD: Removing RDD 13279 from persistence list
17/12/23 04:26:47 INFO BlockManager: Removing RDD 13279
17/12/23 04:26:47 ERROR JobScheduler: Error in job generator
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:212)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:168)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:274)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:184)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:212)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:168)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:274)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:184)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
17/12/23 04:26:47 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
17/12/23 04:26:47 INFO ReceiverTracker: ReceiverTracker stopped
17/12/23 04:26:47 INFO JobGenerator: Stopping JobGenerator immediately
17/12/23 04:26:47 INFO RecurringTimer: Stopped timer for JobGenerator after time 1513974470000
17/12/23 04:26:47 INFO JobGenerator: Stopped JobGenerator
17/12/23 04:26:47 INFO JobScheduler: Stopped JobScheduler
17/12/23 04:26:47 INFO StreamingContext: StreamingContext stopped successfully
17/12/23 04:26:47 INFO SparkContext: Invoking stop() from shutdown hook
17/12/23 04:26:47 INFO SparkUI: Stopped Spark web UI at http://*.*.*.*:4040
17/12/23 04:26:47 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/12/23 04:26:47 INFO MemoryStore: MemoryStore cleared
17/12/23 04:26:47 INFO BlockManager: BlockManager stopped
17/12/23 04:26:47 INFO BlockManagerMaster: BlockManagerMaster stopped
17/12/23 04:26:47 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/12/23 04:26:47 INFO SparkContext: Successfully stopped SparkContext
17/12/23 04:26:47 INFO ShutdownHookManager: Shutdown hook called
17/12/23 04:26:47 INFO ShutdownHookManager: Deleting directory /tmp/spark-21c7303b-7c66-4d0e-a6e6-4731b2770d92
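The stack trace shows where the exception is raised: in ReceivedBlockTracker.cleanupOldBatches (ReceivedBlockTracker.scala:168), reached through ReceiverTracker.cleanupOldBlocksAndBatches, as shown in the figures below: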
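As you can see, this method is called from clearMetadata when metadata is being cleaned up. The argument passed to cleanupOldBlocksAndBatches is the batch time (time) minus the remember duration (maxRememberDuration), which serves as the threshold before which batch information counts as old. The source comment on cleanupOldBatches reads: "Clean up block information of old batches. If waitForCompletion is true, this method returns only after the files are cleaned up."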
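The exception is thrown by the require call inside this method, which checks that the cleanup threshold is strictly earlier than the value returned by clock.getTimeMillis. That value is the current time of the scheduler's clock. In other words, the error is raised when, after the data has been processed and the metadata is being cleaned up, the computed threshold is not earlier than that clock reading. From this I inferred that the cause was processing taking too long.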
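The check described above can be illustrated with a small standalone model. This is a simplified sketch, not the Spark source: the object name CleanupCheckSketch, the helper cleanupThreshold, and the 20-second remember duration are assumptions made for illustration. Only the shape of the precondition (threshold strictly earlier than the clock reading) follows the description above.

```scala
import scala.util.Try

// Simplified, hypothetical model of the precondition; not the Spark source.
object CleanupCheckSketch {

  // Threshold handed down from clearMetadata: batch time minus remember duration.
  def cleanupThreshold(batchTimeMs: Long, maxRememberDurationMs: Long): Long =
    batchTimeMs - maxRememberDurationMs

  // The threshold must be strictly earlier than the clock reading; otherwise
  // require throws IllegalArgumentException("requirement failed").
  def cleanupOldBatches(cleanupThreshTimeMs: Long, clockNowMs: Long): Unit = {
    require(cleanupThreshTimeMs < clockNowMs)
    // The real method would now drop metadata of batches older than the threshold.
  }

  def main(args: Array[String]): Unit = {
    val batchTimeMs = 1513974470000L // timer value taken from the log above
    val rememberMs  = 20000L         // hypothetical remember duration
    val threshold   = cleanupThreshold(batchTimeMs, rememberMs)

    // Clock reading later than the threshold: the check passes.
    println(Try(cleanupOldBatches(threshold, batchTimeMs + 500L)).isSuccess)

    // Clock reading not later than the threshold: the check fails.
    println(Try(cleanupOldBatches(threshold, threshold)).isFailure)
  }
}
```

Because require is called without a message, the exception text is just "requirement failed", which is why the log gives no further hint about which condition was violated.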
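The fix: because the deployment environment has only a single-core CPU and 2 GB of memory, I tuned the block interval (spark.streaming.blockInterval) so that fewer blocks are generated per batch, which reduces the overhead of generating jobs from the received data. Each receiver produces roughly (batch interval / block interval) blocks per batch, so fewer blocks means a larger block interval. I also limited the rate at which data is received by setting "spark.streaming.backpressure.enabled" to true. Together these changes resolved the problem.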
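As a rough illustration of the two settings mentioned above, here is a minimal configuration sketch. The property names spark.streaming.blockInterval and spark.streaming.backpressure.enabled are real Spark settings; the application name, the master URL, and the 500ms value are assumptions chosen for the example, not the values used in the original deployment.

```scala
import org.apache.spark.SparkConf

object StreamingTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("trade-filter-sketch") // hypothetical application name
      .setMaster("local[2]")             // hypothetical; receiver-based jobs in local mode need at least 2 threads
      // Hypothetical value. The default is 200ms; a larger interval means
      // fewer blocks, and therefore fewer tasks, per batch.
      .set("spark.streaming.blockInterval", "500ms")
      // Let Spark adapt the receiving rate to the observed processing speed.
      .set("spark.streaming.backpressure.enabled", "true")

    // Print the effective settings so they can be checked before starting the job.
    println(conf.toDebugString)
  }
}
```

With a 10-second batch interval, for example, 500ms yields about 20 blocks per receiver per batch, compared with about 50 at the 200ms default. The same properties can also be passed with --conf on the spark-submit command line instead of being set in code.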