Analysis of a Spark Streaming job failure caused by Hadoop 3.1.0's ErasureCodingPolicy
I. An ErasureCodingPolicy problem causes the streaming job to exit
1. Why the job fails: when an uncaught exception occurs while executing a block of code on the driver side, the SparkContext is stopped, as shown below.
For the full source-level path of the error, interested readers can trace it through the error log themselves; instead of walking through it here, only the most important pieces of source code are shown.
Class: AsyncEventQueue
private val dispatchThread = new Thread(s"spark-listener-group-$name") {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    dispatch()
  }
}

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  try {
    var next: SparkListenerEvent = eventQueue.take()
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  } catch {
    case ie: InterruptedException =>
      logInfo(s"Stopping listener queue $name.", ie)
  }
}
Class: org.apache.spark.util.Utils
/**
 * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
 * exception
 *
 * NOTE: This method is to be called by the driver-side components to avoid stopping the
 * user-started JVM process completely; in contrast, tryOrExit is to be called in the
 * spark-started JVM process.
 */
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
  try {
    block
  } catch {
    case e: ControlThrowable =>
      throw e
    case t: Throwable =>
      val currentThreadName = Thread.currentThread().getName
      if (sc != null) {
        logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
        sc.stopInNewThread()
      }
      if (!NonFatal(t)) {
        logError(s"throw uncaught fatal error in thread $currentThreadName", t)
        throw t
      }
  }
}
2. First there is a warning message, as follows:
2018-11-30 16:35:53 WARN Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-11-30 16:35:54 WARN ErasureCodeNative:55 - ISA-L support is not available in your platform... using builtin-java codec where applicable
3. After the Spark Streaming job has run through one batch, it starts throwing errors, looping on the same error over and over; before long the job dies. The error message is as follows:
2018-11-30 16:37:21 ERROR AsyncEventQueue:91 - Listener EventLoggingListener threw an exception
java.lang.IllegalStateException
at com.google.common.base.Preconditions.checkState(Preconditions.java:133)
at org.apache.hadoop.hdfs.DFSStripedOutputStream$CellBuffers.addTo(DFSStripedOutputStream.java:238)
at org.apache.hadoop.hdfs.DFSStripedOutputStream$CellBuffers.access$700(DFSStripedOutputStream.java:203)
at org.apache.hadoop.hdfs.DFSStripedOutputStream.writeChunk(DFSStripedOutputStream.java:520)
at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:217)
at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:164)
at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:145)
at org.apache.hadoop.fs.FSOutputSummer.write1(FSOutputSummer.java:136)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:111)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
at java.io.BufferedWriter.write(BufferedWriter.java:230)
at java.io.PrintWriter.write(PrintWriter.java:456)
at java.io.PrintWriter.write(PrintWriter.java:473)
at java.io.PrintWriter.print(PrintWriter.java:603)
at java.io.PrintWriter.println(PrintWriter.java:739)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:82)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:89)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:89)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)
II. Problem analysis
Roughly, the source code involved in the error does the following:
1. In onTaskEnd, a PrintWriter is used to write out the event log entry.
2. The PrintWriter's write is ultimately carried out by writeChunk in DFSStripedOutputStream.
3. Before writing there is a checkState precondition, and that is where the error is thrown.
4. Among the relevant parameters is the cellSize of the ErasureCodingPolicy.
5. When the output position exceeds cellSize, this exception is thrown.
6. Furthermore, when tryOrStopSparkContext in org.apache.spark.util.Utils finds an uncaught exception while executing its block, it stops the SparkContext.
7. Judging from this, the job dying shortly after starting on the new cluster is very likely caused by exactly this.
8. EC analysis: when the event log is written out, it is split into chunks and written into the EC cell buffers; once a buffer is full, a flush drains it. Two important factors lead to the exception: A. the chunks being written are large; B. the cell buffer is comparatively small. Together these two trigger the exception (see the sketch right after this list).
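To make item 8 concrete, the following is a minimal, self-contained Scala sketch of the bounds check performed by DFSStripedOutputStream.CellBuffers.addTo. The cell size and chunk sizes here are deliberately tiny, made-up illustration values, not real policy defaults:

import java.nio.ByteBuffer

object CellOverflowSketch {
  // Hypothetical, deliberately tiny cell size for illustration only; real EC
  // policies use much larger 1024-aligned cells (e.g. RS-6-3-1024k uses 1 MB).
  val cellSize = 1024
  val cell: ByteBuffer = ByteBuffer.allocate(cellSize)

  // Mirrors the bounds check in DFSStripedOutputStream.CellBuffers.addTo:
  // a write must fit inside the current cell, otherwise checkState fails.
  def addTo(b: Array[Byte], off: Int, len: Int): Int = {
    val pos = cell.position() + len
    if (pos > cellSize)
      // Same condition as the java.lang.IllegalStateException in the trace above.
      throw new IllegalStateException(s"pos $pos exceeds cellSize $cellSize")
    cell.put(b, off, len)
    pos
  }

  def main(args: Array[String]): Unit = {
    val chunk = new Array[Byte](800)
    println(addTo(chunk, 0, chunk.length)) // 800  -> fits
    println(addTo(chunk, 0, chunk.length)) // 1600 -> throws IllegalStateException
  }
}

The failure condition in the real code is the same shape: an unflushed run of writes that would push the cell buffer's position past cellSize.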
III. Fixing the problem
Write the event log to the local filesystem instead of HDFS, or disable EC on the event-log output directory; both options are sketched below.
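For the first option, here is a minimal Scala sketch of the driver-side configuration; the local path is a hypothetical example and the directory must already exist on the driver host:

import org.apache.spark.SparkConf

object LocalEventLogConf {
  // file:///var/log/spark-events is a made-up local path; any directory that
  // exists on the driver host works. Keeping the event log off HDFS means the
  // DFSStripedOutputStream (EC) write path is never exercised at all.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("streaming-job")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "file:///var/log/spark-events")
}

For the second option, on Hadoop 3 the EC policy of a directory can be managed with the hdfs ec command-line tool (for example hdfs ec -unsetPolicy -path <event-log-dir>, or hdfs ec -setPolicy -path <event-log-dir> -replicate to force plain replication). The exact directory is whatever spark.eventLog.dir points to in your deployment, and the change only affects files created afterwards.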
IV. Code analysis
The failing code:
Class: DFSStripedOutputStream
private int addTo(int i, byte[] b, int off, int len) {
  ByteBuffer buf = this.buffers[i];
  // Position the write would reach within the i-th cell buffer.
  int pos = buf.position() + len;
  // The write must fit inside the cell: this is the checkState that throws the
  // java.lang.IllegalStateException seen in the stack trace above.
  Preconditions.checkState(pos <= DFSStripedOutputStream.this.cellSize);
  buf.put(b, off, len);
  return pos;
}
Class: ErasureCodingPolicy
public ErasureCodingPolicy(String name, ECSchema schema, int cellSize, byte id) {
  Preconditions.checkNotNull(name);
  Preconditions.checkNotNull(schema);
  // cellSize is fixed when the policy is created: it must be positive and 1024-aligned.
  Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
  Preconditions.checkArgument(cellSize % 1024 == 0, "cellSize must be 1024 aligned");
  this.name = name;
  this.schema = schema;
  this.cellSize = cellSize;
  this.id = id;
}
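As a standalone illustration of those two argument checks, here is a plain Scala mirror of the Preconditions calls (not the Hadoop class itself):

object CellSizeValidation {
  // Mirrors the two Preconditions.checkArgument calls in the constructor above.
  def validateCellSize(cellSize: Int): Unit = {
    require(cellSize > 0, "cellSize must be positive")
    require(cellSize % 1024 == 0, "cellSize must be 1024 aligned")
  }

  def main(args: Array[String]): Unit = {
    validateCellSize(1024 * 1024) // OK: 1 MB, the cell size of the default RS-6-3-1024k policy
    validateCellSize(1000)        // throws IllegalArgumentException: not 1024-aligned
  }
}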
As for the direct problem caused by EC itself, it has not yet been truly resolved.