Hadoop之MapReduce
定義:分散式運算程式的程式設計框架
核心功能:將使用者編寫的業務邏輯程式碼和自帶元件整合成一個分散式運算程式,併發執行在hadoop叢集上
優點:高容錯,任務計算失敗會重啟4次
適合離線資料
缺點:不善於實時和流式計算即有(DAG)向圖計算:
思想就是多個MR串聯,不斷的進行輸入輸出到下一個MR,耗費資源,不太擅長此類計算
MapReduce核心程式設計思想(中間有一個排序的過程)
計算角度切片,儲存角度塊
1)Map階段,以workcount程式為例
1.讀資料,按行處理
2.按空格切分行內單詞
3.kv鍵值對(單詞,1)
4.將所有kv值對中單詞,按單詞首字母,分成2個分割槽溢寫到磁碟
2)Reduce階段
1.分別統計2個分割槽的彙總
2.分別輸出到檔案
MapReduce程序
1.MrAppMaster(子)本質就是yarn的applicationMaster(父),1個
2.MapTask:個數由多少個任務決定
3.ReduceTask:個數由計算分割槽決定
官方wordcount原始碼
找到jar包中的example的jar包,用資料裡的反編譯軟體進行反編譯
常用資料型別 和 mapreduce的資料型別對應
String>-Text
其他加Writable即可
MapReduce程式設計規範
Mapper, Reducer,Driver
1)Mapper階段:繼承自己的父類,輸入資料是KV對形式,業務邏輯寫在mpa()方法中,輸出也是KV對,map()方法(MapTask程序)對每個kv呼叫一次
2Reducer階段:繼承自己的父類,輸入/出是KV對,邏輯寫reduce()方法中,reduce()方法(reduceTask程序)對每個kv呼叫一次
3)Driver:有本地和叢集模式,相當與yarn叢集的客戶端,提交程式到yarn叢集
-7mapreduce案例編寫mapper
1)設定自定義wordcountmapper extend mapper<Longwritable,Txet(輸入的kv),text,Intwritable(輸出的kv)>
2)重寫map方法
讀取一行 輸入的Text的kv對的value:
String line =value.toString;
把讀到的按空格分隔單詞:
String [] words=line.split(" ");
設定每個單詞的配置到輸出kv對裡面:
遍歷陣列words for(String word: words){
k.set(word);
context(k,v(設定為1));
}
value個數預設為1個
-8mapreduce案例編寫reducer
1)設定自定義的wordcountreducer extend reducer
<Text,IntWritable(mapper的輸出),Text,Intwritable(reducer的最終輸出)>(<k,v,k,v >)
遍歷reduce中的輸入迭代器
sum=0;
for(Iteable: values){
sum+count.get();
}
v.set(sum);
-9mapreduce案例編寫Driver
1)自定義主方法main(){
1.建立job物件:job.getinstanc();
2.自定義配置物件Configuration conf = new Configuration();
3.設定jar類的載入 :job.setjarbyclass();
4.設定mapper和ruducer的類: job.setMapperclass(wordcountmapper.class) .etc略
5.設定map輸出類:job.setMapOutputKeyclass(Text.class);
6設定最終輸出類:job.setOutputKeyClass(Text.class);
7設定輸入路徑和輸出路徑FileInputFormat.setInputPaths(job,new Path("args[0]"))
8提交job:job.waitForCompletion(true)
}
-10mapreduce案例編寫的執行方式
1)IDEA中java本地測試
FileInputFormat.setInputPaths(job,
new Path("D:\\Develop\\AiStocker\\hello1.txt"));
FileOutputFormat.setOutputPath(job,
new Path("D:\\Develop\\AiStocker\\output"));
2)叢集上測試
moven上打成jar包,直接拖到shell軟體中的Hadoop_Home目錄中即可
執行命令:hadoop jar 包的的路徑 驅動類所在的jar包裡面所在packet包的全路徑 設定的輸入路徑 設定的輸出路徑
3)windows上向叢集提交任務
在上面程式碼中新增以下資訊後打包,並將jar包設定到Driver中
//設定HDFSNameNode的地址
configuration.set("fs.defaultFS", "hdfs://hadoop102:8020");
// 指定MapReduce執行在Yarn上
configuration.set("mapreduce.framework.name","yarn");
// 指定mapreduce可以在遠端叢集執行
configuration.set("mapreduce.app-submission.cross-platform","true");
//指定Yarnresourcemanager的位置
configuration.set("yarn.resourcemanager.hostname","hadoop103");
打包後,設定到Driver中,即修改job.setJarByClass為jobsetJar("jar包的在本地磁碟的物理路徑")
編輯後在視窗右上角修改下拉框中的Edit Configuration,設定VM options 為 -DHADOOP_USER_NAME=指定叢集使用者名稱
Program arguments 改為 hdfs://hadoop102:8020/hdfs檔案目錄的檔案輸入路徑 hdfs://Hadoop102:8020/hdfs檔案目錄的檔案輸出路徑
-13序列化
為何要序列化:序列化可以儲存活的物件,併發送活的物件到遠端計算機,不怕斷電,在記憶體中丟失
1)java中的序列化要實現serializable介面.
java中的serialVersionUID為了保證序列化和反序列化的資料的安全
通過ObejectOutputStreame中的writeObject即readObject實現序列化反序列化
2)hadoop的序列化
1.需要支援序列化的類要實現Wriable介面
提供無參構造器(反序列化時通過反射的方式呼叫無參構造器構造物件)
重寫write,實現序列化
重寫readFields,實現反序列化
序列化和反序列化順序要一致
一般會重寫toString方法,將結果物件寫入最終檔案裡面,會呼叫物件的toString進行列印
Driver的JOB物件要例項化後設置值
MapReduce框架原理
MapRecuce的流程:map階段和reduce階段
詳細:資料輸入(InputFormat)-->Mapper-->shuffle-->Reducer-->資料輸出(OutputFormat)
原始碼:map階段(map+sort)+reduce階段(copy+sort+reduce)
copy是因為map輸出後會落盤,因為map和reduce不能保證在同一臺機器執行,所以落盤後複製過去執行
sort+copy+sort 大概就是shuffle的階段內容
1)inputFormat(資料輸入)
資料輸入切片,處理輸入資料可以識別的
2)shuffle洗牌機制
mapper輸出context後到reducer之間進行處理洗牌
3)outputFormat(資料輸出)
reducer輸出經過outputFormat進行寫出
InputFormat的原始碼
切片
1)概念:計算時的概念
資料塊:是資料在HDFS中儲存的基本單元.是從物理上將原始資料進行分塊.
資料切片:資料在MR中計算的基本單元,是從邏輯上將原始資料進行切片操作,
實際上每個切片就是來記錄應該從哪個位置讀取到哪個位置
讀取時是整體資料讀取再切片,而不是按塊裡面取切片
2)切片與MapTask
切片個數決定MapTask的個數.
如果每個MapTask處理的資料量是比較合適的,MapTask越多越好
單個檔案單獨切片
3)切片的大小
預設情況下,切片的大小等於塊的大小,避免跨機器讀取資料切片
4)FileInput
Shuffle的原始碼機制?
-8分割槽
獲取分割槽器物件mapTask711行
大於1,讀取配置paririoneer.calss獲取,獲取不到就使用Hashpatrutionner分割槽器
11-MapReduce排序?
1)java排序
Comparable-->compareTo()
Comparator-->compare()
2)hadoop的排序
Writablecomparable-->
writablecompare-->
13-全排序例項操作
14-分割槽排序例項
1回顧
2-Combiner
1)繼承了reducer,是MR程式中Mapper和Reducer之外的一種元件
2)combiner在每一個MapTask節點執行,Reducer接收全域性所有的Mapper輸出結果
3)意義是對Mapper輸出區域性彙總,減少網路傳輸,應用前提是不能影響最終業務邏輯
具體實現就是自定義Combiner類繼承reducer,實現reduce方法,
在job設定job.setCombinerClass(wordCounterReducer.class)
分組
1)reducer端將資料copy過來,進行歸併排序,進入reduce方法時,進行分組
2)分組操作就是reduce方法處理資料過程進行的,每讀取一個kv,預讀下一個kv,並判斷下個kv跟當前kv是否相同(是否是一組)
3)Hadoop如何進行分組比較
ReducerTask'
Hadoop會通過group.coomparator獲取分組比較器
能獲取,則用,獲取不到,則嘗試獲取排序比較器,往後處理,參考獲取排序比較器的過程
-6分組比較案例分組比較器
如何分組後取到id中的第一個最大的值?
-7OutputFormat輸出 介紹
1)
2)自定義OutputFormat