【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(一)
本人菜鳥一隻,也處於學習階段,如果有什麼說錯的地方還請大家批評指出!
首先我想說明下該文章是幹嘛的,該文章粗略介紹了hdfs儲存資料檔案塊策略和spark的repartition、coalesce兩個運算元的區別,是為了下一篇文章的自動判斷合理分塊數做知識的鋪墊,如果對於這部分知識已經瞭解,甚至精通的同學,可以直接跳到該系列的第二篇文章!
背景:
spark讀取Hive表或者HDFS甚至各種框架的資料,生成了Rdd或者Dataset(或者DataFrame,但是在spark2以後的版本DataFrame這個物件已經被取消了,全部用Dataset替代),經過一大堆邏輯處理之後,將資料又存回了HDFS。但是這時候就會遇到一個問題,就是這份資料在HDFS上會被分成幾份,個數是否合理?
原因:
為什麼要考慮這個問題?是因為HDFS不適合存太多小檔案,因為每個檔案塊都會有一段地址存在namenode的記憶體中,如果太多小檔案或者目錄太深,會大大降低HDFS儲存的效能,而且在某些極端的情況下,spark甚至會儲存很多空檔案,這些空檔案還會影響之後的計算(因為需要啟動一個map來讀取這個空檔案,消耗了資源和時間,卻在做無用功)
例如:
如圖,我們想要的是,每個檔案塊的大小都在幾十M甚至200M,300M之間(不需要十分的精確)。
但是每個檔案也不能太大,為什麼?
壓縮格式 | 工具 |
演算法 | 副檔名 | 是否可切分 |
DEFLATE | 無 | DEFLATE | .deflate | 否 |
Gzip | gzip | DEFLATE |
.gz | 否 |
bzip2 | bzip2 | bzip2 | .bz2 | 是 |
LZO | lzop | LZO | .lzo | 否 |
LZ4 | 無 | LZ4 | .lz4 | 否 |
Snappy | 無 | Snappy | .snappy | 否 |
該表格來自於:(作者:瓜牛呱呱)https://blog.csdn.net/lin_wj1995/article/details/78967486
解釋下:hadoop的MR(MapReduce)和spark的引擎在讀取資料的時候,一個block可能會使用多個執行緒一起讀。
例如:
執行緒1讀取偏移量:0+10000的資料
執行緒2讀取偏移量:10001+20000的資料
.....
執行緒7讀取偏移量:60001+62043的資料
如圖(原諒我資料不夠大,只有一個執行緒在讀,spark預設是128M啟動一個執行緒讀,也就是說如果檔案有300M,那麼spark會啟動3個核心來同時讀取這個資料):
以此類推,來加快資料讀取的速度,但是問題來了,如果HDFS上的資料做了壓縮,那就只有bzip2的壓縮格式才可以多個執行緒加速讀的,所以如果一個檔案塊非常大的話,只能用一個執行緒來讀這個非常大的檔案,讀取資料的時間就會非常的長,延長任務執行時間。因此,相對合理的方式就是每個檔案塊大小在128M之間,這樣不管是否壓縮,讀取資料的時候都不會受到太大的影響。
前言:
來看2個API(repartition和coalesce)和兩個問題
1、repartition和coalesce的區別
-1.先說shuffle:
spark有個階段叫shuffle,他的shuffle和MR的shuffle不一樣,但是都會經歷重分割槽的過程,如何判斷什麼時候有shuffle階段呢?看看程式碼中是否有這些過程就好了,例如:去重,join,group by ,各種聚合函式(reduceByKey等),排序。沒有shuffle的spark程式碼,就相當於只對資料做簡單的過濾,對應到MR上,就是隻有map邏輯,沒有reduce邏輯,所以程式碼中shuffle越多,消耗的資源也越多,速度也會越慢,因為spark的shuffle過程和MR的reduce之前的shuffle過程一樣,資料需要在不同的節點之間傳遞。
-2.repartition和coalesce的區別:
repartition會觸發shuffle,coalesce不會,所以repartition效能比coalesce差!
看spark原始碼:
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = true, logicalPlan)
}
def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
Repartition(numPartitions, shuffle = false, logicalPlan)
}
-3.原理:repartition將所有資料重新分割槽,coalesce是單純將不同分割槽的資料直接合併到一個分割槽裡。直觀來看,repartition之後,每個檔案塊大小會基本一樣,coalesce之後,每個檔案塊大小一般都是不一樣的,甚至會差很多。
-4.總結:repartition一般是用來增加分割槽數(當然也可以減少),coalesce只能用來減少分割槽數。所以如果不介意儲存的檔案塊大小不一樣,可以使用coalesce來減少分割槽數,儲存的時候一個分割槽就會生成一個檔案塊。
2、spark檔案塊什麼時候增加的,增加有什麼用?
接下來的文字描述,是針對於sparksql(也就是把資料載入成Dataset之後再處理)來說的。
-1.增加分割槽數,可以增加並行度,當spark申請的cpu核心足夠的情況下,可以同時跑不同分割槽的資料(因為一個分割槽的資料,只能由一個核心來跑,不能多個)
-2.手動增加,使用repartition來將所有資料打散
-3.自動增加,spark有個引數:spark.sql.shuffle.partitions,預設值為200。也就是說當觸發shuffle邏輯的時候,資料會自動分為200個分割槽執行,但是在資料量大的情況下,每個分割槽的資料量太大,而且假設spark申請到了300個核心,但是因為分割槽數只有200,會導致只有200個核心在執行,另外100個核心在空轉(雖然佔用資源但是卻不幹活)。所以可以將該引數設定為500甚至更大,來增加分割槽數和並行度。
3、spark檔案塊在儲存前如何減少?
在上一個(計算的)步驟,我們將資料增加分割槽,一個分割槽會生成一個檔案塊,如果沒有做任何修改,並且spark.sql.shuffle.partitions引數值設定為200,那麼不管這個資料多大,都會生成200個檔案塊。所以減少檔案塊的方法就是通過在資料儲存之前呼叫repartition或者coalesce這兩個API來減少或者增加檔案塊。
例如:
Dataset<Row> tb = spark.table("資料庫.表名")
.groupBy(col("日期"))
.agg(countDistinct(col("id")).as("uv"),count(lit(1)).as("pv"))
.select("日期","uv","pv");
//這裡也可以使用coalesce來代替repartition
tb.repartition(1)
.write()
.partitionBy("日期")
.mode(SaveMode.Overwrite)
.format("hive").saveAsTable("資料庫.新的表名");