[離線計算-Spark|Hive] HDFS小檔案處理
背景
HDFS 小檔案過多會對hadoop 擴充套件性以及穩定性造成影響, 因為要在namenode 上儲存維護大量元資訊.
大量的小檔案也會導致很差的查詢分析效能,因為查詢引擎執行查詢時需要進行太多次檔案的開啟/讀取/關閉.
小檔案解決思路
通常能想到的方案就是通過Spark API 對檔案目錄下的小檔案進行讀取,然後通過Spark的運算元repartition操作進行合併小檔案,repartition 分割槽數通過輸入檔案的總大小和期望輸出檔案的大小通過預計算而得。
總體流程如下:
該方案適合針對已發現有小檔案問題,然後對其進行處理. 下面介紹下hudi是如何實現在寫入時實現對小檔案的智慧處理.
Hudi小檔案處理
Hudi會自管理檔案大小,避免向查詢引擎暴露小檔案,其中自動處理檔案大小起很大作用
在進行insert/upsert操作時,Hudi可以將檔案大小維護在一個指定檔案大小
hudi 小檔案處理流程:
每次寫入都會遵循此過程,以確保Hudi表中沒有小檔案。
核心程式碼:
寫入檔案分配:
org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts
//獲取分割槽路徑 Set<String> partitionPaths = profile.getPartitionPaths(); //根據先前提交期間寫入的記錄獲取平均記錄大小。用於估計有多少記錄打包到一個檔案中。 long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config); LOG.info("AvgRecordSize => " + averageRecordSize); //獲取每個分割槽檔案路徑下小檔案 Map<String, List<SmallFile>> partitionSmallFilesMap = getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), jsc); for (String partitionPath : partitionPaths) { ... List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath); //未分配的寫入記錄 long totalUnassignedInserts = pStat.getNumInserts(); ... for (SmallFile smallFile : smallFiles) { //hoodie.parquet.max.file.size 資料檔案最大大小,Hudi將試著維護檔案大小到該指定值 //算出資料檔案大小 - 小檔案 就是剩餘可以寫入檔案大小, 除以平均記錄大小就是插入的記錄行數 long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts); //分配記錄到小檔案中 if (recordsToAppend > 0 && totalUnassignedInserts > 0) { // create a new bucket or re-use an existing bucket int bucket; if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { bucket = updateLocationToBucket.get(smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket); } else { bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId()); LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); } bucketNumbers.add(bucket); recordsPerBucket.add(recordsToAppend); //減去已經分配的記錄數 totalUnassignedInserts -= recordsToAppend; } //如果記錄沒有分配完 if (totalUnassignedInserts > 0) { //hoodie.copyonwrite.insert.split.size 每個分割槽條數 long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize(); //是否自動計算每個分割槽條數 if (config.shouldAutoTuneInsertSplits()) { insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize; } //計算要建立的bucket int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); ... for (int b = 0; b < insertBuckets; b++) { bucketNumbers.add(totalBuckets); if (b == insertBuckets - 1) { //針對最後一個buket處理,就是寫完剩下的記錄 recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket); } else { recordsPerBucket.add(insertRecordsPerBucket); } BucketInfo bucketInfo = new BucketInfo(); bucketInfo.bucketType = BucketType.INSERT; bucketInfo.partitionPath = partitionPath; bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx(); bucketInfoMap.put(totalBuckets, bucketInfo); totalBuckets++; } } } }
獲取每個分割槽路徑下小檔案:
org.apache.hudi.table.action.commit.UpsertPartitioner#getSmallFiles
if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView() .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); for (HoodieBaseFile file : allFiles) { //獲取小於 hoodie.parquet.small.file.limit 引數值就為小檔案 if (file.getFileSize() < config.getParquetSmallFileLimit()) { String filename = file.getFileName(); SmallFile sf = new SmallFile(); sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); sf.sizeBytes = file.getFileSize(); smallFileLocations.add(sf); } } }
UpsertPartitioner繼承spark的Partitioner, hudi在寫入的時候會利用spark 自定分割槽的機制優化記錄分配到不同檔案的能力, 從而達到在寫入時不斷優化解決小檔案問題.
涉及到的關鍵配置:
-
hoodie.parquet.max.file.size:資料檔案最大大小,Hudi將試著維護檔案大小到該指定值;
-
hoodie.parquet.small.file.limit:小於該大小的檔案均被視為小檔案;
-
hoodie.copyonwrite.insert.split.size:單檔案中插入記錄條數,此值應與單個檔案中的記錄數匹配(可以根據最大檔案大小和每個記錄大小來確定)
在hudi寫入時候如何使用、配置引數?
在寫入hudi的程式碼中 .option中配置上述引數大小,如下:
.option(HoodieStorageConfig.DEFAULT_PARQUET_FILE_MAX_BYTES, 120 * 1024 * 1024)
總結
本文主要介紹小檔案的處理方法思路,以及通過閱讀原始碼和相關資料學習hudi 如何在寫入時智慧的處理小檔案問題新思路.Hudi利用spark 自定義分割槽的機制優化記錄分配到不同檔案的能力,達到小檔案的合併處理.