1. 程式人生 > >spark序列化異常和Executor的僵死問題

spark序列化異常和Executor的僵死問題

在Spark上執行hive語句的時候,出現類似於如下的異常:
  1. org.apache.spark.SparkDriverExecutionException: Execution error
  2.     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849)
  3.     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
  4.     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
  5.     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  6.     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
  7.     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  8.     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
  9.     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  10.     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  11.     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  12.     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  13. Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
  14.     at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$anonfun$7.apply(joins.scala:336)
  15.     at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:813)
  16.     at org.apache.spark.rdd.RDD$anonfun$19.apply(RDD.scala:810)
  17.     at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
  18.     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)
複製程式碼
排查其前後的日誌,發現大都是序列化的東西:
  1. 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms
  2. 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)
複製程式碼
而在spark-default.conf中,事先設定了序列化方式為Kryo:
  1. spark.serializer org.apache.spark.serializer.KryoSerializer
複製程式碼
根據異常資訊,可見是HashSet轉為BitSet型別轉換失敗,Kryo把鬆散的HashSet轉換為了緊湊的BitSet,把序列化方式註釋掉之後,任務可以正常執行。難道Spark的Kryo序列化做的還不到位?此問題需要進一步跟蹤。 9 Executor僵死問題     執行一個Spark任務,發現其執行速度遠遠慢於執行同樣SQL語句的Hive的執行,甚至出現了OOM的錯誤,最後卡住達幾小時!並且Executor程序在瘋狂GC。     擷取其一Task的OOM異常資訊: 可以看到這是在序列化過程中發生的OOM。根據節點資訊,找到對應的Executor程序,觀察其Jstack資訊:
  1. Thread 36169: (state = BLOCKED)
  2. - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame)
  3. - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame)
  4. - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame)
  5. - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
  6. - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame)
  7. - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame)
  8. - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
  9. - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
  10. - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
  11. - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
  12. - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
  13. - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
  14. - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
  15. - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame)
  16. - org.apache.spark.serializer.DeserializationStream$anon$1.getNext() @bci=10, line=125 (Compiled frame)
  17. - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame)
  18. - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame)
  19. - scala.collection.Iterator$anon$13.hasNext() @bci=4, line=371 (Compiled frame)
  20. - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame)
  21. - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame)
  22. - scala.collection.Iterator$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
  23. - org.apache.spark.sql.execution.HashJoin$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame)
  24. - org.apache.spark.sql.execution.HashJoin$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame)
  25. - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame)
  26. - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)
複製程式碼
有大量的BLOCKED執行緒,繼續觀察GC資訊,發現大量的FULL GC。     分析,在插入Hive表的時候,實際上需要寫HDFS,在此過程的HashJoin時,伴隨著大量的Shuffle寫操作,JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同時超過一定大小的資料會直接寫到老生代,當新生代寫滿了之後,也會把老的資料搞到老生代,如果老生代空間不足了,就觸發FULL GC,還是空間不夠,那就OOM錯誤了,此時執行緒被Blocked,導致整個Executor處理資料的程序被卡住。 當處理大資料的時候,如果JVM配置不當就容易引起上述問題。解決的方法就是增大Executor的使用記憶體,合理配置新生代和老生代的大小,可以將老生代的空間適當的調大點