spark sql/hive小檔案問題
阿新 • • 發佈:2020-08-03
針對hive on mapreduce
1:我們可以通過一些配置項來使Hive在執行結束後對結果檔案進行合併:
引數詳細內容可參考官網:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
例如:
2:如果結果表使用了壓縮格式,則必須配合Sequence File來儲存,否則無法進行合併
3:Hadoop的歸檔檔案格式也是解決小檔案問題的方式之一。而且Hive提供了原生支援,如果使用的不是分割槽表,則可建立成外部表,並使用har://協議來指定路徑
4:對於通常的應用,使用Hive結果合併就能達到很好的效果。如果不想因此增加執行時間,可以自行編寫一些指令碼,在系統空閒時對分割槽內的檔案進行合併,也能達到目的。
5:Reducer數量的減少也即意味著結果檔案的減少,從而解決產生小檔案的問題。
但是,對於通過sparksql來處理資料的話,在conf裡新增上面引數調整是沒有作用的,不過可以通過下面的方式來規避小檔案:
1.通過使用repartition重分割槽動態調整檔案輸出個數
比如 spark.sql("sql").repartition(1).write().mode(SaveMode.Overwrite).saveAsTable("test");
2.使用Adaptive Execution動態設定shuffle partition
1 2 3 4 |
hive.merge.mapfiles 在 map-only job後合併檔案,預設 true
hive.merge.mapredfiles 在map-reduce job後合併檔案,預設 false
hive.merge.size.per.task 合併後每個檔案的大小,預設256000000
hive.merge.smallfiles.avgsize 平均檔案大小,是決定是否執行合併操作的閾值,預設16000000
|
set hive.merge.mapfiles = true:在只有map的作業結束時合併小檔案, set hive.merge.mapredfiles = true:在Map-Reduce的任務結束時合併小檔案,預設為False; set hive.merge.size.per.task = 256000000; 合併後每個檔案的大小,預設256000000 set hive.merge.smallfiles.avgsize=256000000; 當輸出檔案的平均大小小於該值時並且(mapfiles和mapredfiles為true)
1 2 3 4 5 6 7 8 9 10 11 12 13 |
SparkConf conf = new SparkConf();
conf.set( "spark.sql.adaptive.enabled" , "true" );
conf.set( "spark.sql.adaptive.shuffle.targetPostShuffleInputSize" , "67108864b" );
conf.set( "spark.sql.adaptive.join.enabled" , "true" );
conf.set( "spark.sql.autoBroadcastJoinThreshold" , "20971520" );
SparkSession spark = SparkSession
.builder()
.appName( "JointSitePlan" )
.master( "local" )
.config(conf)
.enableHiveSupport()
.getOrCreate();
|
shuffle partition是通過引數spark.sql.shuffle.partitions來指定的,預設是200,但是對於資料不大,或者資料傾斜的情況,會生成很多的小檔案,幾兆甚至幾KB大小,自適應執行則會根據引數 spark.sql.adaptive.shuffle.targetPostShuffleInputSize動態調整reducer數量.
附:
我在spark sql執行insert overwrite操作時,僅加了set spark.sql.hive.mergeFiles=true; 也可以有效阻止小檔案的產生,可能是因為我的資料量本身就比較大
參考: