[MapReduce_1] 執行 Word Count 示例程式
阿新 • • 發佈:2018-11-05
0. 說明
MapReduce 實現 Word Count 示意圖 && Word Count 程式碼編寫
1. MapReduce 實現 Word Count 示意圖
1. Map:預處理階段,將原始資料對映成每個 K-V,傳送給 reduce
2. Shuffle:混洗(分類),將相同的 Key傳送給同一個 reduce
3. Reduce:聚合階段,把相同的 Key 進行聚合然後進行輸出
2. Word Count 程式碼編寫
[2.1 WCMapper]
package hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Mapper 程式 */ public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {/** * map 函式,被呼叫過程是通過 while 迴圈每行呼叫一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 將 value 變為 String 格式 String line = value.toString(); // 將一行文字進行截串 String[] arr = line.split(" ");for (String word : arr) { context.write(new Text(word), new IntWritable(1)); } } }
[2.2 WCReducer]
package hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Reducer 類 */ public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * 通過迭代所有的 key 進行聚合 */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } }
[2.3 WCApp]
package hadoop.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Word Count APP */ public class WCApp { public static void main(String[] args) throws Exception { // 初始化配置檔案 Configuration conf = new Configuration(); // 僅在本地開發時使用 // conf.set("fs.defaultFS", "file:///"); // 通過配置檔案初始化 job Job job = Job.getInstance(conf); // 設定 job 名稱 job.setJobName("Word Count"); // job 入口函式類 job.setJarByClass(WCApp.class); // 設定 mapper 類 job.setMapperClass(WCMapper.class); // 設定 reducer 類 job.setReducerClass(WCReducer.class); // 設定 map 的輸出 K-V 型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設定 reduce 的輸出 K-V 型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設定輸入路徑和輸出路徑 // Path pin = new Path("E:/test/wc/1.txt"); // Path pout = new Path("E:/test/wc/out"); Path pin = new Path(args[0]); Path pout = new Path(args[1]); FileInputFormat.addInputPath(job, pin); FileOutputFormat.setOutputPath(job, pout); // 執行 job job.waitForCompletion(true); } }