MapReduce詳解!詳解!詳解!
阿新 • • 發佈:2018-12-16
理解 MapReduce 執行過程
- 以統計檔案中 單詞出現的個數為例
- 一共三個檔案
1.以整個檔案的角度進行圖解 ( 每個方塊就是一個檔案)
2.根據程式碼進行圖解
放上程式碼,僅供參考
WCMapper.java
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ // 重寫 map 方法 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws IOException, InterruptedException { // value 代表檔案中的每一行資料 String line = value.toString(); // 根據空格拆分字串 {hello lilei} String[] results = line.split(" "); // 遍歷陣列 ,獲取每一個結果 for (String str : results) { context.write(new Text(str), new LongWritable(1));// {hello, 1}{lilei, 1} } } }
WCReduce.java
public class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable> { // 重寫reduce 方法 @Override protected void reduce(Text key2, Iterable<LongWritable> v2,Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { // 寫出自己的邏輯,統計單詞個數 // 定義變數存放累加資料 long count = 0; // v2 -> <hello, {1,1,1,1} for (LongWritable lw : v2) { // 累加操作 count += lw.get(); } // 輸出 k3,v3 -> string, long context.write(key2, new LongWritable(count)); } }
WCCount.java
public class WCCount { public static void main(String[] args) throws Exception { // 1.獲取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.指定job使用的類 指定程式的入口 job.setJarByClass(WordCount.class); // 3.設定Mapper的屬性 job.setMapperClass(WCMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 4.設定輸入檔案 FileInputFormat.setInputPaths(job, new Path("/words")); // 5.設定reducer的屬性 job.setReducerClass(WCReduce.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 6.設定輸出資料夾 檢視結果,儲存到 hdfs 資料夾中的位置 FileOutputFormat.setOutputPath(job, new Path("/wcoutx1")); // 7.提交 true 提交的時候列印日誌資訊 job.waitForCompletion(true); } }