Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 134217728
Task failure caused by the Kryo serialization buffer size
Error message
```
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/hadoop/yarn/local/filecache/185/spark2-hdp-yarn-archive.tar.gz/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/3.1.0.0-78/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
21/07/07 16:58:15 WARN TaskSetManager: Lost task 88.0 in stage 3.0 (TID 1007, node181, executor 1): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 134217728. To avoid this, increase spark.kryoserializer.buffer.max value.
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:350)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 134217728
    at com.esotericsoftware.kryo.io.Output.require(Output.java:163)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
    at com.esotericsoftware.kryo.io.Output.write(Output.java:209)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.write(UnsafeRow.java:676)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:505)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:503)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.twitter.chill.Tuple3Serializer.write(TupleSerializers.scala:50)
    at com.twitter.chill.Tuple3Serializer.write(TupleSerializers.scala:45)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
    ... 4 more
```

![image](https://img2020.cnblogs.com/blog/1395360/202107/1395360-20210707194728031-1780283605.png)
Problem description
The job was submitted with spark-submit using Kryo serialization parameters; while the job was running, the serialization buffer size error above was thrown. Submit command:
```bash
spark-submit \
  --class cn.yd.spark.logAnalyze.LogAnalyze \
  --name 'log_analyze' \
  --queue offline \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=500 \
  --conf spark.default.parallelism=500 \
  --conf spark.sql.parquet.compression.codec=snappy \
  --conf spark.kryoserializer.buffer.max=128m \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.locality.wait.node=6 \
  --num-executors 4 \
  --executor-cores 1 \
  --driver-memory 5g \
  --executor-memory 2g \
  --conf 'spark.driver.extraJavaOptions= -XX:+UseCodeCacheFlushing -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/log/offline/store_sort_week_driver_error.dump' \
  --conf 'spark.executor.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/log/offline/store_sort_week_executor_error.dump' \
  /data/xxx.jar
```
Root cause
The error is caused by setting `spark.kryoserializer.buffer.max=128m` too small. When Kryo writes serialized data it validates the write against this maximum; if a record needs more space than the configured maximum, this exception is thrown. Note that the 134217728 bytes in the error message is exactly 128 MB, i.e. the configured maximum had already been exhausted.
Kryo source code:
```java
// The constructor is where the maximum buffer size is set
/** Creates a new Output for writing to a byte array.
 * @param bufferSize The initial size of the buffer.
 * @param maxBufferSize The buffer is doubled as needed until it exceeds maxBufferSize and an exception is thrown. Can be -1
 *           for no maximum. */
public Output (int bufferSize, int maxBufferSize) {
    if (maxBufferSize < -1) throw new IllegalArgumentException("maxBufferSize cannot be < -1: " + maxBufferSize);
    this.capacity = bufferSize;
    this.maxCapacity = maxBufferSize == -1 ? Integer.MAX_VALUE : maxBufferSize;
    buffer = new byte[bufferSize];
}

// As seen here, once capacity == maxCapacity and more space is still required, the exception is thrown
/** @return true if the buffer has been resized. */
protected boolean require (int required) throws KryoException {
    if (capacity - position >= required) return false;
    if (required > maxCapacity)
        throw new KryoException("Buffer overflow. Max capacity: " + maxCapacity + ", required: " + required);
    flush();
    while (capacity - position < required) {
        if (capacity == maxCapacity)
            throw new KryoException("Buffer overflow. Available: " + (capacity - position) + ", required: " + required);
        // Grow buffer.
        if (capacity == 0) capacity = 1;
        capacity = Math.min(capacity * 2, maxCapacity);
        if (capacity < 0) capacity = maxCapacity;
        byte[] newBuffer = new byte[capacity];
        System.arraycopy(buffer, 0, newBuffer, 0, position);
        buffer = newBuffer;
    }
    return true;
}
```
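To make the failure mode concrete, here is a minimal sketch (not part of the original post) that reproduces the same exception by writing more bytes to Kryo's `Output` than a deliberately tiny 64-byte maximum allows:

```scala
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.io.Output

object BufferOverflowDemo {
  def main(args: Array[String]): Unit = {
    // Initial buffer of 16 bytes, hard cap of 64 bytes.
    val output = new Output(16, 64)
    try {
      // 128 bytes cannot fit even after the buffer has doubled up to maxCapacity,
      // so require() throws, just like the Spark executor did at 128 MB.
      output.writeBytes(new Array[Byte](128))
    } catch {
      case e: KryoException =>
        // Prints something like: Buffer overflow. Available: 0, required: 64
        println(e.getMessage)
    }
  }
}
```

Spark's Kryo serializer hits the same code path: `spark.kryoserializer.buffer.max` becomes `maxBufferSize`, so any single record larger than it triggers the overflow.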
Solutions
- Increase the maximum value of the serialization buffer, e.g. `--conf spark.kryoserializer.buffer.max=1g` (a programmatic equivalent is sketched after this list).
- Stop using Kryo serialization altogether by removing the two Kryo parameters (`spark.serializer` and `spark.kryoserializer.buffer.max`) from the submit script.
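For the first option, the same configuration can also be applied in code instead of on the spark-submit command line. A minimal sketch, assuming the job builds its own `SparkSession`; the object and app names are placeholders, not the author's actual code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object LogAnalyze {
  def main(args: Array[String]): Unit = {
    // Mirrors the submit script, with the buffer maximum raised to 1g.
    // spark.kryoserializer.buffer.max must be larger than the biggest record
    // being serialized and must stay below 2048m.
    val conf = new SparkConf()
      .setAppName("log_analyze")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer", "64m")    // initial buffer size
      .set("spark.kryoserializer.buffer.max", "1g") // hard cap the buffer may grow to

    val spark = SparkSession.builder().config(conf).getOrCreate()
    // ... job logic ...
    spark.stop()
  }
}
```

Whichever way it is set, the maximum only needs to exceed the largest single serialized record.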