1. 程式人生 > 其它 >spark sql合併小檔案_Spark SQL小檔案問題在OPPO的解決方案

spark sql合併小檔案_Spark SQL小檔案問題在OPPO的解決方案

技術標籤:spark sql合併小檔案

Spark SQL小檔案是指檔案大小顯著小於hdfs block塊大小的的檔案。過於繁多的小檔案會給HDFS帶來很嚴重的效能瓶頸,對任務的穩定和叢集的維護會帶來極大的挑戰。

一般來說,通過Hive排程的MR任務都可以簡單設定如下幾個小檔案合併的引數來解決任務產生的小檔案問題:

sethive.merge.mapfiles=true;
sethive.merge.mapredfiles=true;
sethive.merge.size.per.task=xxxx;
sethive.merge.smallfiles.avgsize=xxx;

然而在我們將離線排程任務逐步從Hive遷移到Spark的過程中,由於Spark本身並不支援小檔案合併功能,小檔案問題日益突出,對叢集穩定性造成很大影響,一度阻礙了我們的遷移工作。

為了解決小檔案問題,我們經歷了從開始的不斷調整引數到後期的程式碼開發等不同階段,這裡給大家做一個簡單的分享。

1. Spark為什麼會產生小檔案

Spark生成的檔案數量直接取決於RDD裡partition的數量和表分割槽數量。注意這裡的兩個分割槽概念並不相同,RDD的分割槽與任務並行度相關,而表分割槽則是Hive的分割槽數目。生成的檔案數目一般是RDD分割槽數和表分割槽的乘積。因此,當任務並行度過高或者分割槽數目很大時,很容易產生很多的小檔案。

f7a2169e54aa093a341656c7bbf424f5.png

圖1:Spark RDD分割槽數

因此,如果需要從引數調整來減少生成的檔案數目,就只能通過減少最後一個階段RDD的分割槽數來達到了(減少分割槽數目限制於歷史資料和上下游關係,難以修改)

2. 基於社群版本的引數進行調整的方案

2.1不含有Shuffle運算元的簡單靜態分割槽SQL

這樣的SQL比較簡單,主要是filter上游表一部分資料寫入到下游表,或者是兩張表簡單UNION起來的任務,這種任務的分割槽數目主要是由讀取檔案時Partition數目決定的。

  • 因為從Spark 2.4以來,對Hive orc表和parquet支援已經很不錯了,為了加快執行速率,我們開啟了將Hive orc/parquet表自動轉為DataSource的引數。對於這種DataSource表的型別,partition數目主要是由如下三個引數控制其關係。

spark.sql.files.maxPartitionBytes;
spark.sql.files.opencostinbytes;
spark.default.parallelism;

其關係如下圖所示,因此可以通過調整這三個引數來輸入資料的分片進行調整:

39dd42f921cb511c41875d085ae67c75.png

  • 而非DataSource表,使用CombineInputFormat來讀取資料,因此主要是通過MR引數來進行分片調整:

    mapreduce.input.fileinputformat.split.minsize

雖然我們可以通過調整輸入資料的分片來對最終檔案數量進行調整,但是這樣的調整是不穩定的,上游資料大小發生一些輕微的變化,就可能帶來引數的重新適配。

為了簡單粗暴的解決這個問題,我們對這樣的SQL加了repartition的hint,引入了新的shuffle,保證檔案數量是一個固定值。

d19fa3c8f83e45ab05f3f3bccb7f4566.png

2.2帶有Shuffle運算元的靜態分割槽任務

在ISSUE SPARK-9858中,引入了一個新的引數:

spark.sql.adaptive.shuffle.targetPostShuffleInputSize,

後期基於spark adaptive又對這個引數做了進一步增強,可以動態的調整partition數量,儘可能保證每個task處理targetPostShuffleInputSize大小的資料,因此這個引數我們也可以用來在一定程度上控制生成的檔案數量。

2.3動態分割槽任務

動態分割槽任務因為存在著分割槽這一變數,單純調整rdd這邊的partition數目很難把控整體的檔案數量。

在hive裡,我們可以通過設定hive.optimize.sort.dynamic.partition來緩解動態分割槽產生檔案過多導致任務執行時task節點經常oom的狀況。這樣的引數會引入新的的shuffle,來對資料進行重排序,將相同的partition分給同一個task處理,從而避免了一個task同時持有多個檔案控制代碼。

因此,我們可以藉助這樣的思想,使用distribute by語句來修改sql,從而控制檔案數量。一般而言,假設我們想對於每個分割槽生成不超過N個檔案,則可以在SQL末尾增加DISTRIBUTE BY [動態分割槽列],ceil(rand() * N)。

2fd31fc47c7466fd040562186737d73e.png

3. 自研可合併檔案的commitProtocol方案

綜上種種,每個方法都存在一定的弊端,眾多規則也在實際使用過程中對業務方造成很大困擾。

因此我們產生了想在spark這邊實現和hive類似的小檔案合併機制。在幾個可能的方案選型中,我們最終選擇了:重寫spark.sql.sources.commitProtocolClass方法。

一方面,該方案對Spark程式碼無侵入,便於Spark原始碼的維護,另一方面,該方案對業務方使用友好,可以動態通過set命令設定,如果出現問題回滾也十分方便。業務方在使用過程中,只需要簡單設定:

spark.sql.sources.commitProtocolClass,即可控制是否開啟小檔案合併。

在開啟小檔案合併引數後,我們會在commit階段拿到生成的所有檔案,引入兩個新的job來對這些檔案進行處理。首先我們在第一個job獲取到所有大小小於spark.compact.smallfile.size的檔案,在查詢完成後按照spark.compact.size引數值對組合檔案,並在第二個job中對這些檔案進行合併。

6cf3babbeb6f0ca4353f465a91036aad.png

691c6edc73247ac49d50c86e26a9829a.png

END

招聘資訊

OPPO網際網路技術團隊招聘一大波崗位,涵蓋C++、Go、OpenJDK、Java、DevOps、容器、Linux核心開發、產品經理、專案經理等多個方向,請在公眾號後臺回覆關鍵詞“招聘”檢視查詳細資訊。

你可能還喜歡

OPPO自研ESA DataFlow架構與實踐

OPPO 實時數倉揭祕:從頂層設計實現離線與實時的平滑遷移

OPPO異地多活實踐——快取篇

OPPO百萬級高併發MongoDB叢集效能數十倍提升優化實踐(上)

OPPO百萬級高併發MongoDB叢集效能數十倍提升優化實踐(下)

更多技術乾貨

掃碼關注

OPPO網際網路技術

5b4cbd1bad88c82085233309ea2b0026.png 我就知道你“在看” d34700448dcb6849d0fd25d3d2f80db4.gif