
Spark SQL Performance Testing and Tuning

1      Problem Description

When memory is insufficient, the group by operation fails.

Normally it should only slow down rather than fail, since disk space is still available.

Error log:

Task:

java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:765)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:783)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:844)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:244)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:210)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
        at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
        at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

2      Data

Table size: 6.7 G (20.1 G with HDFS replication), path /user/hive/warehouse/ldp.db/bigt2_2

Number of keys: 100 million

Total rows: 100 million

Shuffle write: 2 GB

Shuffle read: 1.5 GB

3      SQL Statement
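The statement itself is not reproduced in this post (it appears only as a screenshot in the source). As a purely illustrative sketch, the kind of group by tested against the table above might look like the following in spark-shell, assuming a Hive-enabled sqlContext; the column name key is an assumption, not taken from the source:

// Illustrative only: a simple aggregation over the ~100-million-row test table.
// ldp.bigt2_2 is the Hive table from the data section; "key" is an assumed column name.
val grouped = sqlContext.sql("SELECT key, COUNT(*) AS cnt FROM ldp.bigt2_2 GROUP BY key")
grouped.count()   // force the job (and its shuffle) to actually run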

4      GC Test

4.1    G1

spark-shell --num-executors 3 --executor-memory 12g --executor-cores 3 --driver-memory 2g \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.sql.shuffle.partitions=20 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=45" \
  --conf spark.shuffle.file.buffer.kb=10240 \
  --conf spark.storage.memoryFraction=0.2 \
  --conf spark.shuffle.memoryFraction=0.6

Stage 1 + Stage 2: 3.4 + 2.2 min

GC time: max = 25 s, 75th percentile = 5 s

Stage 1 / Stage 2 task details: (screenshots from the original post omitted)

4.2    Parallel

spark-shell --num-executors 3 --executor-memory 12g --executor-cores 3 --driver-memory 2g \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.sql.shuffle.partitions=20 \
  --conf spark.executor.extraJavaOptions="-XX:+UseParallelGC -Xmn8g -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark" \
  --conf spark.shuffle.file.buffer.kb=10240 \
  --conf spark.storage.memoryFraction=0.2 \
  --conf spark.shuffle.memoryFraction=0.6

Note: -Xmn sets the young generation size; its maximum and initial values are equal.

Stage 1 + Stage 2: 5.7 + 1.5 min

GC time: max = 4.7 min, 75th percentile = 15 s

Stage 1 / Stage 2 task details: (screenshots from the original post omitted)

4.3    Conclusion

1.      G1's run time is about 20% shorter than Parallel's.

G1: 5.6 min

Parallel: 7.2 min

2.      In the 75th-percentile GC time comparison, G1 takes 5 s versus 15 s for Parallel.

5      memoryFraction Test

On adjusting memoryFraction:

Since the group by job has no need to cache RDDs (no RDD has to stay resident in memory), that memory can instead be used for the sorting and related work in the shuffle; the split is controlled by the memoryFraction settings. We run two tests to verify how changing this parameter affects group by speed.
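As a minimal sketch of setting the same split programmatically (these are the legacy static-memory options of Spark 1.x, the version range used in these tests; they mirror the --conf flags in the commands):

// Shrink the RDD cache pool and enlarge the shuffle pool for an aggregation-only job.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("groupby-memoryFraction-test")      // hypothetical app name
  .set("spark.storage.memoryFraction", "0.2")     // default 0.6: heap share for cached RDDs
  .set("spark.shuffle.memoryFraction", "0.6")     // default 0.2: heap share for shuffle sort/aggregation
val sc = new SparkContext(conf)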

On adjusting the number of partitions:

To reduce the number of reduce tasks, we changed the partition count from 200 to 20; this change is verified by a later test. The basic rationale is that where file operations (the shuffle) are involved, larger partitions and files are better.

With sort shuffle, fewer reduce tasks means the number of associated sort buffers can be reduced without lowering parallelism, which leaves more memory to enlarge each sort buffer and therefore speeds up sorting. On the reduce side, fewer reduce tasks cut the overhead of frequently submitting tasks and also reduce the number of reader file handles.

With hash shuffle, reducing the partition count does no harm either.
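A quick way to try different reducer counts interactively is to change spark.sql.shuffle.partitions on the SQLContext at runtime (a sketch reusing the illustrative query from section 3):

// Spark SQL uses this many reduce-side partitions for shuffles (group by, joins); default is 200.
sqlContext.setConf("spark.sql.shuffle.partitions", "20")
sqlContext.sql("SELECT key, COUNT(*) AS cnt FROM ldp.bigt2_2 GROUP BY key").count()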

Because GC time is excessive with the default memoryFraction values, we left the default configuration for last and tested it only as time allowed; the sole purpose of that run is to stress the extreme memory case and, along the way, get familiar with the other shuffle-related settings.

We also increased the shuffle file buffer size to 10 MB.

With spark.storage.memoryFraction=0.2 and spark.shuffle.memoryFraction=0.6 (these figures repeat the G1 and Parallel runs from section 4, which already used this split):

spark-shell --num-executors 3 --executor-memory 12g --executor-cores 3 --driver-memory 2g \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.sql.shuffle.partitions=20 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=45" \
  --conf spark.shuffle.file.buffer.kb=10240 \
  --conf spark.storage.memoryFraction=0.2 \
  --conf spark.shuffle.memoryFraction=0.6

Stage 1 + Stage 2: 3.4 + 2.2 min

GC time: max = 25 s, 75th percentile = 5 s

Stage 1 / Stage 2 task details: (screenshots from the original post omitted)

spark-shell --num-executors 3 --executor-memory 12g --executor-cores 3 --driver-memory 2g \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.sql.shuffle.partitions=20 \
  --conf spark.executor.extraJavaOptions="-XX:+UseParallelGC -Xmn8g -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark" \
  --conf spark.shuffle.file.buffer.kb=10240 \
  --conf spark.storage.memoryFraction=0.2 \
  --conf spark.shuffle.memoryFraction=0.6

Note: -Xmn sets the young generation size; its maximum and initial values are equal.

Stage 1 + Stage 2: 5.7 + 1.5 min

GC time: max = 4.7 min, 75th percentile = 15 s

Stage 1 / Stage 2 task details: (screenshots from the original post omitted)

With the default split (spark.storage.memoryFraction=0.6, spark.shuffle.memoryFraction=0.2), using a single 32 GB executor with 8 cores:

spark-shell --num-executors 1 --executor-memory 32g --executor-cores 8 --driver-memory 2g \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.sql.shuffle.partitions=20 \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=45" \
  --conf spark.shuffle.file.buffer.kb=32 \
  --conf spark.storage.memoryFraction=0.6 \
  --conf spark.shuffle.memoryFraction=0.2

The first batch of tasks took more than 10 minutes, and timeouts occurred.

 

Stage 1 + Stage 2: 18 + 3.1 min

5.3    Conclusion

Change details: spark.storage.memoryFraction 0.6 -> 0.2, spark.shuffle.memoryFraction 0.2 -> 0.6.

After increasing shuffle.memoryFraction, the run time is roughly one third of what it is with the default settings.
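A rough back-of-envelope view of why the larger fraction helps, assuming Spark 1.x's static memory model, its default spark.shuffle.safetyFraction of 0.8, and an even split of the shuffle pool across concurrently running tasks (approximations only, not exact Spark accounting):

// Approximate in-memory shuffle space available to each concurrently running task.
def shufflePerTaskGB(heapGB: Double, shuffleFraction: Double, cores: Int): Double =
  heapGB * shuffleFraction * 0.8 / cores            // 0.8 = spark.shuffle.safetyFraction default

val tunedRun   = shufflePerTaskGB(12.0, 0.6, 3)     // 3 cores per 12 GB executor    -> ~1.9 GB per task
val defaultRun = shufflePerTaskGB(32.0, 0.2, 8)     // 8 cores in one 32 GB executor -> ~0.6 GB per task
// Each reduce task gets roughly 3x more room to aggregate in memory before spilling to disk.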

6      Partition Count Test

Here we use G1 for garbage collection.

spark-shell --num-executors 3 --executor-memory 12g --executor-cores 3 --driver-memory 2g \
  --master yarn-client \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.sql.shuffle.partitions=NUM \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=45" \
  --conf spark.shuffle.file.buffer.kb=10240 \
  --conf spark.storage.memoryFraction=0.2 \
  --conf spark.shuffle.memoryFraction=0.6

Here NUM is the spark.sql.shuffle.partitions value under test (200 versus 20).

With spark.sql.shuffle.partitions=200: Stage 1 + Stage 2: 35 + 3.7 min

GC time: max = 8.3 min, 75th percentile = 15 s

Stage 1 / Stage 2 task details: (screenshots from the original post omitted)

With spark.sql.shuffle.partitions=20, the results are the same as in 4.1 GC Test (G1):

Stage 1 + Stage 2: 3.4 + 2.2 min

GC time: max = 25 s, 75th percentile = 5 s

6.3    Conclusion

With 20 partitions, the run time is roughly 1/8 of that with 200 partitions.