hadoop中mapReduce整理
1.1 MapReduce定義
1.2 MapReduce優缺點
1.2.1 優點
1.2.2 缺點
1.3MapReduce核心思想
1)分散式的運算程式往往需要分成至少2個階段。
2)第一個階段的MapTask併發例項,完全並行執行,互不相干。
3)第二個階段的ReduceTask併發例項互不相干,但是他們的資料依賴於上一個階段的所有MapTask併發例項的輸出。
4)MapReduce程式設計模型只能包含一個Map階段和一個Reduce階段,如果使用者的業務邏輯非常複雜,那就只能多個MapReduce程式,序列執行。
總結:分析WordCount資料流走向深入
1.4MapReduce程序
1.7MapReduce程式設計規範
使用者編寫的程式分成三個部分:Mapper、Reducer和Driver。
如果在IDEA上執行程式碼操作。
1 <dependencies> 2 <dependency> 3 <groupId>junit</groupId> 4 <artifactId>junit</artifactId> 5 <version>RELEASE</version> 6在pom.xml檔案中新增如下依賴</dependency> 7 <dependency> 8 <groupId>org.apache.logging.log4j</groupId> 9 <artifactId>log4j-core</artifactId> 10 <version>2.8.2</version> 11 </dependency> 12 <dependency> 13<groupId>org.apache.hadoop</groupId> 14 <artifactId>hadoop-common</artifactId> 15 <version>2.7.2</version> 16 </dependency> 17 <dependency> 18 <groupId>org.apache.hadoop</groupId> 19 <artifactId>hadoop-client</artifactId> 20 <version>2.7.2</version> 21 </dependency> 22 <dependency> 23 <groupId>org.apache.hadoop</groupId> 24 <artifactId>hadoop-hdfs</artifactId> 25 <version>2.7.2</version> 26 </dependency> 27 </dependencies>
在專案的src/main/resources目錄下,新建一個檔案,命名為“log4j.properties”
1 log4j.rootLogger=INFO, stdout 2 log4j.appender.stdout=org.apache.log4j.ConsoleAppender 3 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 4 log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 5 log4j.appender.logfile=org.apache.log4j.FileAppender 6 log4j.appender.logfile.File=target/spring.log 7 log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 8 log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%nlog4j.properties
2.1 序列化概述
MapReduce框架原理
3.1InputFormat資料輸入
切片與MapTask並行度決定機制
2.FileInputFormat切片原始碼解析(input.getSplits(job))
FileInputFormat切片機制
CombineTextInputFormat切片機制
框架預設的TextInputFormat切片機制是對任務按檔案規劃切片,不管檔案多小,都會是一個單獨的切片,都會交給一個MapTask,這樣如果有大量小檔案,就會產生大量的MapTask,處理效率極其低下。
MapReduce工作流程
上面的流程是整個MapReduce最全工作流程,但是Shuffle過程只是從第7步開始到第16步結束,具體Shuffle過程詳解,如下:
1)MapTask收集我們的map()方法輸出的kv對,放到記憶體緩衝區中
2)從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案
3)多個溢位檔案會被合併成大的溢位檔案
4)在溢位過程及合併的過程中,都要呼叫Partitioner進行分割槽和針對key進行排序
5)ReduceTask根據自己的分割槽號,去各個MapTask機器上取相應的結果分割槽資料
6)ReduceTask會取到同一個分割槽的來自不同MapTask的結果檔案,ReduceTask會將這些檔案再進行合併(歸併排序)
7)合併成大檔案後,Shuffle的過程也就結束了,後面進入ReduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對Group,呼叫使用者自定義的reduce()方法)
3.注意
Shuffle中的緩衝區大小會影響到MapReduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。
緩衝區的大小可以通過引數調整,引數:io.sort.mb預設100M。
Map方法之後,Reduce方法之前的資料處理過程稱之為Shuffle
Partition分割槽
WritableComparable排序
GroupingComparator分組(輔助排序)
對Reduce階段的資料根據某一個或幾個欄位進行分組。
分組排序步驟:
(1)自定義類繼承WritableComparator
(2)重寫compare()方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比較的業務邏輯
return result;
}
(3)建立一個構造將比較物件的類傳給父類
protected OrderGroupingComparator() {
super(OrderBean.class, true);
}
MapTask工作機制
1)Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
(2)Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列新的key/value。
(3)Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫OutputCollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫Partitioner),並寫入一個環形記憶體緩衝區中。
(4)Spill階段:即“溢寫”,當環形緩衝區滿後,MapReduce會將資料寫到本地磁碟上,生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號Partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。
步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。
步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。
(5)Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。
當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。
在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併io.sort.factor(預設10)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。
讓每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。
ReduceTask工作機制
(1)Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。
(2)Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。
(3)Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。
(4)Reduce階段:reduce()函式將計算結果寫到HDFS上。
Join多種應用