使用者自定義的 Java MapReduce 計數器
阿新 • • 發佈:2018-12-09
MapReduce 計數器用來統計作業執行過程中的某項資訊。
計數器是全域的:MapReduce 框架會彙總所有 map 與 reduce 任務中的計數器,並在作業結束時產生一個最終結果。
自定義計數器的寫法類似 Java 的 enum 型別:每個列舉常數就是一個計數器。
需求:統計某個目錄下所有檔案的總行數,以及單詞的總數。
思路:定義一個 enum 計數器(LINES_COUNT 與 WORDS_COUNT),在 map 中每處理一行就將行數計數器加 1,並依該行切出的單詞數累加單詞計數器。
package com.mapreduce.count; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;public class CountDerived { // 定義一個全域性的計數器,每個map,reduce都可以訪問到 enum COUNT{ LINES_COUNT, WORDS_COUNT } public static void main(String[] args) throws Exception { // 1 獲取configuration Configuration configuration = new Configuration();// 2 job Job job = Job.getInstance(configuration); // 3 作業jar包 job.setJarByClass(CountDerived.class); // 4 map, reduce jar 包 job.setMapperClass(CounterMap.class); // 5 map 輸出型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 6 最終 輸出型別 (reducer) job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 7 inputformatclass , outputformatclass 輸入輸出入檔案型別 可能決定分片資訊 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); // 8 輸入輸出檔案路徑 FileInputFormat.setInputPaths(job, new Path("d:/input")); FileOutputFormat.setOutputPath(job, new Path("d:/output1")); // 9 job提交 job.waitForCompletion(true); } }
package com.mapreduce.count; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapreduce.Mapper; /* * job 那邊定義 全域性計數器 count { lineCount, wordsCount } */ import com.mapreduce.count.CountDerived.COUNT; public class CounterMap extends Mapper<LongWritable, Text, Text, IntWritable>{ Text k = new Text(); IntWritable v = new IntWritable(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 動態獲取計數器 Counter line_counter = (Counter) context.getCounter(COUNT.LINES_COUNT); //將計數器 + 1 line_counter.increment(1); String line = value.toString(); String[] words = line.split(" "); v.set(1); for(String w:words){ // 同理 context.getCounter(COUNT.WORDS_COUNT).increment(1); k.set(w); context.write(k, v); } } }