1. 程式人生 > >hadoop核心組件(一)

hadoop核心組件(一)

poi 不可 組件 為我 med 批處理 數據庫 true 讀取

  hadoop的核心組件:hdfs(分布式文件系統)、mapreduce(分布式計算框架)、Hive(基於hadoop的數據倉庫)、HBase(分布式列存數據庫)、Zookeeper(分布式協作服務)、Sqoop(數據同步工具)和Flume(日誌手機工具) hdfs(分布式文件系統): 由client、NameNode、DataNode組成
  • client負責切分文件,並與NameNode交互,獲取文件位置;與DataNode交互,讀取和寫入數據
  • NameNode是Master節點,管理HDFS的名稱空間和數據塊映射信息,配置副本策略,處理客戶端請求
  • DataNode是Slave節點,存儲實際數據,匯報存儲信息給NameNode
  • DataNode與NameNode保持心跳,提交block列表
在hadoop1.x的時候還有Secondary NameNode,負責輔助NameNode,分擔其工作量;定期合並fsimage和fsedits,推送給NameNode;緊急情況下,可輔助恢復NameNode 存儲模型 (1)文件線性切割成Block offset (2)Block分散存儲在集群節點中,Block是HDFS的基本存儲單元,默認大小是64M (3)單一文件Block大小一致,文件與文件可以不一致 (4)Block可設置副本數(小於節點數),分散在不同節點 (5)文件上傳可以設置Block大小和副本數 (6)已上傳的文件Block副本數可以調整,大小不變 (7)只支持一次寫入多次讀取,同一時刻只有一個寫入者 (8)可以append追加數據 架構模型 (1)NameNode節點保存文件元數據 (2)DataNode節點保存文件Block數據 (3)DataNode與NameNode保持心跳,提交Block列表 (4)HdfsClient與NameNode交互元數據信息 (5)HdfsClient與DataNode交互文件Block數據 hdfs結構 一、NameNode(不會與磁盤發生交換) (1)基於內存存儲
  • 只存在內存中
  • 持久化
    • 啟動後, 元數據(metadate)信息加載到內存
    • metadata的磁盤文件名為”fsimage”
    • Block的位置信息不會保存到fsimage
    • (journalNode的作用是存放EditLog的)edits記錄對metadata的操作日誌
(2)功能
  • 接收客戶端讀寫
  • 收集DataNode匯報的block列表信息
(3) metadata
  • 文件ownership, permissions(文件所有權、權限)
  • 文件大小, 時間
  • (block列表,block偏移量)--->會持久化, 位置信息--->不會持久化(啟動時候由DataNode匯報過來)
  • block每個副本位置(dataNode上報)
二、DataNode (1)本地文件形式存儲block (2)存儲Block的元數據信息文件 (3)啟動DN時會向NN匯報block信息 (4)通過向NameNode發送心跳(3秒一次),如果NameNode 10分鐘沒有收到,則認為已經lost,並copy其上的block到其它DN 三、SecondaryNameNode/Qurom Journal Manager 合並時機 fs.checkpoint.period 3600s fs.checkpoint.size 64MB 四、ZooKeeper Failover Controller(HDFS 2.0 HA) (1)監控NameNode健康狀態 (2)向Zookeeper註冊NameNode (3)NameNode掛掉後,ZKFC為NameNode競爭鎖,獲得ZKFC 鎖的NameNode變為active 5、Block副本放置位置 (1)第一個副本:放置在上傳文件的DN;如果是集群外提交,則隨機挑選一臺磁盤不太滿,CPU不太忙的節點 (2)第二個副本:放置在於第一個副本不同的機架的節點上 (3)第三個副本:與第二個副本相同機架的節點 (4)更多副本:隨機節點 6、安全模式 (1)NameNode啟動, fsimage載入內存, 執行edits (2)成功建立元數據映射後, 創建新的fsimage文件(無需SNN)和空的edits (3)檢查副本數, 數量正常後,過若幹時間, 解除安全模式 7、優缺點 優點: 高容錯性(多副本, 自動恢復) 適合批處理(計算移動, 數據位置暴露給計算框架(block)) 適合大數據處理(GB TB PB級數據) 可構建在廉價機器上 高吞吐 缺點: 高延遲 小文件存取(占用namenode內存, 尋道時間超過讀取時間) 並發寫入、文件隨機修改(一個文件一個寫入者, 只能append) hdfs寫流程 技術分享圖片

client切分文件與NanmeNode交互,獲取DataNode列表,驗證DataNode後連接DataNode,各節點之間兩兩交互,確定可用後,client以更小單位流式傳輸數據; Block傳輸數據結束後,DataNode向NameNode匯報Block信息,DataNode向Client匯報完成,Client向NameNode匯報完成,獲取去下一個Block存放的DataNode列表, 循環以上步驟,最終client匯報完成,NameNode會在寫流程更新文件狀態。 hdfs讀流程 技術分享圖片

client與NameNode交互,獲取Block存放的DataNode列表(Block副本的位置信息),線性和DataNode交互,獲取Block,最終合並為一個文件,其中,在Block副本列表中按距離擇優選取DataNode節點獲取Block塊。 mapreduce(分布式計算框架) 技術分享圖片

技術分享圖片 MR運行原理: 1、客戶端提交作業之前,檢查輸入輸出路徑,首先創建切片列表 反射出作業中設置的input對象,默認是TextInputFormat類 通過input類得到切片列表(getSpilits()方法) 最小值 minSize 默認為1,如果設置就取設置的值 最大值 maxSize 默認為long的最大值 根據輸入路徑取出文件,獲取每個文件的所有block列表,接著創建splits列表(包含文件名,偏移量,長度和位置信息) 切片大小根據最大最小值取,默認為block的大小 一個split對應一個map 提交作業到集群(submitJob()方法) 2、mapInput: input.initialize 輸入初始化 拿到taskContext(上下文) 創建mapper(默認為Mapper類,一般取用戶設置的) 獲取InputFomat類(輸入格式化的類) 獲取split 根據以上信息創建input(NewTrackingRecordReader) input初始化 獲取split的開始和結束位置和文件,開啟對文件的IO流,將起始偏移量個IO設置一下 如果不是第一個切片(split),每次讀取放棄第一行(跳過第一行數據),只有第一個切片才會讀取第一行數據 mapper.run 3、output: MapOutputBuffer初始化 環形緩沖區的閾值0.8、大小(100M) 默認值 sorter :QuickSort算法 反射獲取比較器 OutputKeyComparator 排序,溢寫,一些一次觸發一次combiner 溢寫達到3次的時候還會觸發一次combiner 通過反射獲取Partitioner類,默認為HashPartitoner write(k,v) collector.collect(key,value,partition) output.close() merger 如果numSplits<minSpillsForCombiner 判斷溢寫的次數是不是小於設置的合並的溢寫次數(默認是3),成立的話combiner 4、reduce: shuffle:copy sort:SecondarySort reduce 1、mapreduce shuffle (1)maptask的輸入是hdfs上的block塊,maptask只讀取split,block與split的對應關系默認是一對一 (2)進過map端的運行後,輸出的格式為key/value,Mapreduce提供接口partition,他的作用是根據maptask輸出的key hash後與reduce數量取模, 來決定當前的輸出對應到哪個reduce處理,也可以自定義partition (3)map運行後的數據序列化到緩沖區,默認這個緩沖區大小為100M,作用是收集這個map的結果,當數據達到溢寫比例(默認是spill.percent=0.8)後,所定這80M的內存, 對這80M內存中的key做排序(sort),maptask的輸出結果還可以往剩下的20M內存中寫,互不影響。之後執行溢寫的線程會往磁盤中寫數據。每次溢寫都會產生一個溢寫小文件, map執行完後,會合並這些溢寫小文件,這個過程叫Merge。 (4)如果客戶端設置了Combiner,那麽會優化MapReduce的中間結果,合並map端的數據(相當於reduce端的預處理),Combiner不能改變最終的計算結果。 (5)reduce在執行之前就是從各個maptask執行完後的溢寫文件中拿到所對應的數據,然後做合並(Merge),最終形成的文件作為reduce的輸入文件,這個過程是歸並排序。 最後就是reduce計算,把結果放到hdfs上面。 hdfs參數調優
io.file.buffer.size:4096 (core-default.xml) SequenceFiles在讀寫中可以使用緩存大小,可減少I/O次數;在大型Hadoop cluster,建議可設定為65536-131072
dfs.blockes:134217728( hdfs-default.xml ) hdfs中一個文件的Block塊的大小,CDH5中默認為128M;設置太大影響map同時計算的數量,設置較少會浪費map個數資源
mapred.reduce.tasks(mapreduce.job.reduces):1 默認啟動的reduce數
mapreduce.task.io.sort.factor:10 reduce task中合並文件時,一次合並的文件數據
mapred.child.java.opts:-Xmx200m jvm啟動子線程可以使用的最大內存
mapred.reduce.parallel.copies:5 Reduce copy數據的線程數量,默認值是5
mapreduce.tasktracker.http.threads:40 map和reduce是通過http進行傳輸的,這個設置傳輸的並行線程數
mapreduce.map.output.compress:flase map輸出是否進行壓縮,如果壓縮就會多耗cpu,但是減少傳輸時間,如果不壓縮,就需要較多的傳輸帶寬。配合 mapreduce.map.output.compress.codec使用,默認是 org.apache.hadoop.io.compress.DefaultCodec,可以根據需要設定數據壓縮方式。
mapreduce.tasktracker.tasks.reduce.maximum:2 一個tasktracker並發執行的reduce數,建議為cpu核數
mapreduce.map.sort.spill.percent:0.8 溢寫比例
min.num.spill.for.combine:3 spill的文件達到設置的參數進行combiner
避免推測執行
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
自定義partition 適當添加combiner 自定義reduce端的grouping Comparator - mapred.reduce.tasks:手動設置reduce個數 - mapreduce.map.output.compress:map輸出結果是否壓縮 - mapreduce.map.output.compress.codec - mapreduce.output.fileoutputformat.compress:job輸出結果是否壓縮 - mapreduce.output.fileoutputformat.compress.type - mapreduce.output.fileoutputformat.compress.codec 9、調優文件以及參數 一、調優的目的 充分的利用機器的性能,更快的完成mr程序的計算任務。甚至是在有限的機器條件下,能夠支持運行足夠多的mr程序。 二、調優的總體概述 從mr程序的內部運行機制,我們可以了解到一個mr程序由mapper和reducer兩個階段組成,其中mapper階段包括數據的讀取、map處理以及寫出操作(排序和合並/sort&merge),而reducer階段包含mapper輸出數據的獲取、數據合並(sort&merge)、reduce處理以及寫出操作。那麽在這七個子階段中,能夠進行較大力度的進行調優的就是map輸出、reducer數據合並以及reducer個數這三個方面的調優操作。也就是說雖然性能調優包括cpu、內存、磁盤io以及網絡這四個大方面,但是從mr程序的執行流程中,我們可以知道主要有調優的是內存、磁盤io以及網絡。在mr程序中調優,主要考慮的就是減少網絡傳輸和減少磁盤IO操作,故本次課程的mr調優主要包括服務器調優、代碼調優、mapper調優、reducer調優以及runner調優這五個方面。 三、服務器調優 服務器調優主要包括服務器參數調優和jvm調優。在本次項目中,由於我們使用hbase作為我們分析數據的原始數據存儲表,所以對於hbase我們也需要進行一些調優操作。除了參數調優之外,和其他一般的java程序一樣,還需要進行一些jvm調優。 hdfs調優 1. dfs.datanode.failed.volumes.tolerated: 允許發生磁盤錯誤的磁盤數量,默認為0,表示不允許datanode發生磁盤異常。當掛載多個磁盤的時候,可以修改該值。 2. dfs.replication: 復制因子,默認3 3. dfs.namenode.handler.count: namenode節點並發線程量,默認10 4. dfs.datanode.handler.count:datanode之間的並發線程量,默認10。 5. dfs.datanode.max.transfer.threads:datanode提供的數據流操作的並發線程量,默認4096。 一般將其設置為linux系統的文件句柄數的85%~90%之間,查看文件句柄數語句ulimit -a,修改vim /etc/security/limits.conf, 不能設置太大文件末尾,添加 * soft nofile 65535 * hard nofile 65535 註意:句柄數不能夠太大,可以設置為1000000以下的所有數值,一般不設置為-1。 異常處理:當設置句柄數較大的時候,重新登錄可能出現unable load session的提示信息,這個時候采用單用戶模式進行修改操作即可。 單用戶模式: 啟動的時候按‘a‘鍵,進入選擇界面,然後按‘e‘鍵進入kernel修改界面,然後選擇第二行‘kernel...‘,按‘e‘鍵進行修改,在最後添加空格+single即可,按回車鍵回到修改界面,最後按‘b‘鍵進行單用戶模式啟動,當啟動成功後,還原文件後保存,最後退出(exit)重啟系統即可。 6. io.file.buffer.size: 讀取/寫出數據的buffer大小,默認4096,一般不用設置,推薦設置為4096的整數倍(物理頁面的整數倍大小)。 mapreduce調優 1. mapreduce.task.io.sort.factor: mr程序進行合並排序的時候,打開的文件數量,默認為10個. 2. mapreduce.task.io.sort.mb: mr程序進行合並排序操作的時候或者mapper寫數據的時候,內存大小,默認100M 3. mapreduce.map.sort.spill.percent: mr程序進行flush操作的閥值,默認0.80。 4. mapreduce.reduce.shuffle.parallelcopies:mr程序reducer copy數據的線程數,默認5。 5. mapreduce.reduce.shuffle.input.buffer.percent: reduce復制map數據的時候指定的內存堆大小百分比,默認為0.70,適當的增加該值可以減少map數據的磁盤溢出,能夠提高系統性能。 6. mapreduce.reduce.shuffle.merge.percent:reduce進行shuffle的時候,用於啟動合並輸出和磁盤溢寫的過程的閥值,默認為0.66。如果允許,適當增大其比例能夠減少磁盤溢寫次數,提高系統性能。同mapreduce.reduce.shuffle.input.buffer.percent一起使用。 7. mapreduce.task.timeout:mr程序的task執行情況匯報過期時間,默認600000(10分鐘),設置為0表示不進行該值的判斷。 四、代碼調優 代碼調優,主要是mapper和reducer中,針對多次創建的對象,進行代碼提出操作。這個和一般的java程序的代碼調優一樣。 五、mapper調優 mapper調優主要就是就一個目標:減少輸出量。我們可以通過增加combine階段以及對輸出進行壓縮設置進行mapper調優。 combine介紹: 實現自定義combine要求繼承reducer類,特點: 以map的輸出key/value鍵值對作為輸入輸出鍵值對,作用是減少網絡輸出,在map節點上就合並一部分數據。 比較適合,map的輸出是數值型的,方便進行統計。 壓縮設置: 在提交job的時候分別設置啟動壓縮和指定壓縮方式。 六、reducer調優 reducer調優主要是通過參數調優和設置reducer的個數來完成。 reducer個數調優: 要求:一個reducer和多個reducer的執行結果一致,不能因為多個reducer導致執行結果異常。 規則:一般要求在hadoop集群中的執行mr程序,map執行完成100%後,盡量早的看到reducer執行到33%,可以通過命令hadoop job -status job_id或者web頁面來查看。 原因: map的執行process數是通過inputformat返回recordread來定義的;而reducer是有三部分構成的,分別為讀取mapper輸出數據、合並所有輸出數據以及reduce處理,其中第一步要依賴map的執行,所以在數據量比較大的情況下,一個reducer無法滿足性能要求的情況下,我們可以通過調高reducer的個數來解決該問題。 優點:充分利用集群的優勢。 缺點:有些mr程序沒法利用多reducer的優點,比如獲取top n的mr程序。 七、runner調優 runner調優其實就是在提交job的時候設置job參數,一般都可以通過代碼和xml文件兩種方式進行設置。 1~8詳見ActiveUserRunner(before和configure方法),9詳解TransformerBaseRunner(initScans方法) 1. mapred.child.java.opts: 修改childyard進程執行的jvm參數,針對map和reducer均有效,默認:-Xmx200m 2. mapreduce.map.java.opts: 需改map階段的childyard進程執行jvm參數,默認為空,當為空的時候,使用mapred.child.java.opts。 3. mapreduce.reduce.java.opts:修改reducer階段的childyard進程執行jvm參數,默認為空,當為空的時候,使用mapred.child.java.opts。 4. mapreduce.job.reduces: 修改reducer的個數,默認為1。可以通過job.setNumReduceTasks方法來進行更改。 5. mapreduce.map.speculative:是否啟動map階段的推測執行,默認為true。其實一般情況設置為false比較好。可通過方法job.setMapSpeculativeExecution來設置。 6. mapreduce.reduce.speculative:是否需要啟動reduce階段的推測執行,默認為true,其實一般情況設置為fase比較好。可通過方法job.setReduceSpeculativeExecution來設置。 7. mapreduce.map.output.compress:設置是否啟動map輸出的壓縮機制,默認為false。在需要減少網絡傳輸的時候,可以設置為true。 8. mapreduce.map.output.compress.codec:設置map輸出壓縮機制,默認為org.apache.hadoop.io.compress.DefaultCodec,推薦使用SnappyCodec(在之前版本中需要進行安裝操作,現在版本不太清楚,安裝參數:http://www.cnblogs.com/chengxin1982/p/3862309.html) 9. hbase參數設置 由於hbase默認是一條一條數據拿取的,在mapper節點上執行的時候是每處理一條數據後就從hbase中獲取下一條數據,通過設置cache值可以一次獲取多條數據,減少網絡數據傳輸。 源碼: 1、設置map端的數量:mapreduce.input.fileinputformat.split.minsize 位置FileInputFormat.getSplits()方法 (1)輸入文件size巨大,但不是小文件 減小map的數量:增大mapred.min.split.size的值 (2)輸入文件數量巨大,且都是小文件 使用FileInputFormat衍生的CombineFileInputFormat將多個input path合並成一個InputSplit送給mapper處理,從而減少mapper的數量 2、增加Map-Reduce job 啟動時創建的Mapper數量 可以通過減小每個mapper的輸入做到,即減小blockSize或者減小mapred.min.split.size的值,設置blockSize一般不可行

hadoop核心組件(一)