1. 程式人生 > 實用技巧 >hadoop中mapReduce整理

hadoop中mapReduce整理

1.1 MapReduce定義

1.2 MapReduce優缺點

1.2.1 優點

1.2.2 缺點

1.3MapReduce核心思想

1)分散式的運算程式往往需要分成至少2個階段。

2)第一個階段的MapTask併發例項,完全並行執行,互不相干。

3)第二個階段的ReduceTask併發例項互不相干,但是他們的資料依賴於上一個階段的所有MapTask併發例項的輸出。

4MapReduce程式設計模型只能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。

總結分析WordCount資料流走向深入

理解MapReduce核心思想。

1.4MapReduce程序

1.7MapReduce程式設計規範

使用者編寫的程式分成三個部分:MapperReducerDriver

如果在IDEA上執行程式碼操作。

 1 <dependencies>
 2         <dependency>
 3             <groupId>junit</groupId>
 4             <artifactId>junit</artifactId>
 5             <version>RELEASE</version>
 6
</dependency> 7 <dependency> 8 <groupId>org.apache.logging.log4j</groupId> 9 <artifactId>log4j-core</artifactId> 10 <version>2.8.2</version> 11 </dependency> 12 <dependency> 13
<groupId>org.apache.hadoop</groupId> 14 <artifactId>hadoop-common</artifactId> 15 <version>2.7.2</version> 16 </dependency> 17 <dependency> 18 <groupId>org.apache.hadoop</groupId> 19 <artifactId>hadoop-client</artifactId> 20 <version>2.7.2</version> 21 </dependency> 22 <dependency> 23 <groupId>org.apache.hadoop</groupId> 24 <artifactId>hadoop-hdfs</artifactId> 25 <version>2.7.2</version> 26 </dependency> 27 </dependencies>
在pom.xml檔案中新增如下依賴

在專案的src/main/resources目錄下,新建一個檔案,命名為“log4j.properties

1 log4j.rootLogger=INFO, stdout
2 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
3 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
4 log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
5 log4j.appender.logfile=org.apache.log4j.FileAppender
6 log4j.appender.logfile.File=target/spring.log
7 log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
8 log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.properties

2.1 序列化概述

MapReduce框架原理

3.1InputFormat資料輸入

切片MapTask並行度決定機制

2FileInputFormat切片原始碼解析(input.getSplits(job))

FileInputFormat切片機制

CombineTextInputFormat切片機制

框架預設的TextInputFormat切片機制是任務按檔案規劃切片不管檔案多小都會是一個單獨的切都會交給一個MapTask這樣如果有大量小檔案產生大量的MapTask處理效率極其低下。

MapReduce工作流程

上面的流程是整個MapReduce最全工作流程,但是Shuffle過程只是從7步開始16結束,具體Shuffle過程詳解如下:

1MapTask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中

2)從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案

3)多個溢位檔案會被合併成大的溢位檔案

4)在溢位過程及合併的過程中,都要呼叫Partitioner進行分割槽和針對key進行排序

5ReduceTask根據自己的分割槽號,去各個MapTask機器上取相應的結果分割槽資料

6ReduceTask會取到同一個分割槽的來自不同MapTask的結果檔案,ReduceTask會將這些檔案再進行合併(歸併排序)

7)合併成大檔案後,Shuffle的過程也就結束了,後面進入ReduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對Group,呼叫使用者自定義的reduce()方法)

3注意

Shuffle中的緩衝區大小會影響到MapReduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。

緩衝區的大小可以通過引數調整,引數:io.sort.mb預設100M

Map方法之後Reduce方法之前的資料處理過程稱之為Shuffle

Partition分割槽

WritableComparable排序

GroupingComparator分組(輔助排序

Reduce階段的資料根據某一個幾個欄位進行分組

分組排序步驟:

1)自定義類繼承WritableComparator

2重寫compare()方法

@Override

public int compare(WritableComparable a, WritableComparable b) {

// 比較的業務邏輯

return result;

}

3)建立一個構造比較物件的類傳父類

protected OrderGroupingComparator() {

super(OrderBean.class, true);

}

MapTask工作機制

1Read階段:MapTask通過使用者編寫的RecordReader從輸入InputSplit解析出一個個key/value

2Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列新的key/value

3Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫OutputCollector.collect()輸出結果。在函式內部,它會生成的key/value分割槽呼叫Partitioner並寫入一個環形記憶體緩衝區中。

4Spill階段:即當環形緩衝區滿後,MapReduce將資料寫到本地磁碟上,生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要對資料進行合併壓縮等操作

寫階段詳情:

步驟1利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號Partition進行排序,然後按照key進行排序。這樣經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。

步驟2按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.outN表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割槽中資料進行一次聚集操作。

步驟3將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index

5Combine階段:當所有資料處理完成後,MapTask所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。

所有資料處理完後,MapTask將所有臨時檔案合併成一個大檔案儲存到檔案output/file.out,同時生成相應的索引檔案output/file.out.index

進行檔案合併過程中,MapTask分割槽為單位進行合併。對於某個分割槽,將採用多輪遞迴合併的方式每輪合併io.sort.factor(預設10)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。

每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。

ReduceTask工作機制

1Copy階段:ReduceTask各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。

2Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。

3Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。

4Reduce階段:reduce()函式將計算結果寫到HDFS上。

Join多種應用