HDFS 手寫mapreduce單詞計數框架
阿新 • • 發佈:2018-11-10
一、資料處理類
package com.css.hdfs; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; /** * 需求:檔案(hello world hello teacher hello john tom ) 統計每個單詞出現的次數? * 資料儲存在hdfs、統計出來的結果儲存到hdfs * * 2004google:dfs/bigtable/mapreduce * * 大資料解決的問題? * 1.海量資料的儲存 * hdfs * 2.海量資料的計算 * mapreduce * * 思路? * hello 2 * world 1 * hello 1 * ... * * 基於使用者體驗: * 使用者輸入資料 * 使用者處理的方式 * 使用者指定結果資料儲存位置*/ public class HdfsWordCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException { // 反射 Properties pro = new Properties(); // 載入配置檔案 pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties")); Path inPath = new Path(pro.getProperty("IN_PATH")); Path outPath = new Path(pro.getProperty("OUT_PATH")); Class<?> mapper_class = Class.forName(pro.getProperty("MAPPER_CLASS")); // 例項化 Mapper mapper = (Mapper) mapper_class.newInstance(); Context context = new Context(); // 構建hdfs客戶端物件 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000/"), conf, "root"); // 讀取使用者輸入的檔案 RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inPath, false); while (iter.hasNext()) { LocatedFileStatus file = iter.next(); // 開啟路徑 獲取輸入流 FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { // 呼叫map方法執行業務邏輯 mapper.map(line, context); } // 關閉資源 br.close(); in.close(); } // 如果使用者輸入的結果路徑不存在 則建立一個 Path out = new Path("/wc/out/"); if (!fs.exists(out)) { fs.mkdirs(out); } // 將快取的結果放入hdfs中儲存 HashMap<Object, Object> contextMap = context.getContextMap(); FSDataOutputStream out1 = fs.create(outPath); // 遍歷hashmap Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { // 寫資料 out1.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } // 關閉資源 out1.close(); fs.close(); System.out.println("資料統計結果完成......"); } }
二、介面類
package com.css.hdfs; /** * 思路: * 介面設計 */ public interface Mapper { // 呼叫方法 public void map(String line, Context context); }
三、資料傳輸類
package com.css.hdfs; import java.util.HashMap; /** * 思路: * 資料傳輸的類 * 封裝資料 * 集合 * <單詞,1> */ public class Context { // 資料封裝 private HashMap<Object, Object> contextMap = new HashMap<>(); // 寫資料 public void write(Object key, Object value){ // 放資料到map中 contextMap.put(key, value); } // 定義根據key拿到值方法 public Object get(Object key){ return contextMap.get(key); } // 拿到map中的資料內容 public HashMap<Object, Object> getContextMap(){ return contextMap; } }
四、單詞計數類
package com.css.hdfs; /** * 思路: * 新增一個map方法 單詞切分 相同key的value ++ */ public class WordCountMapper implements Mapper{ @Override public void map(String line, Context context) { // 拿到這行資料 切分 String[] words = line.split(" "); // 拿到單詞 相同的key value++ hello 1 world 1 for (String word : words) { Object value = context.get(word); if (null == value) { context.write(word, 1); }else { // 不為空 int v = (int)value; context.write(word, v+1); } } } }
五、配置檔案job.properties
IN_PATH=/wc/in OUT_PATH=/wc/out/rs.txt MAPPER_CLASS=com.css.hdfs.WordCountMapper