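Analysis of Spark Streaming job failures caused by ErasureCodingPolicy in Hadoop 3.1.0

I. An ErasureCodingPolicy problem causes the streaming job to exit

1. Why the job fails: when the executed block throws an uncaught exception, the SparkContext is stopped, as shown below.

Readers who are interested can trace the exact failure path through the source using the error log. It is not traced in full here; only the most relevant source code is shown.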
AsyncEventQueue

  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    try {
      var next: SparkListenerEvent = eventQueue.take()
      while (next != POISON_PILL) {
        val ctx = processingTime.time()
        try {
          super.postToAll(next)
        } finally {
          ctx.stop()
        }
        eventCount.decrementAndGet()
        next = eventQueue.take()
      }
      eventCount.decrementAndGet()
    } catch {
      case ie: InterruptedException =>
        logInfo(s"Stopping listener queue $name.", ie)
    }
  }

org.apache.spark.util.Utils

  /**
   * Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
   * exception
   *
   * NOTE: This method is to be called by the driver-side components to avoid stopping the
   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
   * spark-started JVM process .
   */
  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
    try {
      block
    } catch {
      case e: ControlThrowable => throw e
      case t: Throwable =>
        val currentThreadName = Thread.currentThread().getName
        if (sc != null) {
          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
          sc.stopInNewThread()
        }
        if (!NonFatal(t)) {
          logError(s"throw uncaught fatal error in thread $currentThreadName", t)
          throw t
        }
    }
  }
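
To make the control flow of tryOrStopSparkContext easier to follow, here is a minimal Java sketch of the same pattern. It is not Spark code: FakeContext and tryOrStop are hypothetical names, the context is stopped synchronously rather than in a new thread, and "fatal" is simplified to mean any java.lang.Error, which is narrower than Scala's NonFatal check.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch of the "stop the context on an uncaught exception" pattern; all names are hypothetical. */
class TryOrStopSketch {
    /** Hypothetical stand-in for SparkContext: it only records whether stop was requested. */
    static class FakeContext {
        final AtomicBoolean stopped = new AtomicBoolean(false);
        void stop() { stopped.set(true); }
    }

    /** Runs the block; on any Throwable it stops the context and rethrows only Errors. */
    static void tryOrStop(FakeContext ctx, Runnable block) {
        try {
            block.run();
        } catch (Throwable t) {
            if (ctx != null) {
                // Spark calls sc.stopInNewThread() here; this sketch stops synchronously.
                ctx.stop();
            }
            if (t instanceof Error) {
                // Simplification: treat every Error as fatal and let it propagate.
                throw (Error) t;
            }
            // Non-fatal exceptions are swallowed after the context has been stopped.
        }
    }

    public static void main(String[] args) {
        FakeContext ctx = new FakeContext();
        tryOrStop(ctx, () -> { throw new IllegalStateException(); });
        System.out.println("stopped = " + ctx.stopped.get());
    }
}
```

The point of the sketch is that a non-fatal exception such as IllegalStateException does not crash the thread loudly: the context is simply stopped, which from the outside looks like the job quietly exiting.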

2. First, the following warnings appear:

2018-11-30 16:35:53 WARN  Client:66 - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-11-30 16:35:54 WARN  ErasureCodeNative:55 - ISA-L support is not available in your platform... using builtin-java codec where applicable

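3. Once the Spark Streaming job has finished a batch, it starts reporting errors. The same error repeats in a loop, and before long the job dies. The error is as follows: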

2018-11-30 16:37:21 ERROR AsyncEventQueue:91 - Listener EventLoggingListener threw an exception
java.lang.IllegalStateException
	at com.google.common.base.Preconditions.checkState(Preconditions.java:133)
	at org.apache.hadoop.hdfs.DFSStripedOutputStream$CellBuffers.addTo(DFSStripedOutputStream.java:238)
	at org.apache.hadoop.hdfs.DFSStripedOutputStream$CellBuffers.access$700(DFSStripedOutputStream.java:203)
	at org.apache.hadoop.hdfs.DFSStripedOutputStream.writeChunk(DFSStripedOutputStream.java:520)
	at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:217)
	at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:164)
	at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:145)
	at org.apache.hadoop.fs.FSOutputSummer.write1(FSOutputSummer.java:136)
	at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:111)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
	at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
	at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
	at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
	at java.io.BufferedWriter.flushBuffer(BufferedWriter.java:129)
	at java.io.BufferedWriter.write(BufferedWriter.java:230)
	at java.io.PrintWriter.write(PrintWriter.java:456)
	at java.io.PrintWriter.write(PrintWriter.java:473)
	at java.io.PrintWriter.print(PrintWriter.java:603)
	at java.io.PrintWriter.println(PrintWriter.java:739)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
	at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:143)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:143)
	at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:82)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:89)
	at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:89)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:83)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:79)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
	at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:78)

II. Problem analysis

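The source code behind the error roughly does the following:

1. On onTaskEnd, a PrintWriter is called to write the event log entry.

2. That PrintWriter's write is ultimately carried out by DFSStripedOutputStream.writeChunk.

3. Before the write there is a checkState, and this is where the error is raised.

4. The relevant parameter is cellSize, which comes from the ErasureCodingPolicy.

5. When the buffer's current position plus the length being written exceeds cellSize, this exception is thrown.

6. In addition, tryOrStopSparkContext in spark.util.Utils stops the SparkContext when it finds that the block it runs has thrown an uncaught exception.

7. Seen this way, this is very likely the reason the job died shortly after it started running on the new cluster.

8. EC analysis: when the log is written, it is split into chunks that are written into the EC buffer pool. When the buffer pool is full, a flush empties it. Two factors matter most for the exception: (A) the chunks are large, and (B) the buffer pool is relatively small. Together these two factors cause the exception.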
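
The check described in steps 3 to 5 can be reproduced in isolation. The sketch below is a simplified stand-in, not the Hadoop implementation: CellBufferSketch is a hypothetical class, and the 1024-byte cell and 512-byte chunk are illustrative values chosen only to trigger the overflow quickly.

```java
import java.nio.ByteBuffer;

/** Simplified stand-in for an EC cell buffer; not the Hadoop implementation. */
class CellBufferSketch {
    private final ByteBuffer buf;
    private final int cellSize;

    CellBufferSketch(int cellSize) {
        this.cellSize = cellSize;
        this.buf = ByteBuffer.allocate(cellSize);
    }

    /** Appends len bytes and returns the new position, or throws if the cell would overflow. */
    int addTo(byte[] b, int off, int len) {
        int pos = buf.position() + len;
        if (pos > cellSize) {
            // Same condition as Preconditions.checkState(pos <= cellSize),
            // which throws an IllegalStateException without a message.
            throw new IllegalStateException();
        }
        buf.put(b, off, len);
        return pos;
    }

    /** Empties the buffer, as a flush would after a full cell has been written out. */
    void clear() {
        buf.clear();
    }

    public static void main(String[] args) {
        CellBufferSketch cell = new CellBufferSketch(1024); // hypothetical tiny cell
        byte[] chunk = new byte[512];                        // hypothetical chunk size
        System.out.println(cell.addTo(chunk, 0, chunk.length)); // position is now 512
        System.out.println(cell.addTo(chunk, 0, chunk.length)); // position is now 1024, cell is full
        try {
            cell.addTo(chunk, 0, chunk.length); // 1024 + 512 > 1024, so the check fails
        } catch (IllegalStateException e) {
            System.out.println("IllegalStateException: write would exceed cellSize");
        }
    }
}
```

If the buffer is not emptied before the next chunk arrives, the next write fails exactly as in the stack trace. Calling clear() between the second and third write would let the third write succeed.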

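III. Solution

Write the event log to local storage instead of HDFS (for example by pointing spark.eventLog.dir at a local path), or disable EC for the location the log is written to.

IV. Code analysis

The code that raises the error

Class: DFSStripedOutputStream (inner class CellBuffers, per the stack trace)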

private int addTo(int i, byte[] b, int off, int len) {
    ByteBuffer buf = this.buffers[i];
    int pos = buf.position() + len;
    Preconditions.checkState(pos <= DFSStripedOutputStream.this.cellSize);
    buf.put(b, off, len);
    return pos;
}

Class: ErasureCodingPolicy

public ErasureCodingPolicy(String name, ECSchema schema, int cellSize, byte id) {
    Preconditions.checkNotNull(name);
    Preconditions.checkNotNull(schema);
    Preconditions.checkArgument(cellSize > 0, "cellSize must be positive");
    Preconditions.checkArgument(cellSize % 1024 == 0, "cellSize must be 1024 aligned");
    this.name = name;
    this.schema = schema;
    this.cellSize = cellSize;
    this.id = id;
}

The underlying problem caused by EC itself has not actually been solved yet.