1. 程式人生 > >【hadoop】wordcount例項編寫

【hadoop】wordcount例項編寫

mr例項分為兩個階段,一個是map階段,一個是reduce階段,中間用shuff來銜接,我們想執行mapreduce例項,只需要實現map業務和reduce業務邏輯即可。
map實現

//hadoop首先將input輸入的檔案內容split分為多份,每一份的內容用mapper.map來處理,其中Value就是需要處理的文字內容。context是上下文,用作連線和傳遞資料流的工具。
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        //序列化的整數
        private
final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //將一段內容分解為一個個的字串單詞 StringTokenizer itr = new StringTokenizer(value.toString()); //將一個一個的單詞和對應的數量反饋給context,context接收到之後,經過shuff之後,傳遞給reduce來整合。
while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

reduce實現

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new
IntWritable(); //key就是單詞,也就是map中key,IntWritable就是一個個的value組成的列表。 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; //將單詞出現的次數一個個的傳遞過來,然後相加得到結果。 for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

提交job

public static void main(String[] args) throws Exception {
        //使用預設配置
        Configuration conf = new Configuration();
        //解析命令列引數
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.out.println("Usage:wordcount");
            System.exit(2);
        }
        //構造一個mr任務
        Job job = Job.getInstance(conf, "word count");
        //設定執行的jar
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

最後將程式碼打成jar包,用hadoop jar提交到mapreduce上,hadoop預設是mapreduce框架來執行,但我們可以修改預設配置,將其改為yarn。最終可以在yarn的監控介面展示任務執行的資訊。