HDFS API之編寫詞頻統計框架
阿新 • • 發佈:2021-07-05
package com.imooc.bigdata.hadoop.hdfs; /* * 使用HDFS API完成WordCount統計 * * 需求:統計HDFS上的檔案的詞頻統計,然後將統計結果輸出到HDFS * * 功能拆解: * 1) 讀取HDFS上的檔案 ==》 HDFS API * 2) 業務處理(詞頻統計):對檔案中的每一行資料都要進行業務處理(按照分隔符分割) ==》 Mapper(抽象類/介面) * 3) 將處理結果快取起來 ==》 Context(抽象類/介面) * 4) 將結果輸出到HDFS ==》 HDFS API * */ import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.Set; public class HDFSWordCountApp01 { public static void main(String[] args) throws Exception{ // 1) 讀取HDFS上的檔案 ==》 HDFS APIPath input = new Path("/hdfsApi/test/hello.txt"); // 獲取要操作的HDFS檔案系統 FileSystem fs = FileSystem.get(new URI("hdfs://192.168.126.101:8020"), new Configuration(), "hadoop"); //將內容讀取出來,此處不使用遞迴 RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(input,false);//迭代開始: while (iterator.hasNext()){ LocatedFileStatus file = iterator.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line = ""; while ((line = reader.readLine())!= null){ //TODO... 2) 詞頻處理 } reader.close(); in.close(); } //TODO... 3) 將結果快取起來 Map Map<Object, Object> contextMap = new HashMap<Object, Object>(); // 4) 將結果輸出到HDFS ==》 HDFS API Path output = new Path("/hdfsApi/output/"); FSDataOutputStream out = fs.create(new Path(output, new Path("WCOut"))); //TODO... 將第三步快取中的內容輸出到out中去 Set<Map.Entry<Object, Object>> entries = contextMap.entrySet(); for (Map.Entry<Object, Object> entry : entries){ out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } out.close(); fs.close(); System.out.print("HDFS API統計詞頻執行成功"); } }