MR計算模型二
mapreduce函式的編寫 1
map函式
繼承Mapper<Object, Object, Object, Object>
重寫public void map(Object key, Object value, Context context) throws IOException, InterruptedException 方法
map函式主要用於資料的清洗和原始處理
map函式的輸入輸出
map函式每執行一次,處理一條資料
map的輸入,key預設是行號的偏移量,value是一行的內容
context.write(Object, Object)方法輸出
map的輸出是reduce的輸入
mapreduce函式的編寫 2
reduce函式
繼承Reducer<Object, Object, Object, Object>
重寫public void reduce(Object key, Iterable<Object> values, Context context) throws IOException, InterruptedException 方法 reduce函式是主要的業務處理和資料探勘部分
reduce函式的輸入輸出
context.write(data, new IntWritable(1))方法輸出
reduce的輸入時map的輸出,但不是直接輸出,而是按照相同key彙總過後的集合
context.write(Object, Object)方法輸出
mapreduce函式的編寫 3
編寫job
logger.warn("HelloHadoopSort已啟動"); Configuration coreSiteConf = new Configuration(); coreSiteConf.addResource(Resources.getResource("core-site.xml")); Job job = Job.getInstance(coreSiteConf, "HelloHadoopSort"); job.setJarByClass(HelloHadoopSort.class); //設定Map和Reduce處理類 job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); //設定map輸出型別 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("/sort/input")); FileOutputFormat.setOutputPath(job, new Path("/sort/output")); boolean flag = job.waitForCompletion(true); logger.warn("HelloHadoopSort已完成,執行結果:" + flag);
WordCountMap類繼承了
org.apache.hadoop.mapreduce.Mapper,4個泛型型別分別是map函式輸入key的型別,輸入value的型別,輸出key的型別,輸出value的型別。
WordCountReduce類繼承了org.apache.hadoop.mapreduce.Reducer,4個泛型型別含義與map類相同。
map的輸出型別與reduce的輸入型別相同,而一般情況下,map的輸出型別與reduce的輸出型別相同,因此,reduce的輸入型別與輸出型別相同。
在map中,讀取一行內容,按照空格分組,得到一行中的每個單詞,把單詞做為key輸出,value的內容可以為空或任意內容。
在reduce中,獲取到某個單詞及所有集合,集合的尺寸即是該單詞出現的數量,把單詞及其數量輸出到hdfs中