Hadoop WordCount: word counting with MapReduce
阿新 · Published 2018-12-19
/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *
 * Input:  key   - the byte offset of the line within the file
 *         value - the content of that line
 *
 * Output: key   - a single word from the line
 *         value - the count emitted for that word (always 1 here)
 */
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text k = new Text();

    // The map method carries the per-record business logic.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Get one line of input
        String line = value.toString();
        // 2. Split the line on single spaces (runs of spaces would yield empty tokens)
        String[] words = line.split(" ");
        for (String word : words) {
            k.set(word); // reuse the Text object; set it to the current word
            // 3. Emit (word, 1) to the reduce phase
            context.write(k, new IntWritable(1));
        }
    }
}
/*
 * Map output (for input lines "hello hadoop" and "hadoop"):
 *   hello  1
 *   hadoop 1
 *   hadoop 1
 *
 * After the shuffle, values are grouped by key, e.g. hadoop -> [1, 1],
 * so the reducer accumulates the values of each key: hadoop 2.
 */
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Sum all the 1s emitted for this key
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Emit (word, total count)
        context.write(key, new IntWritable(sum));
    }
}
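Between map and reduce, the framework sorts the map output and groups the values by key, so the reducer only has to fold each value list. For intuition, the end-to-end computation is equivalent to the following plain-Java sketch (a toy single-process illustration, not the Hadoop API; LocalWordCount and its sample input are made up here):

import java.util.Map;
import java.util.TreeMap;

// Toy illustration of map + shuffle + reduce: tokenize, group by key, sum.
public class LocalWordCount {
    public static void main(String[] args) {
        String[] lines = {"hello hadoop", "hadoop"};
        // TreeMap keeps keys sorted, mirroring the order in which reducers see keys
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // merge() plays the role of the reducer's summing loop
                counts.merge(word, 1, Integer::sum);
            }
        }
        // Prints: "hadoop 2" then "hello 1"
        counts.forEach((word, sum) -> System.out.println(word + "\t" + sum));
    }
}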
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Load the configuration
        Configuration config = new Configuration();
        // 2. Create the Job instance and hand it the configuration
        Job job = Job.getInstance(config);
        // 3. Locate the main class via reflection so the jar can be shipped to the cluster
        job.setJarByClass(Driver.class);
        // 4. Set the Mapper and Reducer classes
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);
        // 5. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 6. Set the reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 7. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("/input"));
        FileOutputFormat.setOutputPath(job, new Path("/output"));
        // 8. Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
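To try the job end to end, a typical workflow (the jar name wordcount.jar below is illustrative) is: upload the input text to HDFS with hdfs dfs -put localfile /input, submit the job with hadoop jar wordcount.jar Driver, then inspect the result with hdfs dfs -cat /output/part-r-00000. Note that the /output directory must not already exist, or FileOutputFormat will fail the job at submission. Since the reduce logic is a plain sum, which is associative and commutative, the same class could also be registered as a combiner (job.setCombinerClass(WordCountReduce.class)) to pre-aggregate map output and shrink the shuffle.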