Hadoop的WordCount案例
阿新 • • 發佈:2019-01-05
第一步:先建立一個mapper類
注:WordCountMapper繼承了Mapper類,重寫了map()方法,定義了輸入的檔案的內容的型別:LongWritable, Text和輸出的型別:Text, IntWritable。
package cn.lsm.bigdata.WordCount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN,VALUEIN:輸入進入的型別 * context:周圍環境,上下文,來龍去脈 * @author lsm * */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //重寫map方法 @Override protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException { //拿到一行資料轉換城String String line = value.toString(); //將每一行進行切割 String[] words = line.split(" "); //遍歷陣列,輸出<單詞> //輸出的方式:context(key,value) for(String word: words) { context.write(new Text(word), new IntWritable(1)); } } }
第二步:定義一個reducer類 注:wordcountreducer繼承了reducer類。並重寫了reudcer()方法,輸入的資料型別就是map的輸出的型別:Text, IntWritable,輸出的型別是:Text, IntWritable
package cn.lsm.bigdata.WordCount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * myreduce的生命週期:框架每傳遞一個kv組,reduce方法被呼叫一次 * * @author lsm * */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 先定義一個計數器 int count = 0; // 遍歷一個kv組的所有v,累加到count中 for (IntWritable value : values) { count += value.get(); } // 將獲得的kv對輸出出去 context.write(key, new IntWritable(count)); } }
第三步:建立一個job 注:這個類的屬性就是為了將我們的mapper和reducer提交給叢集來處理檔案的,其中的步驟是: 1.新建一個configuration 2.新建一個job 3.指定jar的位置 4.指定map和reduce的位置 5.指定map和reudce的輸出的型別 6.提交job
package cn.lsm.bigdata.WordCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 提交任務的job,解釋哪一個是map,哪一個是reduce,要處理的資料在哪裡,輸出的資料在哪裡 * * @author lsm * */ public class WordCountRunner { public static void main(String[] args) throws Exception { // 新建一個配置檔案 Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); // 指定我這個job所在的jar包位置,放在jar包所在的位置 wcjob.setJarByClass(WordCountRunner.class); // 設定map和reduce的class檔案 wcjob.setMapperClass(WordCountMapper.class); wcjob.setReducerClass(WordCountReducer.class); // 設定我們的的map輸出的key和value的型別 wcjob.setMapOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); // 設定reduce輸出的key和value的型別 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); // 指定要處理的資料所在的位置 FileInputFormat.setInputPaths(wcjob, new Path(args[0])); // 指定要處理的資料的所輸出的位置 FileOutputFormat.setOutputPath(wcjob, new Path(args[1])); // 向yarn叢集提交這個job boolean res = wcjob.waitForCompletion(true); System.exit(res ? 0 : 1); } }
第四步:執行任務 [[email protected] hadoop]# hadoop jar mywordcount1.jar cn.lsm.bigdata.WordCount.WordCountRunner /wordcount/input/ /wordcount/output2/
執行的結果: