學習Hadoop MapReduce與WordCount例子分析
阿新 • • 發佈:2019-02-14
/* MapReduce框架一直圍繞著key-value這樣的資料結構,下面以官方自帶的WordCount為例子,自己分析MapReduce的工作機制。MapReduce可以分為Map和Reduce過程, 程式碼實現了兩個類,分別是繼承Mapper和Reduceer,Mapper類裡面有map介面,Reduceer類有reduce介面,對於統計單詞這個例子來說,MapReduce會把檔案以行為 拆分物件,每分析一行就會呼叫Mapper類裡面的map介面,然後map接口裡面的程式碼由程式設計師實現其邏輯,然後把map介面處理完的結果輸送給Reduceer的reduce的接 口,中間還可以插入一個combiner的介面用於對map介面的資料進行中間結果處理再丟給reduce做最終的彙總。具體流程看程式碼註釋。 */ import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ /* Mapper他是一個模板類,Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>, KEYIN 輸入key的型別,VALUEIN輸入value的型別 KEYOUT 輸出key的型別,VALUEOUT輸出value的型別 四個型別決定了map介面的輸入與輸出型別 比較形象地描述key,value,在map,combiner,reduce流轉的 (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 其中還有規定,就是KEY和VALUE型別必須是實現了Writeable介面的,KEY型別還需要額外實現WritableComparable介面 通常在Mapper模板裡面,KEYIN是不需要特指定的,就用基類Object就可以了,VAULEIN指定為Text,這個Text是<pre name="code" class="java"> org.apache.hadoop.io.Text,這個Text已經滿足了實現Writeable介面的條件了,在這個例子裡面VALUE就是檔案的行內 容,所以定義型別為Text。 對於KEYOUT和VALUEOUT,作為輸出key型別和value型別,這裡定義為Text和IntWritable,keyout就是需要統計單詞個數 的單詞,IntWriteable就是對應某個單詞的次數,其實這個就是一個Int型別,為了符合介面需要所以就基礎了Writeable Context它是一個貫通map介面<-->combiner介面<-->reduce介面的上下文資料,在map接口裡面,單詞對應次數會儲存在context 裡面,到了reduce介面,MapReduce會把之前map的context用key對應結果集合的形式給reduce介面。 */ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } /* 下面是對兩個檔案統計單詞呼叫map介面之後的context結果 For the given sample input the first map emits: < Hello, 1> < World, 1> < Bye, 1> < World, 1> The second map emits: < Hello, 1> < Hadoop, 1> < Goodbye, 1> < Hadoop, 1> */ public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { /* Reduceer也是一個類模板,跟Mapper一樣需要指定KEYIN,VALUEIN,KEYOUT,VALUEOUT, 其中KEYIN和VALUEIN必須跟Mapper的KEYOUT,VALUEOUT一一對應,因為map介面輸出的結果key->value 就是reduce介面的輸入,只是MapReduce框架把map接口裡面相同的key變成一個key->values 的values集合,所以在reduce接口裡面KEYIN是Text也就是單詞,VALUEOUT是IntWriteable集合的 迭代器Interable<IntWriteable>,context就是reduce的輸出結果了 */ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } /* 在例子裡面,還指定了combiner,其實cominer和reduce都是同一個介面reduce,第一次呼叫reduce介面是combiner過程,把每個檔案 的單詞做了key->value 到 key->values的彙總,結果如下 The output of the first map: < Bye, 1> < Hello, 1> < World, 2> The output of the second map: < Goodbye, 1> < Hadoop, 2> < Hello, 1> */ /* 第二次呼叫reduce介面,就是reduce的過程,把combiner處理過的中間結果做一次最終的彙總 < Bye, 1> < Goodbye, 1> < Hadoop, 2> < Hello, 2> < World, 2> */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }