1. 程式人生 > 實用技巧 >spark sql/hive小檔案問題

spark sql/hive小檔案問題

針對hive on mapreduce 1:我們可以通過一些配置項來使Hive在執行結束後對結果檔案進行合併: 引數詳細內容可參考官網:https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
1 2 3 4 hive.merge.mapfiles 在 map-only job後合併檔案,預設true hive.merge.mapredfiles 在map-reduce job後合併檔案,預設false hive.merge.size.per.task 合併後每個檔案的大小,預設256000000 hive.merge.smallfiles.avgsize 平均檔案大小,是決定是否執行合併操作的閾值,預設16000000
例如:
set hive.merge.mapfiles = true:在只有map的作業結束時合併小檔案,
set hive.merge.mapredfiles = true:在Map-Reduce的任務結束時合併小檔案,預設為False;
set hive.merge.size.per.task = 256000000; 合併後每個檔案的大小,預設256000000
set hive.merge.smallfiles.avgsize=256000000; 當輸出檔案的平均大小小於該值時並且(mapfiles和mapredfiles為true)
2:如果結果表使用了壓縮格式,則必須配合Sequence File來儲存,否則無法進行合併 3:Hadoop的歸檔檔案格式也是解決小檔案問題的方式之一。而且Hive提供了原生支援,如果使用的不是分割槽表,則可建立成外部表,並使用har://協議來指定路徑 4:對於通常的應用,使用Hive結果合併就能達到很好的效果。如果不想因此增加執行時間,可以自行編寫一些指令碼,在系統空閒時對分割槽內的檔案進行合併,也能達到目的。 5:Reducer數量的減少也即意味著結果檔案的減少,從而解決產生小檔案的問題。 但是,對於通過sparksql來處理資料的話,在conf裡新增上面引數調整是沒有作用的,不過可以通過下面的方式來規避小檔案: 1.通過使用repartition重分割槽動態調整檔案輸出個數   比如 spark.sql("sql").repartition(1).write().mode(SaveMode.Overwrite).saveAsTable("test"); 2.使用Adaptive Execution動態設定shuffle partition
1 2 3 4 5 6 7 8 9 10 11 12 13 SparkConf conf =newSparkConf(); conf.set("spark.sql.adaptive.enabled","true"); conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize","67108864b"); conf.set("spark.sql.adaptive.join.enabled","true"); conf.set("spark.sql.autoBroadcastJoinThreshold","20971520"); SparkSession spark = SparkSession
.builder() .appName("JointSitePlan") .master("local") .config(conf) .enableHiveSupport() .getOrCreate();

  shuffle partition是通過引數spark.sql.shuffle.partitions來指定的,預設是200,但是對於資料不大,或者資料傾斜的情況,會生成很多的小檔案,幾兆甚至幾KB大小,自適應執行則會根據引數 spark.sql.adaptive.shuffle.targetPostShuffleInputSize動態調整reducer數量.

附:

我在spark sql執行insert overwrite操作時,僅加了set spark.sql.hive.mergeFiles=true; 也可以有效阻止小檔案的產生,可能是因為我的資料量本身就比較大

參考:

https://www.cnblogs.com/zz-ksw/p/11293891.html

https://blog.csdn.net/a2011480169/article/details/100401858