1. 程式人生 > >[MapReduce_1] 執行 Word Count 示例程式

[MapReduce_1] 執行 Word Count 示例程式


 

0. 說明

  MapReduce 實現 Word Count 示意圖 && Word Count 程式碼編寫

 

 

 


 1. MapReduce 實現 Word Count 示意圖

  

 

  1. Map:預處理階段,將原始資料對映成每個 K-V,傳送給 reduce
  2. Shuffle:混洗(分類),將相同的 Key傳送給同一個 reduce
  3. Reduce:聚合階段,把相同的 Key 進行聚合然後進行輸出

 


 

2. Word Count 程式碼編寫 

  [2.1 WCMapper]

 

package hadoop.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper 程式
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
/** * map 函式,被呼叫過程是通過 while 迴圈每行呼叫一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 將 value 變為 String 格式 String line = value.toString(); // 將一行文字進行截串 String[] arr = line.split(" ");
for (String word : arr) { context.write(new Text(word), new IntWritable(1)); } } }

 

  [2.2 WCReducer]

package hadoop.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer 類
 */
public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * 通過迭代所有的 key 進行聚合
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        context.write(key,new IntWritable(sum));
    }
}

 

  [2.3 WCApp]

package hadoop.mr.wc;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Word Count APP
 */
public class WCApp {
    public static void main(String[] args) throws Exception {
        // 初始化配置檔案
        Configuration conf = new Configuration();

        // 僅在本地開發時使用
//        conf.set("fs.defaultFS", "file:///");

        // 通過配置檔案初始化 job
        Job job = Job.getInstance(conf);

        // 設定 job 名稱
        job.setJobName("Word Count");

        // job 入口函式類
        job.setJarByClass(WCApp.class);

        // 設定 mapper 類
        job.setMapperClass(WCMapper.class);

        // 設定 reducer 類
        job.setReducerClass(WCReducer.class);

        // 設定 map 的輸出 K-V 型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設定 reduce 的輸出 K-V 型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 設定輸入路徑和輸出路徑
//        Path pin = new Path("E:/test/wc/1.txt");
//        Path pout = new Path("E:/test/wc/out");
        Path pin = new Path(args[0]);
        Path pout = new Path(args[1]);
        FileInputFormat.addInputPath(job, pin);
        FileOutputFormat.setOutputPath(job, pout);

        // 執行 job
        job.waitForCompletion(true);
    }
}