1. 程式人生 > 其它 >Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 134217728

Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 134217728

Kryo序列化緩衝區大小導致任務失敗的問題

問題報錯

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/hadoop/yarn/local/filecache/185/spark2-hdp-yarn-archive.tar.gz/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/3.1.0.0-78/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
21/07/07 16:58:15 WARN TaskSetManager: Lost task 88.0 in stage 3.0 (TID 1007, node181, executor 1): org.apache.spark.SparkException: Kryo serialization failed: Buffer overflow. Available: 0, required: 134217728. To avoid this, increase spark.kryoserializer.buffer.max value.
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:350)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 134217728
	at com.esotericsoftware.kryo.io.Output.require(Output.java:163)
	at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
	at com.esotericsoftware.kryo.io.Output.write(Output.java:209)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.write(UnsafeRow.java:676)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:505)
	at com.esotericsoftware.kryo.serializers.DefaultSerializers$KryoSerializableSerializer.write(DefaultSerializers.java:503)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at com.twitter.chill.Tuple3Serializer.write(TupleSerializers.scala:50)
	at com.twitter.chill.Tuple3Serializer.write(TupleSerializers.scala:45)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
	at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:347)
	... 4 more
![image](https://img2020.cnblogs.com/blog/1395360/202107/1395360-20210707194728031-1780283605.png)

問題描述

spark-submit提交任務的時候使用kryo序列化引數,程式在進行計算的過程中出來報錯序列化緩衝大小的問題,任務提交程式碼:

spark-submit \
    --class cn.yd.spark.logAnalyze.LogAnalyze \
    --name 'log_analyze' \
    --queue offline \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.sql.shuffle.partitions=500 \
    --conf spark.default.parallelism=500 \
    --conf spark.sql.parquet.compression.codec=snappy \
    --conf spark.kryoserializer.buffer.max=128m \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.locality.wait.node=6 \
    --num-executors 4 \
    --executor-cores 1 \
    --driver-memory 5g \
    --executor-memory 2g \
    --conf 'spark.driver.extraJavaOptions= -XX:+UseCodeCacheFlushing -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/log/offline/store_sort_week_driver_error.dump' \
    --conf 'spark.executor.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ParallelRefProcEnabled  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data/log/offline/store_sort_week_executor_error.dump' \
    /data/xxx.jar

問題原因

該問題是由於對spark.kryoserializer.buffer.max=128m該引數值設定過小導致,由於序列化寫資料的時候需要對該引數進行校驗,如果要寫入的資料大於設定的最大值則會丟擲該異常
原始碼

// 檢視構造方法,這裡會對buffer的最大值進行設定
/** Creates a new Output for writing to a byte array.
 * @param bufferSize The initial size of the buffer.
 * @param maxBufferSize The buffer is doubled as needed until it exceeds maxBufferSize and an exception is thrown. Can be -1
 *           for no maximum. */
public Output (int bufferSize, int maxBufferSize) {
	if (maxBufferSize < -1) throw new IllegalArgumentException("maxBufferSize cannot be < -1: " + maxBufferSize);
	this.capacity = bufferSize;
	this.maxCapacity = maxBufferSize == -1 ? Integer.MAX_VALUE : maxBufferSize;
	buffer = new byte[bufferSize];
}


// 從這裡可看出來當capacity == maxCapacity,則會丟擲該異常
/** @return true if the buffer has been resized. */
	protected boolean require (int required) throws KryoException {
		if (capacity - position >= required) return false;
		if (required > maxCapacity)
			throw new KryoException("Buffer overflow. Max capacity: " + maxCapacity + ", required: " + required);
	flush();
	while (capacity - position < required) {
		if (capacity == maxCapacity)
		throw new KryoException("Buffer overflow. Available: " + (capacity - position) + ", required: " + required);
		// Grow buffer.
		if (capacity == 0) capacity = 1;
		capacity = Math.min(capacity * 2, maxCapacity);
		if (capacity < 0) capacity = maxCapacity;
		byte[] newBuffer = new byte[capacity];
		System.arraycopy(buffer, 0, newBuffer, 0, position);
		buffer = newBuffer;
	}
	return true;
}

解決方案

  1. 調整序序列化引數的最大值,比如1G
  2. 不適用序列化方式,在提交指令碼中去除著兩個引數