hadoop核心組件(一)
阿新 • • 發佈:2018-01-20
poi 不可 組件 為我 med 批處理 數據庫 true 讀取
hadoop的核心組件:hdfs(分布式文件系統)、mapreduce(分布式計算框架)、Hive(基於hadoop的數據倉庫)、HBase(分布式列存數據庫)、Zookeeper(分布式協作服務)、Sqoop(數據同步工具)和Flume(日誌手機工具)
hdfs(分布式文件系統):
由client、NameNode、DataNode組成
避免推測執行
自定義partition
適當添加combiner
自定義reduce端的grouping Comparator
- mapred.reduce.tasks:手動設置reduce個數
- mapreduce.map.output.compress:map輸出結果是否壓縮
- mapreduce.map.output.compress.codec
- mapreduce.output.fileoutputformat.compress:job輸出結果是否壓縮
- mapreduce.output.fileoutputformat.compress.type
- mapreduce.output.fileoutputformat.compress.codec
9、調優文件以及參數
一、調優的目的
充分的利用機器的性能,更快的完成mr程序的計算任務。甚至是在有限的機器條件下,能夠支持運行足夠多的mr程序。
二、調優的總體概述
從mr程序的內部運行機制,我們可以了解到一個mr程序由mapper和reducer兩個階段組成,其中mapper階段包括數據的讀取、map處理以及寫出操作(排序和合並/sort&merge),而reducer階段包含mapper輸出數據的獲取、數據合並(sort&merge)、reduce處理以及寫出操作。那麽在這七個子階段中,能夠進行較大力度的進行調優的就是map輸出、reducer數據合並以及reducer個數這三個方面的調優操作。也就是說雖然性能調優包括cpu、內存、磁盤io以及網絡這四個大方面,但是從mr程序的執行流程中,我們可以知道主要有調優的是內存、磁盤io以及網絡。在mr程序中調優,主要考慮的就是減少網絡傳輸和減少磁盤IO操作,故本次課程的mr調優主要包括服務器調優、代碼調優、mapper調優、reducer調優以及runner調優這五個方面。
三、服務器調優
服務器調優主要包括服務器參數調優和jvm調優。在本次項目中,由於我們使用hbase作為我們分析數據的原始數據存儲表,所以對於hbase我們也需要進行一些調優操作。除了參數調優之外,和其他一般的java程序一樣,還需要進行一些jvm調優。
hdfs調優
1. dfs.datanode.failed.volumes.tolerated: 允許發生磁盤錯誤的磁盤數量,默認為0,表示不允許datanode發生磁盤異常。當掛載多個磁盤的時候,可以修改該值。
2. dfs.replication: 復制因子,默認3
3. dfs.namenode.handler.count: namenode節點並發線程量,默認10
4. dfs.datanode.handler.count:datanode之間的並發線程量,默認10。
5. dfs.datanode.max.transfer.threads:datanode提供的數據流操作的並發線程量,默認4096。
一般將其設置為linux系統的文件句柄數的85%~90%之間,查看文件句柄數語句ulimit -a,修改vim /etc/security/limits.conf, 不能設置太大文件末尾,添加
* soft nofile 65535
* hard nofile 65535
註意:句柄數不能夠太大,可以設置為1000000以下的所有數值,一般不設置為-1。
異常處理:當設置句柄數較大的時候,重新登錄可能出現unable load session的提示信息,這個時候采用單用戶模式進行修改操作即可。
單用戶模式:
啟動的時候按‘a‘鍵,進入選擇界面,然後按‘e‘鍵進入kernel修改界面,然後選擇第二行‘kernel...‘,按‘e‘鍵進行修改,在最後添加空格+single即可,按回車鍵回到修改界面,最後按‘b‘鍵進行單用戶模式啟動,當啟動成功後,還原文件後保存,最後退出(exit)重啟系統即可。
6. io.file.buffer.size: 讀取/寫出數據的buffer大小,默認4096,一般不用設置,推薦設置為4096的整數倍(物理頁面的整數倍大小)。
mapreduce調優
1. mapreduce.task.io.sort.factor: mr程序進行合並排序的時候,打開的文件數量,默認為10個.
2. mapreduce.task.io.sort.mb: mr程序進行合並排序操作的時候或者mapper寫數據的時候,內存大小,默認100M
3. mapreduce.map.sort.spill.percent: mr程序進行flush操作的閥值,默認0.80。
4. mapreduce.reduce.shuffle.parallelcopies:mr程序reducer copy數據的線程數,默認5。
5. mapreduce.reduce.shuffle.input.buffer.percent: reduce復制map數據的時候指定的內存堆大小百分比,默認為0.70,適當的增加該值可以減少map數據的磁盤溢出,能夠提高系統性能。
6. mapreduce.reduce.shuffle.merge.percent:reduce進行shuffle的時候,用於啟動合並輸出和磁盤溢寫的過程的閥值,默認為0.66。如果允許,適當增大其比例能夠減少磁盤溢寫次數,提高系統性能。同mapreduce.reduce.shuffle.input.buffer.percent一起使用。
7. mapreduce.task.timeout:mr程序的task執行情況匯報過期時間,默認600000(10分鐘),設置為0表示不進行該值的判斷。
四、代碼調優
代碼調優,主要是mapper和reducer中,針對多次創建的對象,進行代碼提出操作。這個和一般的java程序的代碼調優一樣。
五、mapper調優
mapper調優主要就是就一個目標:減少輸出量。我們可以通過增加combine階段以及對輸出進行壓縮設置進行mapper調優。
combine介紹:
實現自定義combine要求繼承reducer類,特點:
以map的輸出key/value鍵值對作為輸入輸出鍵值對,作用是減少網絡輸出,在map節點上就合並一部分數據。
比較適合,map的輸出是數值型的,方便進行統計。
壓縮設置:
在提交job的時候分別設置啟動壓縮和指定壓縮方式。
六、reducer調優
reducer調優主要是通過參數調優和設置reducer的個數來完成。
reducer個數調優:
要求:一個reducer和多個reducer的執行結果一致,不能因為多個reducer導致執行結果異常。
規則:一般要求在hadoop集群中的執行mr程序,map執行完成100%後,盡量早的看到reducer執行到33%,可以通過命令hadoop job -status job_id或者web頁面來查看。
原因: map的執行process數是通過inputformat返回recordread來定義的;而reducer是有三部分構成的,分別為讀取mapper輸出數據、合並所有輸出數據以及reduce處理,其中第一步要依賴map的執行,所以在數據量比較大的情況下,一個reducer無法滿足性能要求的情況下,我們可以通過調高reducer的個數來解決該問題。
優點:充分利用集群的優勢。
缺點:有些mr程序沒法利用多reducer的優點,比如獲取top n的mr程序。
七、runner調優
runner調優其實就是在提交job的時候設置job參數,一般都可以通過代碼和xml文件兩種方式進行設置。
1~8詳見ActiveUserRunner(before和configure方法),9詳解TransformerBaseRunner(initScans方法)
1. mapred.child.java.opts: 修改childyard進程執行的jvm參數,針對map和reducer均有效,默認:-Xmx200m
2. mapreduce.map.java.opts: 需改map階段的childyard進程執行jvm參數,默認為空,當為空的時候,使用mapred.child.java.opts。
3. mapreduce.reduce.java.opts:修改reducer階段的childyard進程執行jvm參數,默認為空,當為空的時候,使用mapred.child.java.opts。
4. mapreduce.job.reduces: 修改reducer的個數,默認為1。可以通過job.setNumReduceTasks方法來進行更改。
5. mapreduce.map.speculative:是否啟動map階段的推測執行,默認為true。其實一般情況設置為false比較好。可通過方法job.setMapSpeculativeExecution來設置。
6. mapreduce.reduce.speculative:是否需要啟動reduce階段的推測執行,默認為true,其實一般情況設置為fase比較好。可通過方法job.setReduceSpeculativeExecution來設置。
7. mapreduce.map.output.compress:設置是否啟動map輸出的壓縮機制,默認為false。在需要減少網絡傳輸的時候,可以設置為true。
8. mapreduce.map.output.compress.codec:設置map輸出壓縮機制,默認為org.apache.hadoop.io.compress.DefaultCodec,推薦使用SnappyCodec(在之前版本中需要進行安裝操作,現在版本不太清楚,安裝參數:http://www.cnblogs.com/chengxin1982/p/3862309.html)
9. hbase參數設置
由於hbase默認是一條一條數據拿取的,在mapper節點上執行的時候是每處理一條數據後就從hbase中獲取下一條數據,通過設置cache值可以一次獲取多條數據,減少網絡數據傳輸。
源碼:
1、設置map端的數量:mapreduce.input.fileinputformat.split.minsize
位置FileInputFormat.getSplits()方法
(1)輸入文件size巨大,但不是小文件
減小map的數量:增大mapred.min.split.size的值
(2)輸入文件數量巨大,且都是小文件
使用FileInputFormat衍生的CombineFileInputFormat將多個input path合並成一個InputSplit送給mapper處理,從而減少mapper的數量
2、增加Map-Reduce job 啟動時創建的Mapper數量
可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值,設置blockSize一般不可行
- client負責切分文件,並與NameNode交互,獲取文件位置;與DataNode交互,讀取和寫入數據
- NameNode是Master節點,管理HDFS的名稱空間和數據塊映射信息,配置副本策略,處理客戶端請求
- DataNode是Slave節點,存儲實際數據,匯報存儲信息給NameNode
- DataNode與NameNode保持心跳,提交block列表
- 只存在內存中
- 持久化
- 啟動後, 元數據(metadate)信息加載到內存
- metadata的磁盤文件名為”fsimage”
- Block的位置信息不會保存到fsimage
- (journalNode的作用是存放EditLog的)edits記錄對metadata的操作日誌
- 接收客戶端讀寫
- 收集DataNode匯報的block列表信息
- 文件ownership, permissions(文件所有權、權限)
- 文件大小, 時間
- (block列表,block偏移量)--->會持久化, 位置信息--->不會持久化(啟動時候由DataNode匯報過來)
- block每個副本位置(dataNode上報)
client切分文件與NanmeNode交互,獲取DataNode列表,驗證DataNode後連接DataNode,各節點之間兩兩交互,確定可用後,client以更小單位流式傳輸數據; Block傳輸數據結束後,DataNode向NameNode匯報Block信息,DataNode向Client匯報完成,Client向NameNode匯報完成,獲取去下一個Block存放的DataNode列表, 循環以上步驟,最終client匯報完成,NameNode會在寫流程更新文件狀態。 hdfs讀流程
client與NameNode交互,獲取Block存放的DataNode列表(Block副本的位置信息),線性和DataNode交互,獲取Block,最終合並為一個文件,其中,在Block副本列表中按距離擇優選取DataNode節點獲取Block塊。 mapreduce(分布式計算框架)
MR運行原理: 1、客戶端提交作業之前,檢查輸入輸出路徑,首先創建切片列表 反射出作業中設置的input對象,默認是TextInputFormat類 通過input類得到切片列表(getSpilits()方法) 最小值 minSize 默認為1,如果設置就取設置的值 最大值 maxSize 默認為long的最大值 根據輸入路徑取出文件,獲取每個文件的所有block列表,接著創建splits列表(包含文件名,偏移量,長度和位置信息) 切片大小根據最大最小值取,默認為block的大小 一個split對應一個map 提交作業到集群(submitJob()方法) 2、mapInput: input.initialize 輸入初始化 拿到taskContext(上下文) 創建mapper(默認為Mapper類,一般取用戶設置的) 獲取InputFomat類(輸入格式化的類) 獲取split 根據以上信息創建input(NewTrackingRecordReader) input初始化 獲取split的開始和結束位置和文件,開啟對文件的IO流,將起始偏移量個IO設置一下 如果不是第一個切片(split),每次讀取放棄第一行(跳過第一行數據),只有第一個切片才會讀取第一行數據 mapper.run 3、output: MapOutputBuffer初始化 環形緩沖區的閾值0.8、大小(100M) 默認值 sorter :QuickSort算法 反射獲取比較器 OutputKeyComparator 排序,溢寫,一些一次觸發一次combiner 溢寫達到3次的時候還會觸發一次combiner 通過反射獲取Partitioner類,默認為HashPartitoner write(k,v) collector.collect(key,value,partition) output.close() merger 如果numSplits<minSpillsForCombiner 判斷溢寫的次數是不是小於設置的合並的溢寫次數(默認是3),成立的話combiner 4、reduce: shuffle:copy sort:SecondarySort reduce 1、mapreduce shuffle (1)maptask的輸入是hdfs上的block塊,maptask只讀取split,block與split的對應關系默認是一對一 (2)進過map端的運行後,輸出的格式為key/value,Mapreduce提供接口partition,他的作用是根據maptask輸出的key hash後與reduce數量取模, 來決定當前的輸出對應到哪個reduce處理,也可以自定義partition (3)map運行後的數據序列化到緩沖區,默認這個緩沖區大小為100M,作用是收集這個map的結果,當數據達到溢寫比例(默認是spill.percent=0.8)後,所定這80M的內存, 對這80M內存中的key做排序(sort),maptask的輸出結果還可以往剩下的20M內存中寫,互不影響。之後執行溢寫的線程會往磁盤中寫數據。每次溢寫都會產生一個溢寫小文件, map執行完後,會合並這些溢寫小文件,這個過程叫Merge。 (4)如果客戶端設置了Combiner,那麽會優化MapReduce的中間結果,合並map端的數據(相當於reduce端的預處理),Combiner不能改變最終的計算結果。 (5)reduce在執行之前就是從各個maptask執行完後的溢寫文件中拿到所對應的數據,然後做合並(Merge),最終形成的文件作為reduce的輸入文件,這個過程是歸並排序。 最後就是reduce計算,把結果放到hdfs上面。 hdfs參數調優
io.file.buffer.size:4096 (core-default.xml) | SequenceFiles在讀寫中可以使用緩存大小,可減少I/O次數;在大型Hadoop cluster,建議可設定為65536-131072 |
dfs.blockes:134217728( hdfs-default.xml ) | hdfs中一個文件的Block塊的大小,CDH5中默認為128M;設置太大影響map同時計算的數量,設置較少會浪費map個數資源 |
mapred.reduce.tasks(mapreduce.job.reduces):1 | 默認啟動的reduce數 |
mapreduce.task.io.sort.factor:10 | reduce task中合並文件時,一次合並的文件數據 |
mapred.child.java.opts:-Xmx200m | jvm啟動子線程可以使用的最大內存 |
mapred.reduce.parallel.copies:5 | Reduce copy數據的線程數量,默認值是5 |
mapreduce.tasktracker.http.threads:40 | map和reduce是通過http進行傳輸的,這個設置傳輸的並行線程數 |
mapreduce.map.output.compress:flase | map輸出是否進行壓縮,如果壓縮就會多耗cpu,但是減少傳輸時間,如果不壓縮,就需要較多的傳輸帶寬。配合 mapreduce.map.output.compress.codec使用,默認是 org.apache.hadoop.io.compress.DefaultCodec,可以根據需要設定數據壓縮方式。 |
mapreduce.tasktracker.tasks.reduce.maximum:2 | 一個tasktracker並發執行的reduce數,建議為cpu核數 |
mapreduce.map.sort.spill.percent:0.8 | 溢寫比例 |
min.num.spill.for.combine:3 | spill的文件達到設置的參數進行combiner |
mapred.map.tasks.speculative.execution=true |
mapred.reduce.tasks.speculative.execution=true |
hadoop核心組件(一)