1. 程式人生 > 實用技巧 >Hadoop之MapReduce

Hadoop之MapReduce

定義:分散式運算程式的程式設計框架

核心功能:將使用者編寫的業務邏輯程式碼和自帶元件整合成一個分散式運算程式,併發執行在hadoop叢集上

優點:高容錯,任務計算失敗會重啟4次

適合離線資料

缺點:不善於實時和流式計算即有(DAG)向圖計算:

思想就是多個MR串聯,不斷的進行輸入輸出到下一個MR,耗費資源,不太擅長此類計算

MapReduce核心程式設計思想(中間有一個排序的過程)

計算角度切片,儲存角度塊

1)Map階段,以workcount程式為例

1.讀資料,按行處理

2.按空格切分行內單詞

3.kv鍵值對(單詞,1)

4.將所有kv值對中單詞,按單詞首字母,分成2個分割槽溢寫到磁碟

2)Reduce階段

1.分別統計2個分割槽的彙總

2.分別輸出到檔案

MapReduce程序

1.MrAppMaster(子)本質就是yarn的applicationMaster(父),1個

2.MapTask:個數由多少個任務決定

3.ReduceTask:個數由計算分割槽決定

官方wordcount原始碼

找到jar包中的example的jar包,用資料裡的反編譯軟體進行反編譯

常用資料型別 和 mapreduce的資料型別對應

String>-Text

其他加Writable即可

MapReduce程式設計規範

Mapper, Reducer,Driver

1)Mapper階段:繼承自己的父類,輸入資料是KV對形式,業務邏輯寫在mpa()方法中,輸出也是KV對,map()方法(MapTask程序)對每個kv呼叫一次

2Reducer階段:繼承自己的父類,輸入/出是KV對,邏輯寫reduce()方法中,reduce()方法(reduceTask程序)對每個kv呼叫一次

3)Driver:有本地和叢集模式,相當與yarn叢集的客戶端,提交程式到yarn叢集

-7mapreduce案例編寫mapper

1)設定自定義wordcountmapper extend mapper<Longwritable,Txet(輸入的kv),text,Intwritable(輸出的kv)>

2)重寫map方法

讀取一行 輸入的Text的kv對的value:

String line =value.toString;

把讀到的按空格分隔單詞:

String [] words=line.split(" ");

設定每個單詞的配置到輸出kv對裡面:

遍歷陣列words for(String word: words){

k.set(word);

context(k,v(設定為1));

}

value個數預設為1個

-8mapreduce案例編寫reducer

1)設定自定義的wordcountreducer extend reducer

<Text,IntWritable(mapper的輸出),Text,Intwritable(reducer的最終輸出)>(<k,v,k,v >)

遍歷reduce中的輸入迭代器

sum=0;

for(Iteable: values){

sum+count.get();

}

v.set(sum);

-9mapreduce案例編寫Driver

1)自定義主方法main(){

1.建立job物件:job.getinstanc();

2.自定義配置物件Configuration conf = new Configuration();

3.設定jar類的載入 :job.setjarbyclass();

4.設定mapper和ruducer的類: job.setMapperclass(wordcountmapper.class) .etc略

5.設定map輸出類:job.setMapOutputKeyclass(Text.class);

6設定最終輸出類:job.setOutputKeyClass(Text.class);

7設定輸入路徑和輸出路徑FileInputFormat.setInputPaths(job,new Path("args[0]"))

8提交job:job.waitForCompletion(true)

}

-10mapreduce案例編寫的執行方式

1)IDEA中java本地測試

FileInputFormat.setInputPaths(job,
new Path("D:\\Develop\\AiStocker\\hello1.txt"));
FileOutputFormat.setOutputPath(job,
new Path("D:\\Develop\\AiStocker\\output"));

2)叢集上測試

moven上打成jar包,直接拖到shell軟體中的Hadoop_Home目錄中即可

執行命令:hadoop jar 包的的路徑 驅動類所在的jar包裡面所在packet包的全路徑 設定的輸入路徑 設定的輸出路徑

3)windows上向叢集提交任務

在上面程式碼中新增以下資訊後打包,並將jar包設定到Driver中

//設定HDFSNameNode的地址

configuration.set("fs.defaultFS", "hdfs://hadoop102:8020");

// 指定MapReduce執行在Yarn

configuration.set("mapreduce.framework.name","yarn");

// 指定mapreduce可以在遠端叢集執行

configuration.set("mapreduce.app-submission.cross-platform","true");

//指定Yarnresourcemanager的位置

configuration.set("yarn.resourcemanager.hostname","hadoop103");

打包後,設定到Driver中,即修改job.setJarByClass為jobsetJar("jar包的在本地磁碟的物理路徑")

編輯後在視窗右上角修改下拉框中的Edit Configuration,設定VM options 為 -DHADOOP_USER_NAME=指定叢集使用者名稱

Program arguments 改為 hdfs://hadoop102:8020/hdfs檔案目錄的檔案輸入路徑 hdfs://Hadoop102:8020/hdfs檔案目錄的檔案輸出路徑

-13序列化

為何要序列化:序列化可以儲存活的物件,併發送活的物件到遠端計算機,不怕斷電,在記憶體中丟失

1)java中的序列化要實現serializable介面.

java中的serialVersionUID為了保證序列化和反序列化的資料的安全

通過ObejectOutputStreame中的writeObject即readObject實現序列化反序列化

2)hadoop的序列化

1.需要支援序列化的類要實現Wriable介面

提供無參構造器(反序列化時通過反射的方式呼叫無參構造器構造物件)

重寫write,實現序列化

重寫readFields,實現反序列化

序列化和反序列化順序要一致

一般會重寫toString方法,將結果物件寫入最終檔案裡面,會呼叫物件的toString進行列印

Driver的JOB物件要例項化後設置值

MapReduce框架原理

MapRecuce的流程:map階段和reduce階段

詳細:資料輸入(InputFormat)-->Mapper-->shuffle-->Reducer-->資料輸出(OutputFormat)

原始碼:map階段(map+sort)+reduce階段(copy+sort+reduce)

copy是因為map輸出後會落盤,因為map和reduce不能保證在同一臺機器執行,所以落盤後複製過去執行

sort+copy+sort 大概就是shuffle的階段內容

1)inputFormat(資料輸入)

資料輸入切片,處理輸入資料可以識別的

2)shuffle洗牌機制

mapper輸出context後到reducer之間進行處理洗牌

3)outputFormat(資料輸出)

reducer輸出經過outputFormat進行寫出

InputFormat的原始碼

切片

1)概念:計算時的概念

資料塊:是資料在HDFS中儲存的基本單元.是從物理上將原始資料進行分塊.

資料切片:資料在MR中計算的基本單元,是從邏輯上將原始資料進行切片操作,

實際上每個切片就是來記錄應該從哪個位置讀取到哪個位置

讀取時是整體資料讀取再切片,而不是按塊裡面取切片

2)切片與MapTask

切片個數決定MapTask的個數.

如果每個MapTask處理的資料量是比較合適的,MapTask越多越好

單個檔案單獨切片

3)切片的大小

預設情況下,切片的大小等於塊的大小,避免跨機器讀取資料切片

4)FileInput

Shuffle的原始碼機制?

-8分割槽

獲取分割槽器物件mapTask711行

大於1,讀取配置paririoneer.calss獲取,獲取不到就使用Hashpatrutionner分割槽器

11-MapReduce排序?

1)java排序

Comparable-->compareTo()

Comparator-->compare()

2)hadoop的排序

Writablecomparable-->

writablecompare-->

13-全排序例項操作

14-分割槽排序例項

1回顧

2-Combiner

1)繼承了reducer,是MR程式中Mapper和Reducer之外的一種元件

2)combiner在每一個MapTask節點執行,Reducer接收全域性所有的Mapper輸出結果

3)意義是對Mapper輸出區域性彙總,減少網路傳輸,應用前提是不能影響最終業務邏輯

具體實現就是自定義Combiner類繼承reducer,實現reduce方法,

在job設定job.setCombinerClass(wordCounterReducer.class)

分組

1)reducer端將資料copy過來,進行歸併排序,進入reduce方法時,進行分組

2)分組操作就是reduce方法處理資料過程進行的,每讀取一個kv,預讀下一個kv,並判斷下個kv跟當前kv是否相同(是否是一組)

3)Hadoop如何進行分組比較

ReducerTask'

Hadoop會通過group.coomparator獲取分組比較器

能獲取,則用,獲取不到,則嘗試獲取排序比較器,往後處理,參考獲取排序比較器的過程

-6分組比較案例分組比較器

如何分組後取到id中的第一個最大的值?

-7OutputFormat輸出 介紹

1)

2)自定義OutputFormat