1. 程式人生 > >HDFS小檔案合併問題的優化:copyMerge的改進

HDFS小檔案合併問題的優化:copyMerge的改進

1.問題分析

用fsck命令統計 檢視HDFS上在某一天日誌的大小,分塊情況以及平均的塊大小,即

[[email protected] jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

15/01/13 18:57:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connecting to namenode via http://da-master:50070
FSCK started by hduser (auth:SIMPLE) from /172.21.101.30 for path /wcc/da/kafka/report/2015-01-11 at Tue Jan 13 18:57:24 CST 2015
....................................................................................................
....................................................................................................
........................................Status: HEALTHY
 Total size:	9562516137 B
 Total dirs:	1
 Total files:	240
 Total symlinks:		0
 Total blocks (validated):	264 (avg. block size 36221652 B)
 Minimally replicated blocks:	264 (100.0 %)
 Over-replicated blocks:	0 (0.0 %)
 Under-replicated blocks:	0 (0.0 %)
 Mis-replicated blocks:		0 (0.0 %)
 Default replication factor:	2
 Average block replication:	2.0
 Corrupt blocks:		0
 Missing replicas:		0 (0.0 %)
 Number of data-nodes:		5
 Number of racks:		1
FSCK ended at Tue Jan 13 18:57:24 CST 2015 in 14 milliseconds


The filesystem under path '/wcc/da/kafka/report/2015-01-11' is HEALTHY

用表格整理出來:

Date Time

Total(GB)

Total blocks

AveBlockSize(MB)

2014/12/21

9.39

268

36

2014/12/20

9.5

268

36

2014/12/19

8.89

268

34

2014/11/5

8.6

266

33

2014/10/1

9.31

268

36


分析問題的存在性:從表中可以看出,每天日誌量的分塊情況:總共大概有268左右的塊數,平均塊大小為36MB左右,遠遠不足128MB,這潛在的說明了一個問題。日誌產生了很多小檔案,大多數都不足128M,嚴重影響叢集的擴充套件性和效能:首先,在HDFS中,任何block,檔案或者目錄在記憶體中均以物件的形式儲存,每個物件約佔150byte,如果有1000 0000個小檔案,每個檔案佔用一個block,則namenode大約需要2G空間。如果儲存1億個檔案,則namenode需要20G空間,這樣namenode記憶體容量嚴重製約了叢集的擴充套件; 其次,訪問大量小檔案速度遠遠小於訪問幾個大檔案;HDFS最初是為流式訪問大檔案開發的,如果訪問大量小檔案,需要不斷的從一個datanode跳到另一個datanode,嚴重影響效能;最後,處理大量小檔案速度遠遠小於處理同等大小的大檔案的速度,因為每一個小檔案要佔用一個slot,而task啟動將耗費大量時間甚至大部分時間都耗費在啟動task和釋放task上,累計起來的總時長必然增加。我們採取的策略是先合併小檔案,比如整理日誌成user_report.tsv,client_report.tsv,AppLog_UserDevice.tsv, 再執行job。

2.解決方案

可以呼叫API的 FileUtil工具類的方法copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource,Configuration conf, String addString);

但是此方法並不適用,因為某一天日誌存在著三種類型的日誌,即:


要分別合併成三個檔案user_report.tsv,client_report.tsv和AppLog_UserDevice.tsv,故必須重新實現copyMerge函式,先分析copyMerge原始碼:

  /** Copy all files in a directory to one output file (merge). */
  public static boolean copyMerge(FileSystem srcFS, Path srcDir, 
                                  FileSystem dstFS, Path dstFile, 
                                  boolean deleteSource,
                                  Configuration conf, String addString) throws IOException {
	//生成合並後的目標檔案路徑dstFile,檔名為srcDir.getName(),即源路徑的目錄名,因此這裡我們不能自定義生成的日誌檔名,非常不方便							  
    dstFile = checkDest(srcDir.getName(), dstFS, dstFile, false);
	//判斷源路徑是否為檔案目錄
    if (!srcFS.getFileStatus(srcDir).isDirectory())
      return false;
   //建立輸出流,準備寫檔案
    OutputStream out = dstFS.create(dstFile);
    
    try {
	 // 得到每個源路徑目錄下的每個檔案
      FileStatus contents[] = srcFS.listStatus(srcDir);
	  //排序操作
      Arrays.sort(contents);
      for (int i = 0; i < contents.length; i++) {
        if (contents[i].isFile()) {
			//建立輸入流,讀檔案
          InputStream in = srcFS.open(contents[i].getPath());
          try {
		  //執行復制操作,寫入到目標檔案中
            IOUtils.copyBytes(in, out, conf, false);
            if (addString!=null)
              out.write(addString.getBytes("UTF-8"));
                
          } finally {
            in.close();
          } 
        }
      }
    } finally {
      out.close();
    }
    
	//若deleteSource為true,刪除源路徑目錄下的每個檔案
    if (deleteSource) {
      return srcFS.delete(srcDir, true);
    } else {
      return true;
    }
  }  

改進後:(這種方式只需要開啟關閉輸出流out 三次)
	/** Copy corresponding files in a directory to related output file (merge). */
	@SuppressWarnings("unchecked")
	public static boolean merge(FileSystem hdfs, Path srcDir, Path dstFile,
			boolean deleteSource, Configuration conf) throws IOException {
		if (!hdfs.getFileStatus(srcDir).isDirectory())
			return false;
		// 得到每個源目錄下的每個檔案;
		FileStatus[] fileStatus = hdfs.listStatus(srcDir);
		// 三種不同型別的檔案各自的檔案路徑存入不同的list;
		ArrayList<Path> userPaths = new ArrayList<Path>();
		ArrayList<Path> clientPaths = new ArrayList<Path>();
		ArrayList<Path> appPaths = new ArrayList<Path>();
		for (FileStatus fileStatu : fileStatus) {
			Path filePath = fileStatu.getPath();
			if (filePath.getName().startsWith("user_report")) {
				userPaths.add(filePath);
			} else if (filePath.getName().startsWith("client_report")) {
				clientPaths.add(filePath);
			} else if (filePath.getName().startsWith("AppLog_UserDevice")) {
				appPaths.add(filePath);
			}
		}
		// 分別寫入到目標檔案:user_report.tsv中
		if (userPaths.size() > 0) {
			Path userDstFile = new Path(dstFile.toString() + "/user_report.tsv");
			OutputStream out = hdfs.create(userDstFile);
			Collections.sort(userPaths);
			try {
				Iterator<Path> iterator = userPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		// 分別寫入到目標檔案:client_report.tsv中
		if (clientPaths.size() > 0) {
			Path clientDstFile = new Path(dstFile.toString()
					+ "/client_report.tsv");
			OutputStream out = hdfs.create(clientDstFile);
			Collections.sort(clientPaths);
			try {
				Iterator<Path> iterator = clientPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		// 分別寫入到目標檔案:AppLog_UserDevice.tsv中
		if (appPaths.size() > 0) {
			Path appDstFile = new Path(dstFile.toString()
					+ "/AppLog_UserDevice.tsv");
			OutputStream out = hdfs.create(appDstFile);
			Collections.sort(appPaths);
			try {
				Iterator<Path> iterator = appPaths.iterator();
				while (iterator.hasNext()) {
					InputStream in = hdfs.open(iterator.next());
					try {
						IOUtils.copyBytes(in, out, conf, false);
					} finally {
						in.close();
					}
				}
			} finally {
				out.close();
			}
		}
		if (deleteSource) {
			return hdfs.delete(srcDir, true);
		}
		return true;
	}

當然你也可以這樣:(每一個小檔案 都會開啟輸入輸出流一次,不過程式碼上看上去比較簡潔,還無需用ArrayList做中間儲存)
	public static boolean mergeFiles(FileSystem hdfs, Path srcDir,
			Path dstFile, boolean deleteSource, Configuration conf)
			throws IOException {
		if (!hdfs.getFileStatus(srcDir).isDirectory())
			return false;
		// 得到每個源目錄下的每個檔案;
		FileStatus[] fileStatus = hdfs.listStatus(srcDir);
		// 三種不同型別的檔案各自合併

		for (FileStatus fileStatu : fileStatus) {
			Path filePath = fileStatu.getPath();
			Path dstPath = new Path("");
			if (filePath.getName().startsWith("user_report")) {
				dstPath = new Path(dstFile.toString() + "/user_report.tsv");
			} else if (filePath.getName().startsWith("client_report")) {
				dstPath = new Path(dstFile.toString() + "/client_report.tsv");
			} else if (filePath.getName().startsWith("AppLog_UserDevice")) {
				dstPath = new Path(dstFile.toString() + "/client_report.tsv");
			}else{
				dstPath=new Path( "/error.tsv");
			}
			
			OutputStream out = hdfs.create(dstPath);
			try {
				InputStream in = hdfs.open(filePath);
				try {
					IOUtils.copyBytes(in, out, conf, false);
				} finally {
					in.close();
				}

			} finally {
				out.close();
			}
			
			
		}
		if (deleteSource) {
			return hdfs.delete(srcDir, true);
		}
		return true;
	}

3.總結

根據不同業務邏輯的需求,你可以自定義實現API介面函式。對於解決小檔案合併問題,如果你有更好的策略,歡迎交流!

相關推薦

HDFS檔案合併問題的優化copyMerge改進

1.問題分析 用fsck命令統計 檢視HDFS上在某一天日誌的大小,分塊情況以及平均的塊大小,即 [[email protected] jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11 DEPRECATED: U

hdfs 檔案合併方案(附程式碼)

背景: presto計算落地出現了大量的小檔案,目前暫時沒有發現可以通過引數優化解決,所以開發了小檔案合併工具 工具架構如下 工具主要分為三部分: collector 負責將合併規則推送到redis佇列,合併規則物件定義如下, public class FileCo

python spark中parquet檔案寫到hdfs,同時避免太多的檔案(block檔案合併

    在pyspark中,使用資料框的檔案寫出函式write.parquet經常會生成太多的小檔案,例如申請了100個block,而每個block中的結果 只有幾百K,這在機器學習演算法的結果輸出中經常出現,這是一種很大的資源浪費,那麼如何同時避免太多的小檔案(bloc

程式碼自留地檔案合併成大檔案,需要配置BytesZip使用,java

public class FileZip implements Serializable { String fileName = null; byte [] zipBytes = null;

HDFS檔案優化方法

1 HDFS小檔案弊端 HDFS上每個檔案都要在NameNode上建立一個索引,這個索引的大小約為150byte,這樣當小檔案比較多的時候,就會產生很多的索引檔案,一方面會大量佔用NameNode的記憶體空間,另一方面就是索引檔案過大使得索引速度變慢。 2 HDFS小檔案解決方案 小檔案

Hadoop實戰專案檔案合併

public class MergeSmallFilesToHDFS { private static FileSystem hdfs = null; //定義HDFS上的檔案系統物件 private static FileSystem local = null; //定義本地檔案系統物件

MaxCompute檔案問題優化方案

小檔案背景知識 小檔案定義 分散式檔案系統按塊Block存放,檔案大小比塊大小小的檔案(預設塊大小為64M),叫做小檔案。 如何判斷存在小檔案數量多的問題 檢視檔案數量 desc extended + 表名 判斷小檔案數量多的標準 1、非分割槽表,表文件數達到1000個,檔案平均大小小於64

網站架構優化從100併發抗到4000併發

    <httpModules>      <add name="UrlRewrite" type="Web.UrlRewrite.UrlRewrite,Web.UrlRewrite" />      <remove name="OutputCache" />     

解決MapReduce中多個檔案合併成大檔案問題

package inputformat; import java.io.IOException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import

HDFS檔案處理及解決方案

小檔案為什麼會成為問題?小檔案的解決方案包括哪些?有沒有自己的解決方案?Hadoop Archive具體是如何處理小檔案的?Sequence file是如何解決小檔案的?CombineFileInputFormat是如何解決小檔案的?1、  概述小檔案是指檔案size小於H

HDFS檔案問題解決方案+SequenceFile簡單介紹

HDFS和MR主要針對大資料檔案來設計,在小檔案處理上效率低. 解決方法是選擇一個容器,將這些小檔案包裝起來,將整個檔案作為一條記錄,可以獲取更高效率的儲存和處理,避免多次開啟關閉流耗費計算資源.

Linux下大檔案分割和檔案合併

就像Android中除錯問題,有的時候Log連續儲存會很大,這個時候進行分割就很好地處理檔案了。 在Linux下分割檔案通過split工具進行,合併檔案可以通過cat進行。 1. split a. 可以按行數進行分割 split -l 500 big

對海量檔案儲存優化的一些理解和TFS介紹

在研究圖片伺服器問題時,瞭解到現在很多大公司基本上都是用分散式檔案系統來儲存海量小檔案,比如Facebook有haystack,淘寶有TFS,京東有JFS。最近在研究TFS,結合之前學習的linux下的inode相關知識,瞭解到在ext檔案系統中,對一個檔案的讀需要先從磁碟

大資料(hadoop-檔案合併、Mapreduce原理)

hadoop-小檔案合併 package com.andy.merge; import org.apache.hadoo

解決Spark讀取HDFS檔案的問題

若Spark讀取HDFS資料夾時, 其中的小檔案過多會降低程式效能,也給HDFS帶來壓力。 當Spark讀取檔案時, 會為每個檔案

HDFS簡單程式設計例項檔案合併

 下圖顯示了HDFS檔案系統中路徑為“localhost:50070/explorer.html#/user/hadoop”的目錄中所有的檔案資訊: 對於該目錄下的所有檔案,我們將執行以下操作: 首先,從該目錄中過濾出所有後綴名不為".abc"的檔案。 然後,對過濾之後的檔案進行讀取。

ElasticSearch58核心原理揭祕_最後優化寫入流程實現海量磁碟檔案合併(segment merge, optimize)

1.每秒一個segment file,檔案過多,而且每次search都要搜尋所有的segment,很耗時 預設會在後臺執行segment merge操作,在merge的時候,被標記為deleted的d

使用Impala合併檔案

1.文件編寫目的   Fayson在前面的文章《如何在Hadoop中處理小檔案》裡面介紹了多種處理方式。在Impala表中使用小檔案也會降低Impala和HDFS的效能,本篇文章Fayson主要介紹如何使用Impala合併小檔案。   內容概述

HDFS無法高效儲存大量檔案,如何處理好檔案

一、HAR檔案方案         為了緩解大量小檔案帶給namenode記憶體的壓力,Hadoop 0.18.0引入了Hadoop Archives(HAR files),其本質就是在HDFS之上構建一個分層檔案系統。通過執行hado

史上最全MapReduce檔案優化策略

小檔案的優化無非以下幾種方式:   在資料採集的時候,就將小檔案或小批資料合成大檔案再上傳 HDFS 在業務處理之前,在 HDFS 上使用 mapreduce 程式對小檔案進行合併 在 mapreduce 處理時,可採用 CombineTextInputForma