好程式設計師大資料學習路線之mapreduce概述
與HDFS解決問題的原理類似,HDFS是將大的檔案切分成若干小檔案,然後將它們分別儲存到叢集中各個主機中。
同樣原理,mapreduce是將一個複雜的運算切分成若個子運算,然後將它們分別交給叢集中各個主機,由各個主機並行運算。
1.1 mapreduce產生的背景
海量資料在單機上處理因為硬體資源限制,無法勝任。
而一旦將單機版程式擴充套件到叢集來分散式執行,將極大增加程式的複雜度和開發難度。
引入mapreduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜交由框架來處理。
1.2 mapreduce程式設計模型
一種分散式計算模型。
MapReduce將這個平行計算過程抽象到兩個函式。
Map(對映):對一些獨立元素組成的列表的每一個元素進行指定的操作,可以高度並行。
Reduce(化簡 歸約):對一個列表的元素進行合併。
一個簡單的MapReduce程式只需要指定map()、reduce()、input和output,剩下的事由框架完成。
Mapreduce的幾個關鍵名詞
Job :使用者的每一個計算請求稱為一個作業。
Task:每一個作業,都需要拆分開了,交由多個主機來完成,拆分出來的執行單位就是任務。
Task又分為如下三種類型的任務:
Map:負責map階段的整個資料處理流程
Reduce:負責reduce階段的整個資料處理流程
MRAppMaster:負責整個程式的過程排程及狀態協調
1.4 mapreduce程式執行流程
具體流程說明:
一個mr程式啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動後根據本次job的描述資訊,計算出需要的maptask例項數量,然後向叢集申請機器啟動相應數量的maptask程序
maptask程序啟動之後,根據給定的資料切片範圍進行資料處理,主體流程為:
– 利用客戶指定的inputformat來獲取RecordReader讀取資料,形成輸入KV對。
– 將輸入KV(k是檔案的行號,v是檔案一行的資料)對傳遞給客戶定義的map()方法,做邏輯運算,並將map()方法輸出的KV對收集到快取。
– 將快取中的KV對按照K分割槽排序後不斷溢寫到磁碟檔案
MRAppMaster監控到所有maptask程序任務完成之後,會根據客戶指定的引數啟動相應數量的reducetask程序,並告知reducetask程序要處理的資料範圍(資料分割槽)
Reducetask程序啟動之後,根據MRAppMaster告知的待處理資料所在位置,從若干臺maptask執行所在機器上獲取到若干個maptask輸出結果檔案,並在本地進行重新歸併排序,然後按照相同key的KV為一個組,呼叫客戶定義的reduce()方法進行邏輯運算,並收集運算輸出的結果KV,然後呼叫客戶指定的outputformat將結果資料輸出到外部儲存
1.5 編寫MapReduce程式
- 基於MapReduce 計算模型編寫分散式並行程式非常簡單,程式設計師的主要編碼工作就是實現Map 和Reduce函式。
- 其它的並行程式設計中的種種複雜問題,如分散式儲存,工作排程,負載平衡,容錯處理,網路通訊等,均由YARN框架負責處理。
- MapReduce中,map和reduce函式遵循如下常規格式:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
- Mapper的介面:
protected void map(KEY key, VALUE value, Context context)
throws IOException, InterruptedException {
}
- Reduce的介面:
protected void reduce(KEY key, Iterable<VALUE> values,
Context context) throws IOException, InterruptedException {
}
- Mapreduce程式程式碼基本結構
maprecue例項開發
2.1 程式設計步驟
使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(提交執行mr程式的客戶端)
Mapper的輸入資料是KV對的形式(KV的型別可自定義)
Mapper的輸出資料是KV對的形式(KV的型別可自定義)
Mapper中的業務邏輯寫在map()方法中
map()方法(maptask程序)對每一個<K,V>呼叫一次
Reducer的輸入資料型別對應Mapper的輸出資料型別,也是KV
Reducer的業務邏輯寫在reduce()方法中
Reducetask程序對每一組相同k的<k,v>組呼叫一次reduce()方法
使用者自定義的Mapper和Reducer都要繼承各自的父類
整個程式需要一個Drvier來進行提交,提交的是一個描述了各種必要資訊的job物件
2.2 經典的wordcount程式編寫
需求:有一批檔案(規模為TB級或者PB級),如何統計這些檔案中所有單詞出現次數
如有三個檔案,檔名是qfcourse.txt、qfstu.txt 和 qf_teacher
qf_course.txt內容:
php java linux
bigdata VR
C C++ java web
linux shell
qf_stu.txt內容:
tom jim lucy
lily sally
andy
tom jim sally
qf_teacher內容:
jerry Lucy tom
jim
方案
– 分別統計每個檔案中單詞出現次數 - map()
– 累加不同檔案中同一個單詞出現次數 - reduce()
實現程式碼
– 建立一個簡單的maven專案
– 新增hadoop client依賴的jar,pom.xml主要內容如下:
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
– 編寫程式碼
– 自定義一個mapper類
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Maper裡面的泛型的四個型別從左到右依次是:
*
* LongWritable KEYIN: 預設情況下,是mr框架所讀到的一行文字的起始偏移量,Long, 類似於行號但是在hadoop中有自己的更精簡的序列化介面,所以不直接用Long,而用LongWritable
* Text VALUEIN:預設情況下,是mr框架所讀到的一行文字的內容,String,同上,用Text
*
* Text KEYOUT:是使用者自定義邏輯處理完成之後輸出資料中的key,在此處是單詞,String,同上,用Text
* IntWritable VALUEOUT:是使用者自定義邏輯處理完成之後輸出資料中的value,在此處是單詞次數,Integer,同上,用IntWritable
*/
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
/**
* map階段的業務邏輯就寫在自定義的map()方法中
* maptask會對每一行輸入資料呼叫一次我們自定義的map()方法
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將maptask傳給我們的一行的文字內容先轉換成String
String line = value.toString();
//根據空格將這一行切分成單詞
String[] words = line.split(" ");
/**
*將單詞輸出為<單詞,1>
*如<lily,1> <lucy,1> <c,1> <c++,1> <tom,1>
*/
for(String word:words){
//將單詞作為key,將次數1作為value,以便於後續的資料分發,可以根據單詞分發,以便於相同單詞會到相同的reduce task
context.write(new Text(word), new IntWritable(1));
}
}
}
– 自定義一個reduce類
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Reducer裡面的泛型的四個型別從左到右依次是:
* Text KEYIN: 對應mapper輸出的KEYOUT
* IntWritable VALUEIN: 對應mapper輸出的VALUEOUT
*
* KEYOUT, 是單詞
* VALUEOUT 是自定義reduce邏輯處理結果的輸出資料型別,是總次數
*/
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* <tom,1>
* <tom,1>
* <linux,1>
* <banana,1>
* <banana,1>
* <banana,1>
* 入參key,是一組相同單詞kv對的key
* values是若干相同key的value集合
* 如 <tom,[1,1]> <linux,[1]> <banana,[1,1,1]>
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0; //累加單詞的出現的次數
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
– 編寫一個Driver類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 相當於一個yarn叢集的客戶端
* 需要在此封裝我們的mr程式的相關執行引數,指定jar包
* 最後提交給yarn
*/
public class WordcountDriver {
/**
* 該類是執行在hadoop客戶端的,main一執行,yarn客戶端就啟動起來了,與yarn伺服器端通訊
* yarn伺服器端負責啟動mapreduce程式並使用WordcountMapper和WordcountReducer類
*/
public static void main(String[] args) throws Exception {
//此程式碼需要兩個輸入引數 第一個引數支援要處理的原始檔;第二個引數是處理結果的輸出路徑
if (args == null || args.length == 0) {
args = new String[2];
//路徑都是 hdfs系統的檔案路徑
args[0] = "hdfs://192.168.18.64:9000/wordcount/input/";
args[1] = "hdfs://192.168.18.64:9000/wordcount/output";
}
/**
* 什麼也不設定時,如果在安裝了hadoop的機器上執行時,自動讀取
* /home/hadoop/app/hadoop-2.7.1/etc/hadoop/core-site.xml
* 檔案放入Configuration中
*/
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定本程式的jar包所在的本地路徑
job.setJarByClass(WordcountDriver.class);
//指定本業務job要使用的mapper業務類
job.setMapperClass(WordcountMapper.class);
//指定mapper輸出資料的kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定本業務job要使用的Reducer業務類
job.setReducerClass(WordcountReducer.class);
//指定最終輸出的資料的kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定job的輸入原始檔案所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的輸出結果所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行
/*job.submit();*/
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
wordcount處理過程
將檔案拆分成splits,由於測試用的檔案較小,所以每個檔案為一個split,並將檔案按行分割形成<key,value>對,下圖所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所佔的字元數(Windows/Linux環境不同)。
將分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<key,value>對,下圖所示。
得到map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序,並執行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結果。下圖所示。
Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<key,value>對,並作為WordCount的輸出結果,下圖所示。