Hadoop MapReduce資料處理過程以及更多示例
上一篇文章介紹了Hadoop的單機配置以及一個簡單的MapReduce示例,今天看看MapReduce處理資料的流程是怎樣的。建議閱讀本文前,最好能看一下上一篇文章的程式碼。
上圖以上一篇文章的MapReduce示例為例,展示了單機配置下MapReduce的處理流程,由於單機情況下更容易理解處理流程,所以這篇文章以單機處理為例,實際上,分散式配置時,也是這樣的流程,只是在每個環節的資料形式有所不同,後面的文章會進行介紹。先來看一下上面的流程。
1.input
該階段很簡單,就是將輸入檔案中的內容按行分割為key和value的形式。看一下上篇文章中的map方法的引數
這裡的key和value,就對應input階段生成的key和value。protected void map(LongWritable key, Text value, Context context)
2.Map
Map階段所做的工作就是我們map方法所做的事情,這是由我們自己定義的,主要對input實現細分和過濾,保留我們需要的資料,以key和value的形式,通過context輸出。之前例子中,我們在map方法中取出了每一個單詞的首字母,以首字母作為key,數值1作為value進行輸出。
3.Shuffle
注意上圖中Map輸出中所示的B,有兩行,也就是說,Map階段的輸出,會有key相同的記錄。而且,輸出是無序的。在Shuffle階段,會對map的輸出按照key進行合併和排序,然後作為reduce的輸入使用。如果使用自定義的類物件作為key,該類可以實現WritableComparable介面,定義自己的排序方式。
4.Reduce
Reduce的過程完成reduce方法所做的事情。我們的示例中將所屬key下的1進行了疊加,從而計算出一個字母作為首字母出現的次數。最後結果以key和value的方式輸出。
以上就是MapReduce的處理過程,最後說明一下,我們在派生Mapper類和Reducer類時,都分別傳入了四個型別引數,他們分別對應Map和Reduce階段的輸入key和value型別以及輸出key和value型別。對於基本型別,Hadoop提供了可序列化類,如示例中的LongWritable(對應Long),Text(對應String)等,最好使用這些類,以方便Hadoop在分散式情況下對資料進行處理。
瞭解了處理流程,這裡再給出兩個簡單示例,方便大家理解。先看第一個示例,我們將英文文章中出現的單詞進行排序,使用的輸入仍然是上一篇文章中的text。程式碼如下:
package com.yjp.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordSort {
// 執行Map
private static class WordSortMapper
extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
String word = token.nextToken();
Pattern p = Pattern.compile("[A-Za-z]+");
Matcher m = p.matcher(word);
if (m.find()) {
context.write(new Text(m.group(0)), new Text());
}
}
}
}
// 執行Reduce
private static class WordSortReducer
extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
context.write(key, new Text());
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordSort <inout path> <output path>");
System.exit(-1);
}
// 設定類資訊,方便hadoop從JAR檔案中找到
Job job = Job.getInstance();
job.setJarByClass(WordSort.class);
job.setJobName("Word Sort");
// 新增輸入輸出路徑
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 設定執行Map和Reduce的類
job.setMapperClass(WordSortMapper.class);
job.setReducerClass(WordSortReducer.class);
//設定輸出資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
這個例子很簡單,上面的流程我們分析過,執行完Shuffle實際上已經完成了歸併和排序,所以,我們這個示例,Reduce過程很簡單,直接將傳遞進來的輸入進行輸出即可。
第二個示例,我們統計一段文字中出現的字母及其數目。瞭解了流程,這就簡單多了,我們只要基於上一篇文章的示例,修改它的map方法即可
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
String word = token.nextToken();
Pattern p = Pattern.compile("[A-Za-z]+");
Matcher m = p.matcher(word);
if (m.find()) {
word = m.group(0);
for (int i = 0; i < word.length(); i++) {
context.write(new Text(word.charAt(i) + ""), one);
}
}
}
}
程式碼已上傳直github.