1. 程式人生 > Hadoop實戰專案:小檔案合併

Hadoop實戰專案:小檔案合併

/**
 * Merges small local text files into one file per source directory on HDFS.
 *
 * <p>For every entry found under a fixed local data directory, the {@code .txt}
 * files inside it are concatenated, in the order returned by the glob, into a
 * single HDFS file named after the directory with the dashes removed.
 */
public class MergeSmallFilesToHDFS {
    // File system handle for HDFS; assigned in list() via FileSystem.get(uri, conf).
    private static FileSystem hdfs = null;
    // File system handle for the local disk; assigned in list() via FileSystem.getLocal(conf).
    private static FileSystem local = null;

    /**
     * Path filter that rejects every path whose full string form matches the
     * given regular expression and accepts all others.
     */
    public static class RegexExcludePathFilter implements PathFilter {
        // Regular expression describing the paths to exclude.
        private final String regex;

        /**
         * @param regex regular expression; paths matching it are rejected
         */
        public RegexExcludePathFilter(String regex) {
            this.regex = regex;
        }

        /**
         * @param path candidate path
         * @return {@code true} when {@code path.toString()} does NOT match the
         *         regex in its entirety
         */
        @Override
        public boolean accept(Path path) {
            // String.matches tests the whole string and recompiles the regex on each call.
            boolean flag = path.toString().matches(regex);
            return !flag;
        }
    }

    /**
     * Path filter that accepts only paths whose full string form matches the
     * given regular expression.
     */
    public static class RegexAcceptPathFilter implements PathFilter {
        // Regular expression describing the paths to accept.
        private final String regex;

        /**
         * @param regex regular expression; only paths matching it are accepted
         */
        public RegexAcceptPathFilter(String regex) {
            this.regex = regex;
        }

        /**
         * @param path candidate path
         * @return {@code true} when {@code path.toString()} matches the regex in
         *         its entirety
         */
        @Override
        public boolean accept(Path path) {
            // String.matches tests the whole string and recompiles the regex on each call.
            boolean flag = path.toString().matches(regex);
            return flag;
        }
    }

    /**
     * Program entry point; delegates all work to {@link #list()}.
     *
     * @param args command-line arguments (not used)
     * @throws URISyntaxException propagated from {@link #list()}
     * @throws IOException propagated from {@link #list()}
     */
    public static void main(String[] args) throws URISyntaxException, IOException {
        list();
    }

    /**
     * Walks the local test-data directory and writes one merged file per
     * sub-entry to HDFS under {@code /mergeSmallFiles/result/}.
     *
     * @throws URISyntaxException declared for the {@code new URI(...)} call
     * @throws IOException declared for the file-system and stream calls
     */
    private static void list() throws URISyntaxException, IOException {
        // Hadoop configuration object, built with its default constructor.
        Configuration conf = new Configuration();
        // Address of the HDFS instance to write to (hard-coded host and port).
        URI uri = new URI("hdfs://Centpy:9000");
        hdfs = FileSystem.get(uri, conf);
        // Handle for the local file system.
        local = FileSystem.getLocal(conf);
        // List everything directly under the test-data directory, skipping
        // entries whose path ends in "svn".
        FileStatus[] dirstatus = local.globStatus(new Path("D://Code/EclipseCode/mergeSmallFilesTestData/*"), new RegexExcludePathFilter("^.*svn$"));
        // Convert the status array into the corresponding paths.
        Path[] dirs = FileUtil.stat2Paths(dirstatus);
        FSDataOutputStream out = null;
        FSDataInputStream in = null;
        // One iteration per entry, e.g. a date directory named 2018-03-23.
        for(Path dir:dirs) {
            // Strip the dashes from the directory name: 2018-03-23 becomes 20180323.
            String fileName = dir.getName().replace("-", "");
            // Keep only the entries inside this directory whose path ends in "txt".
            FileStatus[] localStatus = local.globStatus(new Path(dir + "/*"), new RegexAcceptPathFilter("^.*txt$"));
            // Paths of all accepted files in this directory.
            Path[] listPath = FileUtil.stat2Paths(localStatus);
            // Destination of the merged file on HDFS.
            Path outBlock = new Path("hdfs://Centpy:9000/mergeSmallFiles/result/"+ fileName + ".txt");
            System.out.println("合併後的檔名稱:"+fileName+".txt");
            // Open the HDFS output stream for the merged file.
            out = hdfs.create(outBlock);
            // Append every accepted file of this directory to the output stream.
            // NOTE(review): no try/finally guards the streams here, so an exception
            // thrown while copying would leave both open; consider try-with-resources.
            for(Path p:listPath) {
                // Open the local input stream.
                in = local.open(p);
                // Copy with a 4096-byte buffer; presumably Hadoop's IOUtils, where the
                // trailing false leaves both streams open after the copy (TODO confirm).
                IOUtils.copyBytes(in, out, 4096, false);
                // Close the input stream.
                in.close();
            }
            if (out != null) {
                out.close(); // close the output stream