Mapreduce原理及應用
Mapreduce原理
MapReduce(以下簡稱MR)是一種程式設計模型,用於大規模資料集(大於1TB)的並行運算。概念"Map(對映)“和"Reduce(歸約)”,是它們的主要思想,都是從函數語言程式設計語言裡借來的,還有從向量程式語言裡借來的特性。它極大地方便了程式設計人員在不會分散式並行程式設計的情況下,將自己的程式執行在分散式系統上。 當前的軟體實現是指定一個Map(對映)函式,用來把一組鍵值對對映成一組新的鍵值對,指定併發的Reduce(歸約)函式,用來保證所有對映的鍵值對中的每一個共享相同的鍵組。
MR主要思想:分久必合
MR是由兩個階段組成
Map端
Reduce端
MR核心思想:“相同”的key為一組,呼叫一次reduce方法,方法內迭代這一組資料進行計算
MapReduce分散式計算原理
MR在計算之前,會將HDFS上的檔案劃分切片
預設大小 block = split切片 = map task 注:split會比block大幾kb或小几kb,因為block是嚴格按照位元組切分,防止資料亂碼,會將block下一塊的第一行資料也新增進去;
Shuffle write階段
Map task將處理後的每一條記錄打上標籤,打標籤的目的就是為讓這一條知道將來被哪一個redcuce task處理,然後進入buffer後,每一條記錄是由三部分組成:1、分割槽號 2、key 3、value,Map task往buffer中寫入過程中,一旦寫入到80M,此時會將這80M的記憶體封鎖,封鎖後,會對記憶體中的資料進行combiner(小聚合),然後進行排序,將相同分割槽的資料放到一起,並且分割槽的資料是有序的,以上的combiner以及排序完成後,就開始溢寫資料到磁碟上,此時的磁碟檔案就是一個根據分割槽號,分好區的,並且內部有序的檔案combiner、sort、spill每進行一次溢寫,就會產生一個磁碟小檔案
Map task計算完畢後,會將磁碟上的小檔案合併成一個大檔案,在合併的時候會使用歸併排序的演算法,將各個小檔案合併成一個有序的大檔案
Shuffle read階段
從map端讀取相應的分割槽資料,將分割槽資料寫入到記憶體中,記憶體滿了就會溢寫,溢寫之前會排序,當把所有的資料取過來之後,會將溢寫產生的磁碟小檔案合併 排序成有序的大檔案,然後 每一個大檔案啟動一個Reduce task進行計算,然後將結果輸出到結果檔案中,最後返回給客戶端。
MapReduce的核心程式執行機制
(1) 一個 MR 程式啟動的時候,最先啟動的是 MRAppMaster, MRAppMaster 啟動後根據本次 job 的描述資訊,計算出需要的 maptask 例項數量,然後向叢集申請機器啟動相應數量的 maptask 程序
(2) maptask 程序啟動之後,根據給定的資料切片(哪個檔案的哪個偏移量範圍)範圍進行數 據處理,主體流程為:
A、 利用客戶指定的 inputformat 來獲取 RecordReader 讀取資料,形成輸入 KV 對
B、 將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,並將 map()方法輸出的 KV 對收 集到快取
C、 將快取中的 KV 對按照 K 分割槽排序後不斷溢寫到磁碟檔案 (超過快取記憶體寫到磁碟臨時檔案,最後都寫到該檔案,ruduce 獲取該檔案後,刪除 )
(3) MRAppMaster 監控到所有 maptask 程序任務完成之後(真實情況是,某些 maptask 程序處理完成後,就會開始啟動 reducetask 去已完成的 maptask 處資料),會根據客戶指 定的引數啟動相應數量的 reducetask 程序,並告知 reducetask 程序要處理的資料範圍(資料分割槽)
(4) Reducetask 程序啟動之後,根據 MRAppMaster 告知的待處理資料所在位置,從若干臺 maptask 執行所在機器上獲取到若干個 maptask 輸出結果檔案,並在本地進行重新歸併排序, 然後按照相同 key 的 KV 為一個組,呼叫客戶定義的 reduce()方法進行邏輯運算,並收集運
算輸出的結果 KV,然後呼叫客戶指定的 outputformat 將結果資料輸出到外部儲存
Mapreduce應用
MapReduce-WordCount示例編寫及編碼規範
(1) 程式有一個 main 方法,來啟動任務的執行,其中 job 物件就儲存了該程式執行的必要 資訊,比如指定 Mapper 類和 Reducer 類
public class WC {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration(true);
Job job = Job.getInstance(conf);
//設定當前main函式所在類
job.setJarByClass(WC.class);
job.setJar("d:/HDFS/wc.jar");
//設定輸入路徑
FileInputFormat.setInputPaths(job, "/input/wc.txt");
//設定輸出路徑
Path outputPath = new Path("/output");
FileSystem fs = outputPath.getFileSystem(conf);
if(fs.exists(outputPath)){
fs.delete(outputPath,true);
}
FileOutputFormat.setOutputPath(job, outputPath);
//設定Map class
job.setMapperClass(WCMapper.class);
//設定map輸出key、value的型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定reduce class
job.setReducerClass(WCReduce.class);
job.setNumReduceTasks(2);
job.waitForCompletion(true);
}
}
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
(2) 該程式中的 TokenizerMapper 類繼承了 Mapper 類
(3) 該程式中的 IntSumReducer 類繼承了 Reducer 類
MapReduce 程式的業務編碼分為兩個大部分,一部分配置程式的執行資訊,一部分 編寫該 MapReduce 程式的業務邏輯,並且業務邏輯的 map 階段和 reduce 階段的程式碼分別繼 承 Mapper 類和 Reducer 類
(1) 使用者編寫的程式分成三個部分: Mapper, Reducer, Driver(提交執行 MR 程式的客戶端)
(2) Mapper 的輸入資料是 KV 對的形式( KV 的型別可自定義)
(3) Mapper 的輸出資料是 KV 對的形式( KV 的型別可自定義)
(4) Mapper 中的業務邏輯寫在 map()方法中
(5) map()方法( maptask 程序)對每一個<K,V>呼叫一次
package com.hpe.mr.wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text myKey = new Text();
IntWritable myValue = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println(key+"==========");
// value.toString().split(" ")
String[] words = StringUtils.split(value.toString(), ' ');
for (String word : words) {
myKey.set(word);
context.write(myKey,myValue);
}
}
}
(6) Reducer 的輸入資料型別對應 Mapper 的輸出資料型別,也是 KV
(7) Reducer 的業務邏輯寫在 reduce()方法中
(8) Reducetask 程序對每一組相同 k 的<k,v>組呼叫一次 reduce()方法
package com.hpe.mr.wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(key, new IntWritable(sum));
}
}
(9) 使用者自定義的 Mapper 和 Reducer 都要繼承各自的父類
(10) 整個程式需要一個 Drvier 來進行提交,提交的是一個描述了各種必要資訊的 job 物件
WordCount 的業務邏輯:
1、 maptask 階段處理每個資料分塊的單詞統計分析,思路是每遇到一個單詞則把其轉換成 一個 key-value 對,比如單詞 hello,就轉換成<’hello’,1>傳送給 reducetask 去彙總
2、 reducetask 階段將接受 maptask 的結果, 來做彙總計數