1. 程式人生 > >Hadoop的WordCount案例

Hadoop的WordCount案例

第一步:先建立一個mapper類
注:WordCountMapper繼承了Mapper類,重寫了map()方法,定義了輸入的檔案的內容的型別:LongWritable, Text和輸出的型別:Text, IntWritable。

package cn.lsm.bigdata.WordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * KEYIN,VALUEIN:輸入進入的型別
 * context:周圍環境,上下文,來龍去脈
 * @author lsm
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
      //重寫map方法
      @Override
      protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
           //拿到一行資料轉換城String
           String line = value.toString();
           //將每一行進行切割
           String[] words = line.split(" ");
           //遍歷陣列,輸出<單詞>
           //輸出的方式:context(key,value)
           for(String word: words) {
                 context.write(new Text(word), new IntWritable(1));
           }
      }
}


第二步:定義一個reducer類 注:wordcountreducer繼承了reducer類。並重寫了reudcer()方法,輸入的資料型別就是map的輸出的型別:Text, IntWritable,輸出的型別是:Text, IntWritable
package cn.lsm.bigdata.WordCount;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * myreduce的生命週期:框架每傳遞一個kv組,reduce方法被呼叫一次
 *
 * @author lsm
 *
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // 先定義一個計數器
        int count = 0;

        // 遍歷一個kv組的所有v,累加到count中
        for (IntWritable value : values) {
            count += value.get();
        }
        // 將獲得的kv對輸出出去
        context.write(key, new IntWritable(count));
    }
}

第三步:建立一個job 注:這個類的屬性就是為了將我們的mapper和reducer提交給叢集來處理檔案的,其中的步驟是: 1.新建一個configuration 2.新建一個job 3.指定jar的位置 4.指定map和reduce的位置 5.指定map和reudce的輸出的型別 6.提交job
package cn.lsm.bigdata.WordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 提交任務的job,解釋哪一個是map,哪一個是reduce,要處理的資料在哪裡,輸出的資料在哪裡
 *
 * @author lsm
 *
 */
public class WordCountRunner {
      public static void main(String[] args) throws Exception {
           // 新建一個配置檔案
           Configuration conf = new Configuration();
           Job wcjob = Job.getInstance(conf);
           // 指定我這個job所在的jar包位置,放在jar包所在的位置
           wcjob.setJarByClass(WordCountRunner.class);

           // 設定map和reduce的class檔案
           wcjob.setMapperClass(WordCountMapper.class);
           wcjob.setReducerClass(WordCountReducer.class);

           // 設定我們的的map輸出的key和value的型別
           wcjob.setMapOutputKeyClass(Text.class);
           wcjob.setOutputValueClass(IntWritable.class);

           // 設定reduce輸出的key和value的型別
           wcjob.setOutputKeyClass(Text.class);
           wcjob.setOutputValueClass(IntWritable.class);

           // 指定要處理的資料所在的位置
           FileInputFormat.setInputPaths(wcjob, new Path(args[0]));
           // 指定要處理的資料的所輸出的位置
           FileOutputFormat.setOutputPath(wcjob, new Path(args[1]));

           // 向yarn叢集提交這個job
           boolean res = wcjob.waitForCompletion(true);
           System.exit(res ? 0 : 1);

      }
}

第四步:執行任務 [[email protected] hadoop]# hadoop jar mywordcount1.jar cn.lsm.bigdata.WordCount.WordCountRunner /wordcount/input/ /wordcount/output2/
執行的結果