大資料(hadoop-小檔案合併、Mapreduce原理)
阿新 • • 發佈:2019-05-07
hadoop-小檔案合併
package com.andy.merge; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; public class RegexAcceptFilter implements PathFilter{ private final String regex ; public RegexAcceptFilter(String regex){ this.regex = regex ; } //只接受符合regex的檔案 @Override public boolean accept(Path path) { boolean flag = path.toString().matches(regex) ; return flag; } }
package com.andy.merge; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; //PathFilter是一個介面,裡面只有一個方法accept(Path path) public class RegexUncludeFilter implements PathFilter{ private final String regex ; public RegexUncludeFilter(String regex){ this.regex = regex ; } //過濾 regex 格式的檔案 @Override public boolean accept(Path path) { boolean flag = path.toString().matches(regex); //符合得我就接受,不符合的就過濾,所以是非flag return !flag; } }
package com.andy.merge; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; /** * 小檔案合併 * @author huang * */ public class MegerSmallFiles { //寫入到HDFS的FileSystem物件 private static FileSystem fs = null ; //本地檔案系統的FileSystem private static FileSystem local = null ; //HDFS服務路徑 private static final String HDFS_SERVER = "hdfs://192.168.153.111:9000" ; //合併小檔案的主要方法 public static void megerFiles() throws Exception { //設定系統使用者為hadoop System.setProperty("HADOOP_USER_NAME", "hadoop") ; //讀取hadoop檔案的配置資訊 Configuration conf = new Configuration() ; //建立URI URI uri = new URI(HDFS_SERVER) ; //建立兩個檔案系統的fs fs = FileSystem.get(uri, conf) ; //針對HDFS local = FileSystem.get(conf) ; //針對本地檔案系統 /* 獲取指定路徑下的所有檔案 * 過濾該路徑下的所有svn檔案 * ^匹配一行的開頭 ;.表示匹配任意一個字元 * *表示匹配0個或多個前面這個字元 ;$匹配一行的結束 * */ FileStatus[] globStatus = local.globStatus(new Path("D:/pdata/*"), new RegexUncludeFilter("^.*svn$")); //除錯輸出 for (FileStatus fileStatus : globStatus) { System.out.println(fileStatus.getPath().toString()); } //將一組FileStatus物件轉換成Path物件 Path[] dirs = FileUtil.stat2Paths(globStatus); //獲取輸入輸出流 FSDataOutputStream out = null ; FSDataInputStream in = null ; for (Path dir : dirs) { //具體的每個目錄下面的所有檔案 //檔名稱 String fileName = dir.getName().replaceAll("-", "") ; //只接受該目錄下的txt檔案 FileStatus[] txtPaths = local.globStatus(new Path(dir + "/*") , new RegexAcceptFilter("^.*txt$")); Path[] txtFiles = FileUtil.stat2Paths(txtPaths); //設定輸出路徑 Path hdfsFile = new Path(HDFS_SERVER + "/vip/" + fileName + ".txt") ; //開啟輸入輸出流,進行讀寫 out = fs.create(hdfsFile) ; //輸出流 for (Path p : txtFiles) { in = local.open(p) ; IOUtils.copyBytes(in, out, 4096, false); //關閉輸入流 in.close(); } if(null != out){ out.close(); } } } //程式入口 public static void main(String[] args) throws Exception { megerFiles() ; System.out.println("=====小檔案合併成功====="); } }