從 WordCount 到 MapReduce 計算模型
概述
雖然現在都在說大記憶體時代,不過記憶體的發展怎麼也跟不上資料的步伐吧。所以,我們就要想辦法減小資料量。這裡說的減小可不是真的減小資料量,而是讓資料分散開來。分開儲存、分開計算。這就是 MapReduce 分散式的核心。
版權說明
目錄
MapReduce 簡介
要了解 MapReduce,首先要了解 MapReduce 的載體是什麼。在 Hadoop 中,用於執行 MapReduce 任務的機器有兩個角色:一個是 JobTracker,另一個是 TaskTracker。JobTracker 是用於管理和排程工作的,TaskTracker 是用於執行工作的。一個 Hadoop 叢集中只有一臺 JobTracker(當然在 Hadoop 2.x 中,一個 Hadoop 叢集中可能有多個 JobTracker)。
MapReduce 原理
MapReduce 模型的精髓在於它的演算法思想——分治。對於分治的過程可以參見我之前的一篇部落格《大資料演算法:對5億資料進行排序》。還有就是可以去學習一下排序演算法中的歸併排序,在這個排序演算法中就是基於分治思想的。
迴歸正題,在 MapReduce 模型中,可以把分治的這一概念表現得淋漓盡致。在處理大量資料的時候(比如說 1 TB,你別說沒有這麼多的資料,大公司這點資料也不算啥的),如果只是單純地依賴我們的硬體,就顯得有些力不從心了。首先我們的記憶體沒有那麼大,如放在磁碟上處理,那麼過多的 IO 操作無疑是一個死穴。聰明的 Google 工程師總是給我們這些渣渣帶來驚喜,他們想把了把這些資料分散到許多機器上,在這些機器上完成一些初步的計算,再經過一系列的彙總,最後在我們的機器上(Master/Namenode)統計結果。
要知道我們不可能把我們的資料分散到隨意的 N 臺機器上。那麼我們就必須讓這些機器之間建立一種可靠的關聯,這樣的關聯形成了一個計算機叢集。這樣我們的資料就可以分發到叢集中的各個計算機上了。在 Hadoop 裡這一操作可以通過 -put
當資料被上傳到 Hadoop 的 HDFS 檔案系統上之後,就可以通過 MapReduce 模型中的 Mapper 先將資料讀進記憶體,過程像下面這樣:
經過 Mapper 的處理,資料會變成這樣
好了,到了這裡,Map 的過程就已經結束了。接下來就是 Reduce 的過程了。
可以看到這裡有一個 conbin 的過程,這個過程,也可以沒有的。而有的時候是一定不能有的,在後面我們可以會單獨來說說這裡的 conbin,不過不是本文的內容,就不詳述了。
這樣整個 MapReduce 過程就已經 over 了,下面看看具體的實現及測試結果吧。
WordCount 程式
需求分析
- 現在有大量的檔案
- 每個檔案又有大量的單詞
- 要求統計每個單詞的詞頻
邏輯實現
Mapper
public static class CoreMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private static Text label = new Text();
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString());
while(tokenizer.hasMoreTokens()) {
label.set(tokenizer.nextToken());
context.write(label, one);
}
}
}
Reducer
public static class CoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable count = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
if (null == values) {
return;
}
int sum = 0;
for (IntWritable intWritable : values) {
sum += intWritable.get();
}
count.set(sum);
context.write(key, count);
}
}
Client
public class ComputerClient extends Configuration implements Tool {
public static void main(String[] args) {
ComputerClient client = new ComputerClient();
args = new String[] {
AppConstant.INPUT,
AppConstant.OUTPUT
};
try {
ToolRunner.run(client, args);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public Configuration getConf() {
return this;
}
@Override
public void setConf(Configuration arg0) {
}
@Override
public int run(String[] args) throws Exception {
Job job = new Job(getConf(), "ComputerClient-job");
job.setJarByClass(CoreComputer.class);
job.setMapperClass(CoreComputer.CoreMapper.class);
job.setCombinerClass(CoreComputer.CoreReducer.class);
job.setReducerClass(CoreComputer.CoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
本地執行
關於本地執行沒什麼好說的,就是在 Eclipse 裡配置好執行引數或是直接在程式碼裡指定輸入輸出路徑。然後 Run As 一個 Hadoop 程式即可。
分散式執行
在分散式執行 MapReduce 的過程中,主要有以下幾個步驟:
1. 打包
2. 上傳源資料
3. 分散式執行
打包
在打包的過程中,可以使用命令列打包,也可以使用 Eclipse 自帶的 Export。在 Eclipse 的打包匯出過程中,與打包匯出一個 Java 的 jar 過程是一樣的。這裡就不多說了。假設我們打成的 jar 包為: job.jar
上傳源資料
上傳源資料是指將本地的資料上傳到 HDFS 檔案系統上。
在上傳源資料之前我們需要在 HDFS 上新建你需要上傳的目標路徑,然後使用下面的這條指令即可完成資料的上傳。
$ hadoop fs -mkdir <hdfs_input_path>
$ hadoop fs -put <local_path> <hdfs_input_path>
如果這裡之前你不進行建立目錄,上傳過程會因為找不到目錄而出現異常情況。
資料上傳完成後,這些資料會分佈在你整個叢集的 DataNode 上,而不只是在你的本地機器上了。
分散式執行
等上面的所有事情已經就緒,那麼就可以使用下面的 hadoop 指令執行我們的 hadoop 程式。
$ hadoop jar job.jar <hdfs_input_path> <hdfs_output_path>
結果視窗
開啟瀏覽器
這裡是程式中執行的過程中,進度的變化情況
下面是程式執行完成時的網頁截圖
Ref
- 《Hadoop 實戰》