wordcount 原始碼詳解
1.原始碼解釋
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
/*定義hadoop資料型別IntWritable例項one,並且賦值為1,為什麼是1呢?
因為不管一個單詞出現幾次,我們都會直接輸出1
假如輸入資料是 nice to nice 那麼用context.write(word,one)處理的結果就是
nice 1
to 1
nice 1
假如賦值是2 那麼輸出就變成了,
nice 2
to 2
nice 2
*/
private final static IntWritable one = new IntWritable(1);
//定義hadoop資料型別Text
private Text word = new Text();
//實現map函式,其中countext引數可以記錄key和value
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString()); //字串分解類 把一行單詞分解成一個個
while (itr.hasMoreTokens()) { //迴圈條件表示返回是否還有分隔符
word.set(itr.nextToken()); //itr.nextToken獲取單詞,word.set() 方法將Java資料型別轉換成hadoop資料型別轉換,只有這樣才能輸出
context.write(word, one); //按照格式輸出結果,比如nice 1
}
}
}
public static class IntSumReducer //繼承泛型類reducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context
) throws IOException, InterruptedException {
//用sum來記錄相同單詞的數量,是java型別的資料
int sum = 0;
for (IntWritable val : values) { //遍歷得到結果
sum += val.get(); //因為val的資料型別是IntWritable,所以需要將IntWritable型別的資料轉換成java可以識別的資料型別,Intwritable.set(), IntWritable.get()都能進行資料轉換,set()方法是將Java資料型別轉換成hadoop可識別的資料型別,而get()方法是將hadoop的資料型別轉換成java可識別的資料型別
}
result.set(sum); //將java的資料型別轉換成Hadoop可識別的資料型別
context.write(key, result); //輸出結果到hdfs
}
}
public static void main(String[] args) throws Exception {
//讀取hadoop的配置引數,也就是安裝hadoop時候的配置檔案例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等檔案裡的資訊
Configuration conf = new Configuration();
//GenericOptionsParser類,它是用來解釋常用hadoop命令,並根據需要為Configuration物件設定相應的值
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//執行WordCount程式時候一定是兩個引數,如果不是就會報錯退出,其實這2個引數的作用就是告訴機器輸入檔案的地址和輸出檔案的地址
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count"); //建立一個任務,第二個引數word count是任務的名稱
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class); //裝載程式設計師編寫好的計算程式,例如我們的程式類名就是WordCount了。這裡我要做下糾正,雖然我們編寫mapreduce程式只需要實現map函式和reduce函式,但是實際開發我們要實現三個類,第三個類是為了配置mapreduce如何執行map和reduce函式,準確的說就是構建一個mapreduce能執行的job了,例如WordCount類。
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class); //指定要使用的reduce類
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); //定義輸出結果的key/value的型別,也就是最終儲存在hdfs上結果檔案的key/value的型別。
FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //確定輸入檔案的路徑
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //確定輸出檔案的路徑
System.exit(job.waitForCompletion(true) ? 0 : 1); //如果job成功執行,我們的程式就會正常退出
}
}
2.自己動手寫出WordCount程式
分析WordCount的示例程式,可以看出一個基本的結構
底層的東西,hadoop的框架已經幫我們搞定了,我們剩下的工作其實就是設計以下3個方面
- mapper類
- reducer類
- main() 方法
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class xxx {
public static class MyMapper extends Mapper<xxx, xxx, xxx, xxx>{
public void map(xxx key, xxx value, Context context) throws IOException, InterruptedException {
編寫你的程式碼
}
}
public static class MyReducer extends Reducer<xxx,xxx,xxx,xxx> {
public void reduce(xxx key, xxx values, Context context) throws IOException, InterruptedException {
編寫你的程式碼
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "xxx");
job.setJarByClass(xxx.class); //根據自己編寫的類來填寫
job.setMapperClass(xxx.class);
job.setCombinerClass(xxx.class);
job.setReducerClass(xxx.class);
job.setOutputKeyClass(xxx.class);
job.setOutputValueClass(xxx.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}