1. 程式人生 > >大資料09--MapReduce習題~統計單詞數量

大資料09--MapReduce習題~統計單詞數量

一、利用MapReduce計算單詞

WordcountDriver

**
 * 相當於一個yarn叢集的客戶端
 * 需要在此封裝我們的mr程式的相關執行引數,指定jar包
 * 最後提交給yarn
 */
public class WordcountDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        System.setProperty("HADOOP_USER_NAME", "root") ;

        if (args == null || args.length == 0) {
            return;
        }
        //該物件會預設讀取環境中的 hadoop 配置。當然,也可以通過 set 重新進行配置
        Configuration conf = new Configuration();

        //job 是 yarn 中任務的抽象。
        Job job = Job.getInstance(conf);

        /*job.setJar("/home/hadoop/wc.jar");*/
        //指定本程式的jar包所在的本地路徑
        job.setJarByClass(WordcountDriver.class);

        //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        //指定mapper輸出資料的kv型別。需要和 Mapper 中泛型的型別保持一致
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定最終輸出的資料的kv型別。這裡也是 Reduce 的 key,value型別。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

       job.setNumReduceTasks(2);
        //指定job的輸入原始檔案所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的輸出結果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //改變切塊大小
        FileInputFormat.setMinInputSplitSize(job, 301349250);

        //將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
    //C:\Users\Administrator\hdfsDemo01\out\artifacts\hdfsDemo01_jar

WordcountMapper

package com.neusoft;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 該類做為一個 mapTask 使用。類聲名中所使用的四個泛型意義為別為:
 *
 * KEYIN:   預設情況下,是mr框架所讀到的一行文字的起始偏移量,Long,
 *      但是在hadoop中有自己的更精簡的序列化介面,所以不直接用Long,而用LongWritable
 * VALUEIN: 預設情況下,是mr框架所讀到的一行文字的內容,String,同上,用Text
 * KEYOUT:  是使用者自定義邏輯處理完成之後輸出資料中的key,在此處是單詞,String,同上,用Text
 * VALUEOUT:是使用者自定義邏輯處理完成之後輸出資料中的value,在此處是單詞次數,Integer,同上,用IntWritable
 */

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * map階段的業務邏輯就寫在自定義的map()方法中 maptask會對每一行輸入資料呼叫一次我們自定義的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 將maptask傳給我們的文字內容先轉換成String,空格將這一行切分成單詞
        String[] splits = value.toString().split(" ");
        // 將單詞輸出為<單詞,1>
        for (String split:splits){
            // 將單詞作為key,將次數1作為value,以便於後續的資料分發,
            // 可以根據單詞分發,以便於相同單詞會到相同的reduce task
            context.write(new Text(split),new IntWritable(1));
        }
    }
}
// Mapperc處理完以後的結果<i,1><have,1><a,1><dream,1><a,1><dream,1>

WordcountReducer

package com.neusoft;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 與 Mapper 類似,繼承的同事聲名四個泛型。
 * KEYIN, VALUEIN 對應  mapper輸出的KEYOUT,VALUEOUT型別對應
 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出資料型別。此處 keyOut 表示單個單詞,valueOut 對應的是總次數
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    /**
     * Mapperc處理完以後的結果<i,1><have,1><a,1><dream,1><a,1><dream,1>
     * 入參key,是一組相同單詞kv對的key
     * 框架自動整合成(1-7):dream<list(1,1)> ====> dteam--key  list(1,1)---list
     * reduce
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;
        for(IntWritable value : values){
            count += value.get();
        }
        //輸出每一個單詞出現的次數
        context.write(key, new IntWritable(count));
}
}