大資料求索(3):實戰MapReduce
MapReduce 概述
主要用於離線、海量資料運算
WordCount編寫
下面這張經典圖很好地說明了如何編寫一個WordCount,也清楚說明了MapReduce的流程
對於輸入的一個文字(可以存放在HDFS上,可以非常非常大),先對檔案進行拆分,假設這裡一行一份,對於每一行,按空格進行切分,然後給每個單詞賦初值為1,這裡同一個map裡有相同的單詞,也是不會覆蓋的,會保留兩個(word, 1),不同的map之間是沒有依賴關係的,是獨立的、並行的。shuffing階段是為了將相同的聚到一起。map的輸出會作為reduce的輸入,注意,這裡會對map的結果做排序,然後reduce階段進行求和,最終得到統計的詞頻。
MapReduce程式設計模型
框架會把輸入看做由鍵值對組成的集合,key和value都需要被框架所序列化,所以都要實現Writable介面,同時預設會進行排序,所以key還需要實現WritableComparable介面。
將作業拆分成map階段和reduce階段,執行map tasks和reduce tasks,MR的執行流程如下
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <lk3, v3>(output)
程式碼
有了上面的分析,可以直接進行編寫了,程式碼如下:
package org.wds.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 使用MapReduce開發wordcount應用程式
*/
public class WordCount {
/**
* map: 讀取輸入的檔案
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 接收到的每一行資料
String line = value.toString();
// 按照分隔符拆分
String[] words = line.split(" ");
for (String word : words) {
// 通過上下文把map的處理結果輸出
context.write(new Text(word), one);
}
}
}
/**
* Reduce:歸併操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
// 求key出現的次數總和
sum += value.get();
}
// 最終統計結果的輸出
context.write(key, new LongWritable(sum));
}
}
/**
* 定義Driver : 封裝了MapReduce作業的所有資訊
* @param args
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 設定
Configuration conf = new Configuration();
// 建立job
Job job = Job.getInstance(conf, "wordcount");
// 設定job的處理類
job.setJarByClass(WordCount.class);
// 設定作業處理的輸入路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 設定Map相關引數
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 設定reduce相關引數
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 設定作業處理的輸出路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
這裡輸入輸出是自己指定,輸出是到hdfs,然後使用maven把專案打包,上傳到伺服器,使用hadoop jar xx input output就可以運行了。
這裡還有些問題,就是多次運行同一個job會出現路徑已存在的錯誤,解決辦法便是每次執行前檢查路徑是否存在,存在則刪除,操作hdfs很容易實現,在main函式裡裡加入如下程式碼:
// 清理已存在的輸出目錄
Path outpath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(outpath)) {
fileSystem.delete(outpath, true);
System.out.println("output file exists, but it has deleted");
}
MapReduce核心概念
-
Split
交由MR作業來處理的資料庫,是MR中最小的計算單元,類比於HDFS的blocksize,blocksize是HDFS最小的儲存單元,預設是128M。
預設情況下,兩者是一一對應的,當然也可以修改(不建議)。
-
InputFormat
將輸入資料進行分片,將一個檔案拆分為多個split,底層呼叫的是InputSplit[] getSplits(JobConf job, int numSplits)
比較常用的是TextInputFormat,用於處理文字
-
OutputFormat
輸出
-
Combiner
如下一個經典的圖可以很好地解釋
對map的結果先進行合併,合併之後總共有4條資料,沒合併之前有9條,如果資料很大,這樣就能夠大大減少網路傳輸的消耗,相當於map在本地做了一個reduce。
使用Combiner也非常簡單,直接在設定裡面加入如下程式碼
// 通過job設定combiner處理類,其實邏輯上和reduce一模一樣 // combiner使用場景是有限制的,比如求和、排序,但是求平均是錯誤的 job.setCombinerClass(MyReducer.class);
-
Partitioner
Partitioner決定了MapTask輸出的資料交由哪個ReduceTask進行處理,預設情況下,是由分發的key的hash值對Reduce Task個數進行取模。