Hadoop實戰專案:小檔案合併
阿新 • • 發佈:2019-01-09
public class MergeSmallFilesToHDFS {
private static FileSystem hdfs = null; // file system handle for HDFS
private static FileSystem local = null; // file system handle for the local file system
/**
 * Path filter that rejects every path whose full string form matches a regular expression
 * and accepts all others.
 *
 * <p>The expression is compiled once in the constructor instead of on every
 * {@link #accept(Path)} call, so filtering many paths does not recompile it each time.
 */
public static class RegexExcludePathFilter implements PathFilter
{
    /** Pre-compiled exclusion expression. */
    private final java.util.regex.Pattern pattern;

    /**
     * Creates a filter for the given expression.
     *
     * @param regex regular expression that must match the entire {@code path.toString()} value
     *              for the path to be rejected
     * @throws NullPointerException if {@code regex} is {@code null}
     * @throws java.util.regex.PatternSyntaxException if {@code regex} is not a valid expression
     */
    public RegexExcludePathFilter(String regex) {
        // Compile eagerly: a malformed expression fails here rather than on first use.
        this.pattern = java.util.regex.Pattern.compile(regex);
    }

    /**
     * Tests a path against the exclusion expression.
     *
     * @param path path to test
     * @return {@code false} when the whole string form of {@code path} matches the expression,
     *         {@code true} otherwise
     */
    @Override
    public boolean accept(Path path) {
        return !pattern.matcher(path.toString()).matches();
    }
}
/**
 * Path filter that accepts only paths whose full string form matches a regular expression.
 *
 * <p>The expression is compiled once in the constructor instead of on every
 * {@link #accept(Path)} call, so filtering many paths does not recompile it each time.
 */
public static class RegexAcceptPathFilter implements PathFilter
{
    /** Pre-compiled acceptance expression. */
    private final java.util.regex.Pattern pattern;

    /**
     * Creates a filter for the given expression.
     *
     * @param regex regular expression that must match the entire {@code path.toString()} value
     *              for the path to be accepted
     * @throws NullPointerException if {@code regex} is {@code null}
     * @throws java.util.regex.PatternSyntaxException if {@code regex} is not a valid expression
     */
    public RegexAcceptPathFilter(String regex) {
        // Compile eagerly: a malformed expression fails here rather than on first use.
        this.pattern = java.util.regex.Pattern.compile(regex);
    }

    /**
     * Tests a path against the acceptance expression.
     *
     * @param path path to test
     * @return {@code true} when the whole string form of {@code path} matches the expression,
     *         {@code false} otherwise
     */
    @Override
    public boolean accept(Path path) {
        return pattern.matcher(path.toString()).matches();
    }
}
/**
 * Program entry point; delegates all work to {@code list()}.
 *
 * @param args command-line arguments (not read by this method)
 * @throws IOException propagated from {@code list()}
 * @throws URISyntaxException propagated from {@code list()}
 */
public static void main(String[] args) throws URISyntaxException, IOException {
// Run the merge routine; any failure propagates to the caller unchanged.
list();
}
private static void list() throws URISyntaxException, IOException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();//讀取Hadoop配置檔案
//設定檔案系統訪問介面,並建立FileSystem在本地的執行模式
URI uri = new URI("hdfs://Centpy:9000");
hdfs = FileSystem.get(uri, conf);
local = FileSystem.getLocal(conf);//獲取本地檔案系統
//過濾目錄下的svn檔案
FileStatus[] dirstatus = local.globStatus(new Path("D://Code/EclipseCode/mergeSmallFilesTestData/*"),
new RegexExcludePathFilter("^.*svn$"));
//獲取D:\Code\EclipseCode\mergeSmallFilesTestData目錄下的所有檔案路徑
Path[] dirs = FileUtil.stat2Paths(dirstatus);
FSDataOutputStream out = null;
FSDataInputStream in = null;
for(Path dir:dirs)
{//比如拿2018-03-23為例
//將資料夾名稱2018-03-23的-去掉,直接,得到20180323資料夾名稱
String fileName = dir.getName().replace("-", "");//檔名稱
//只接受2018-03-23日期目錄下的.txt檔案
FileStatus[] localStatus = local.globStatus(new Path(dir + "/*"),
new RegexAcceptPathFilter("^.*txt$"));
// 獲得2018-03-23日期目錄下的所有檔案
Path[] listPath = FileUtil.stat2Paths(localStatus);
// 輸出路徑
Path outBlock = new Path("hdfs://Centpy:9000/mergeSmallFiles/result/"+ fileName + ".txt");
System.out.println("合併後的檔名稱:"+fileName+".txt");
// 開啟輸出流
out = hdfs.create(outBlock);
//迴圈操作2018-03-23日期目錄下的所有檔案
for(Path p:listPath)
{
in = local.open(p);// 開啟輸入流
IOUtils.copyBytes(in, out, 4096, false);// 複製資料
in.close();// 關閉輸入流
}
if (out != null) {
out.close();// 關閉輸出流