1. 程式人生 > 其它 >Hadoop學習筆記

Hadoop學習筆記

作者:伍棟樑

編輯:陳人和

1.hadoop安裝與介紹

1.1hadoop生態圈介紹

分散式系統—Google三架馬車(GFS,mapreduce,Bigtable)。google 公佈了釋出了這三個產品的詳細設計論文,但沒有公佈這三個產品的原始碼。Yahoo 資助的 Hadoop 按照這三篇論文的開源 Java 實現:Hadoop 對應 Mapreduce,Hadoop Distributed File System (HDFS)對應Google fs,Hbase對應Bigtable。不過在效能上Hadoop比 Google 要差很多。

1.1.1 GOOGLE三駕馬車

(1)分散式檔案系統 GFS :GFS 是一個可擴充套件的分散式檔案系統,用於大型的、分散式的、對大量資料進行訪問的應用。它運行於廉價的普通硬體上,提供容錯功能。對外的介面:和檔案系統類似,GFS 對外提供 create, delete,open, close,read, 和 write 操作。

(2)分散式計算 mapreduce 【通過平行計算可以處理 T 級別 p 級別 e 級別 z 級別的的資料】

Mapreduce 由 map 與 reduce 組成,map 把資料分發到多個主機上,reduce 是規約,把 map 的worker 計算出來的結果合併出來。Google 的 mapreduce 實現使用 GFS 的資料。Mapreduce的計算使用方式, Distributed Grep,Count of URL Access Frequency, Graph,Distributed Sort。

(3)分散式儲存 BIGTABLE【就像檔案系統需要資料庫來儲存結構化資料一樣,GFS 也需要 Bigtable 來儲存結構化資料。】

* BigTable 是建立在 GFS ,Scheduler ,Lock Service 和 MapReduce 之上的。

*每個 Table 都是一個多維的稀疏圖

*為了管理巨大的 Table,把 Table 根據行分割,這些分割後的資料統稱為:Tablets。每個Tablets 大概有 100-200 MB,每個機器儲存 100 個左右的 Tablets。底層的架構是:GFS。由於 GFS 是一種分散式的檔案系統,採用 Tablets 的機制後,可以獲得很好的負載均衡。比如:可以把經常響應的表移動到其他空閒機器上,然後快速重建。

Google 新三架馬車:

Caffeine : 新版的搜尋引擎

Pregel:圖演算法引擎,

Dremel:互動式資料分析系統

1.1.2 對應的Hadoop部分

(1)HDFS(Hadoop分散式檔案系統)

Hadoop體系中資料儲存管理的基礎。它是一個高度容錯的系統,能檢測和應對硬體故障,用於在低成本的通用硬體上執行。HDFS 簡化了檔案的一致性模型,通過流式資料訪問,提供高吞吐量應用程式資料訪問功能,適合帶有大型資料集的應用程式。

Client:切分檔案;訪問 HDFS;與 NameNode 互動,獲取檔案位置資訊;與 DataNode 互動,讀取和寫入資料。

NameNode:管理 HDFS 的名稱空間和資料塊對映資訊,配置副本策略,處理客戶端請求。【Master節點,在 hadoop1.X 中只有一個】

DataNode:儲存實際的資料【Slave 節點,彙報儲存資訊給 NameNode】

Secondary NameNode:定期合併 fsimage 和 fsedits,推送給 NameNode;輔助 NameNode,分擔其工作量;緊急情況下,可輔助恢復 NameNode,但 Secondary NameNode 並非NameNode 的熱備。

(2)Mapreduce (分散式計算框架)

【MapReduce 是一種計算模型,用以進行大資料量的計算。適合在大量計算機組成的分散式並行環境裡進行資料處理。】

Map 對資料集上的獨立元素進行指定的操作,生成鍵-值對形式中間結果。

Reduce 則對中間結果中相同“鍵”的所有“值”進行規約,以得到最終結果。

JobTracker:Master 節點,只有一個,管理所有作業,作業/任務的監控、錯誤處理等;將

任務分解成一系列任務,並分派給 TaskTracker。

TaskTracker:Slave 節點,執行 Map Task 和 Reduce Task;並與 JobTracker互動,彙報任務

狀態。

Map Task:解析每條資料記錄,傳遞給使用者編寫的 map(),並執行,將輸出結果寫入本地磁

盤(如果為 map-only 作業,直接寫入 HDFS)。

Reducer Task:從 Map Task 的執行結果中,遠端讀取輸入資料,對資料進行排序,將資料

按照分組傳遞給使用者編寫的 reduce 函式執行。

Mapreduce 處理流程,以 wordCount 為例:

(3)Hbase(分散式列存資料庫)

HBase 是一個針對結構化資料的可伸縮、高可靠、高效能、分散式和麵向列的動態模式資料庫。和傳統關係資料庫不同,HBase 採用了 BigTable 的資料模型:增強的稀疏排序對映表(Key/Value),其中,鍵由行關鍵字、列關鍵字和時間戳構成。HBase 提供了對大規模資料的隨機、實時讀寫訪問,同時,HBase 中儲存的資料可以使用MapReduce 來處理,它將資料儲存和平行計算完美地結合在一起。

資料模型:Schema-->Table-->ColumnFamily-->Column-->RowKey-->TimeStamp-->Value

1.2 hadoop 的生態系統

Apache Hive:

資料倉庫基礎設施,提供資料彙總和特定查詢。這個系統支援使用者進行有效

的查詢,並實時得到返回結果。由 facebook 開源,最初用於解決海量結構化的日誌資料統計問題。Hive 定義了一種類似 SQL 的查詢語言(HQL),將 SQL 轉化為 MapReduce 任務在 Hadoop 上執行。通常用於離線分析。

Apache Spark:

Apache Spark 是提供大資料集上快速進行資料分析的計算引擎。它建立在HDFS 之上,卻繞過了 MapReduce 使用自己的資料處理框架。Spark 常用於實時查詢、流處理、迭代演算法、複雜操作運算和機器學習。

Apache Ambari:

Ambari 用來協助管理 Hadoop。它提供對 Hadoop 生態系統中許多工具的

支援,包括 Hive、HBase、Pig、 Spooq 和 ZooKeeper。這個工具提供叢集管理儀表盤,可以跟蹤叢集執行狀態,幫助診斷效能問題。

Apache

Pig:

Pig 是一個整合高階查詢語言的平臺,可以用來處理大資料集。基於 Hadoop 的資料流系統,由 yahoo!開源。設計動機是提供一種基於MapReduce 的 ad-hoc(計算在 query 時發生)資料分析工具定義了一種資料流語言—Pig Latin,將指令碼轉換為 MapReduce 任務在 Hadoop 上執行。通常用於進行離線分析。

Zookeeper (分散式協作服務):

源自 Google 的 Chubby 論文<TheChubby lock service for loosely-coupled distributed systems>,Zookeeper 是 Chubby 克隆版解決分散式環境下的資料管理問題:統一命名,狀態同步,叢集管理,配置同步等。

Sqoop (資料同步工具):Sqoop 是 SQL-to-Hadoop 的縮寫,主要用於傳統資料庫和Hadoop 之前傳輸資料。資料的匯入和匯出本質上是Mapreduce 程式,充分利用了MR 的並行化和容錯性。

Mahout (資料探勘演算法庫):

Mahout 的主要目標是建立一些可擴充套件的機器學習領域經典演算法的實現,旨在幫助開發人員更加方便快捷地建立智慧應用程式。Mahout 現在已經包含了聚類、分類、推薦引擎(協同過濾)和頻繁集挖掘等廣泛使用的資料探勘方法。除了演算法,Mahout 還包含資料的輸入/輸出工具、與其他儲存系統(如資料庫、MongoDB 或 Cassandra)整合等資料探勘支援架構。

Flume (日誌收集工具):

Cloudera 開源的日誌收集系統,具有分散式、高可靠、高容錯、易於定製和擴充套件的特點。它將資料從產生、傳輸、處理並最終寫入目標的路徑的過程抽象為資料流,在具體的資料流中,資料來源支援在 Flume 中定製資料傳送方,從而支援收集各種不同協議資料。同時,Flume資料流提供對日誌資料進行簡單處理的能力,如過濾、格式轉換等。此外,Flume還具有能夠將日誌寫往各種資料目標(可定製)的能力。總的來說,Flume 是一個可擴充套件、適合複雜環境的海量日誌收集系統。

資源管理器的簡單介紹(YARN和 和 mesos):

隨著網際網路的高速發展,基於資料密集型應用的計算框架不斷出現,從支援離線處理的MapReduce,到支援線上處理的 Storm,從迭代式計算框架 Spark 到流式處理框架 S4,…,各種框架誕生於不同的公司或者實驗室,它們各有所長,各自解決了某一類應用問題。而在大部分網際網路公司中,這幾種框架可能都會採用,比如對於搜尋引擎公司,可能的技術方案如下:網頁建索引採用 MapReduce 框架,自然語言處理/資料探勘採用 Spark(網頁 PageRank計算,聚類分類演算法等,【注】Spark 現在不太成熟,很少有公司嘗試使用),對效能要求很高的資料探勘演算法用MPI 等。考慮到資源利用率,運維成本,資料共享等因素,公司一般希望將所有這些框架部署到一個公共的叢集中,讓它們共享叢集的資源,並對資源進行統一使用,這樣,便誕生了資源統一管理與排程平臺,典型代表是 Mesos 和 YARN。

其他的一些開源元件:

1) cloudrea impala:

一個開源的 Massively Parallel Processing(MPP)查詢引擎 。與 Hive 相同的元資料、SQL 語法、ODBC 驅動程式和使用者介面(Hue Beeswax),可以直接在 HDFS 或 HBase 上提供快速、互動式 SQL 查詢。Impala 不再使用緩慢的Hive+MapReduce 批處理,而是通過與商用並行關係資料庫中類似的分散式查詢引擎(由Query Planner、Query Coordinator 和 Query Exec Engine 三部分組成),可以直接從HDFS 或者 HBase 中用 SELECT、JOIN 和統計函式查詢資料,從而大大降低了延遲。

2)spark

Spark 是個開源的資料分析叢集計算框架,建立於 HDFS 之上。Spark 與 Hadoop 一樣,用於構建大規模、低延時的資料分析應用。Spark 採用 Scala 語言實現,使用 Scala 作為應用框架。Spark 採用基於記憶體的分散式資料集,優化了迭代式的工作負載以及互動式查詢。與Hadoop 不同的是,Spark 和 Scala 緊密整合,Scala 像管理本地 collective 物件那樣管理分散式資料集。Spark支援分散式資料集上的迭代式任務,實際上可以在 Hadoop 檔案系統上與Hadoop 一起執行(通過 YARN、Mesos 等實現)。

3) storm

Storm 是一個分散式的、容錯的實時計算系統, 屬於流處理平臺,多用於實時計算並更新資料庫。也可被用於“連續計算”(continuous computation),對資料流做連續查詢,在計算時就將結果以流的形式輸出給使用者。它還可被用於“分散式 RPC”,以並行的方式執行昂貴的運算。

2.HDFS

Hadoop整合了眾多檔案系統,在其中有一個綜合性的檔案系統抽象,它提供了檔案系統實現的各類介面,Hadoop提供了許多檔案系統的介面,使用者可以使用URI方案選取合適的檔案系統來實現互動。提供了一個高層的檔案系統抽象類org.apache.hadoop.fs.FileSystem,這個抽象類展示了一個分散式檔案系統,並有幾個具體實現,如下圖(hadoop檔案系統)所示。【HDFS只是這個抽象檔案系統的一個例項。】

2.1 基礎知識

2.1.1基本概念

分散式檔案系統

(1)特點

分散式檔案系統管理的物理儲存資源不一定直接連線在本地節點上,而是通過計算機網路與節點相連。

分散式檔案系統的基於客戶機/伺服器模式。通常,一個分散式檔案系統提供多個供使用者訪問的伺服器。

分散式檔案系統一般都會提供備份和容錯的功能

分散式檔案系統一般都基於作業系統的本地檔案系統

ext3, ext4

NTFS

(2)為什麼需要分散式檔案系統

傳統檔案系統最大的問題是容量和吞吐量的限制,多使用者多應用的並行讀寫是分散式檔案系統產生的根源,一塊硬碟的讀寫效能,比不上多塊硬碟同時讀寫的效能【1 HDD=75MB/sec

1000 HDDs = 75GB/sec】。

擴充儲存空間的成本低廉,可提供冗餘備份,可以為分散式計算提供基礎。

hdfs:

(1)概念

Hadoop Distributed File System是一個基於*nix的使用Java實現的、分散式的、可橫向擴充套件的檔案系統,是Hadoop的核心元件。是分散式計算中資料儲存管理的基礎,是基於流資料模式訪問和處理超大檔案的需求而開發的,可以運行於廉價的商用伺服器上。它所具有的高容錯、高可靠性、高可擴充套件性、高獲得性、高吞吐率等特徵為海量資料提供了不怕故障的儲存,為超大資料集(LargeData Set)的應用處理帶來了很多便利。

(2)設計目標

A、基於廉價的普通硬體,可以容忍硬體出錯,系統中的某一臺或幾臺伺服器出現故障的時候,系統仍可用且資料保持完整

B、大資料集(大檔案),HDFS適合儲存大量檔案,總儲存量可以達到PB,EB級,HDFS適合儲存大檔案,單個檔案大小一般在百MB級之上,檔案數目適中。

C、簡單的一致性模型,HDFS應用程式需要一次寫入,多次讀取一個檔案的訪問模式,支援追加(append)操作,但無法更改已寫入資料。

D、順序的資料流訪問,HDFS適合用於處理批量資料,而不適合用於隨機定位訪問。

E、側重高吞吐量的資料訪問,可以容忍資料訪問的高延遲。

F、為把“計算”移動到“資料”提供基礎和便利。

資料塊:

HDFS(HadoopDistributed File System)預設的最基本的儲存單位是64M的資料塊,【可針對每個檔案配置,由客戶端指定,每個塊有一個自己的全域性ID】。和普通檔案系統相同的是,HDFS中的檔案是被分成64M一塊的資料塊儲存的。與傳統檔案系統不同的是,如果實際資料沒有達到塊大小,則並不實際佔用磁碟空間【如果一個檔案是200M,則它會被分為4個塊: 64+64+64+8。儘管64m為基本儲存單位,但10m的檔案仍然只佔10m的空間】

使用塊的好處:

當一個檔案大於叢集中任意一個磁碟的時候,檔案系統可以充分利用叢集中所有的磁碟。管理塊使底層的儲存子系統相對簡單。塊更加適合備份,從而為容錯和高可用性的實現帶來方便。

塊的冗餘備份:

每個塊在叢集上會儲存多份(replica),預設複製份數為3,【可針對每個檔案配置,由客戶端指定,可動態修改】。

某個塊的所有備份都是同一個ID,系統無需記錄“哪些塊其實是同一份資料”。

系統可以根據機架的配置自動分配備份位置,第一份在叢集的某個機架的某臺機器上,其他兩份在另外的一個機架的兩臺機器上。【此策略是效能與冗餘性的平衡,機架資訊需要手工配置】。

元資料

元資料包括:

檔案系統目錄樹資訊:檔名,目錄名、檔案和目錄的從屬關係、 檔案和目錄的大小,建立及最後訪問時間、許可權

檔案和塊的對應關係:檔案由哪些塊組成

塊的存放位置:機器名,塊ID

資料儲存的方法:

元資料儲存在一臺指定的伺服器上(NameNode)

實際資料儲存在叢集的其他機器的本地檔案系統中(DataNode)

Namenode

NameNode是用來管理檔案系統名稱空間的元件【一個HDFS叢集只有一臺NameNode一個名稱空間,一個根目錄】,其上存放了HDFS的元資料。一個HDFS叢集只有一份元資料,目前有單點故障的問題。

元資料節點是分散式檔案系統中的管理者,用來管理檔案系統的名稱空間,叢集配置資訊和儲存塊的複製等。其將所有的檔案和資料夾的元資料儲存在一個檔案系統樹中。這些資訊也會在硬碟上儲存成以下檔案:名稱空間映象(namespace image)及修改日誌(edit log)。還儲存了一個檔案包括哪些資料塊,分佈在哪些資料節點上。NameNode會將檔案系統的Meta-data儲存在記憶體中,這些資訊主要包括了檔案資訊、每一個檔案對應的檔案塊的資訊和每一個檔案塊在DataNode的資訊等。

元資料儲存在NameNode的記憶體當中,以便快速查詢。【1G記憶體大致可以存放1,000,000個塊對應的元資料資訊】,按預設每塊64M計算,大致對應64T實際資料。這些資訊是在系統啟動的時候從資料節點收集而成的。

Datanode

資料節點是檔案系統中真正儲存資料的地方。客戶端(client)或者元資料資訊(namenode)可以向資料節點請求寫入或者讀出資料塊。DataNode是檔案儲存的基本單元,它將Block儲存在本地檔案系統中,儲存了Block的Meta-data。 其週期性的向元資料節點回報其儲存的資料塊資訊。

每個塊會在本地檔案系統產生兩個檔案,一個是實際的資料檔案,另一個是塊的附加資訊檔案,其中包括資料的校驗和,生成時間。客戶端讀取/寫入資料的時候直接與DataNode通訊。

【DataNode通過心跳包(Heartbeat)與NameNode通訊,這兩類節點分別承擔Master和Worker具體任務的執行節點。】

Secondary namenode

從元資料節點並不是元資料節點出現問題時候的備用節點,它和元資料節點負責不同的事情。其主要功能就是週期性將元資料節點的名稱空間映象檔案【fsimage】和修改日誌【edits】合併,以此來控制edits的檔案大小在合理的範圍。合併過後的名稱空間映象檔案也在從元資料節點儲存了一份,以防元資料節點失敗的時候,可以恢復。

為了縮短叢集重啟時NameNode重建fsimage的時間,在NameNode硬碟損壞的情況下,Secondary NameNode也可用作資料恢復,但絕不是全部。一般情況下Secondary Namenode執行在不同與NameNode的主機上,並且它的記憶體需求和NameNode是一樣的。

nn,dn,snn總結

1namenode

記錄源資料的名稱空間

資料分配到那些datanode儲存

協調客戶端對檔案訪問

2.datanode

負責所在物理節點的儲存管理

一次寫入,多次讀取(不能修改)

檔案由資料塊組成,典型的塊大小是64M

資料塊儘量散步到各個節點

3.secondarynamenode (輔助)

當NameNode重啟的時候,會合並硬碟上的fsimage檔案和edits檔案,得到完整的Metadata資訊。這個fsimage檔案可以看做是一個過時的Metadata資訊檔案(最新的Metadata修改資訊在edits檔案中)。如果edits檔案非常大,那麼這個合併過程就非常慢,導致HDFS長時間無法啟動,如果定時將edits檔案合併到fsimage,那麼重啟NameNode就可以非常快。SecondaryNameNode就做這個合併的工作。

總體結構:

HDFS是一個主/從(Mater/Slave)體系結構,從終端使用者的角度來看,它就像傳統的檔案系統一樣,可以通過目錄路徑對檔案執行CRUD(Create、Read、Update和Delete)操作。但由於分散式儲存的性質,客戶端通過同NameNode和DataNodes的互動訪問檔案系統。客戶端聯絡NameNode以獲取檔案的元資料,而真正的檔案I/O操作是直接和DataNode進行互動的。【Client就是需要獲取分散式檔案系統檔案的應用程式。】

HDFS典型的部署是在一個專門的機器上執行NameNode,叢集中的其他機器各執行一個DataNode;也可以在執行NameNode的機器上同時執行DataNode,或者一臺機器上執行多個DataNode。一個叢集只有一個NameNode的設計大大簡化了系統架構。

檔案讀入讀出:

1)檔案寫入

Client向NameNode發起檔案寫入的請求。

NameNode根據檔案大小和檔案塊配置情況,返回給Client它所管理部分DataNode的資訊。

Client將檔案劃分為多個Block,根據DataNode的地址資訊,按順序寫入到每一個DataNode塊中。

2)檔案讀取

Client向NameNode發起檔案讀取的請求。

NameNode返回檔案儲存的DataNode的資訊。

Client讀取檔案資訊。

2.1.2 HDFS特性和原理

HDFS優點

1)處理超大檔案

  這裡的超大檔案通常是指百MB、設定數百TB大小的檔案。目前在實際應用中,HDFS已經能用來儲存管理PB級的資料了。

2)流式的訪問資料

HDFS的設計建立在更多地響應"一次寫入、多次讀寫"任務的基礎上。這意味著一個數據集一旦由資料來源生成,就會被複制分發到不同的儲存節點中,然後響應各種各樣的資料分析任務請求。在多數情況下,分析任務都會涉及資料集中的大部分資料,也就是說,對HDFS來說,請求讀取整個資料集要比讀取一條記錄更加高效。

3)運行於廉價的商用機器叢集上

Hadoop設計對硬體需求比較低,只須執行在低廉的商用硬體叢集上,而無需昂貴的高可用性機器上。廉價的商用機也就意味著大型叢集中出現節點故障情況的概率非常高。這就要求設計HDFS時要充分考慮資料的可靠性,安全性及高可用性。

HDFS缺點

1)不適合低延遲資料訪問

  如果要處理一些使用者要求時間比較短的低延遲應用請求,則HDFS不適合。HDFS是為了處理大型資料集分析任務的,主要是為達到高的資料吞吐量而設計的,這就可能要求以高延遲作為代價。

  改進策略:對於那些有低延時要求的應用程式,HBase是一個更好的選擇。通過上層資料管理專案來儘可能地彌補這個不足。在效能上有了很大的提升,它的口號就是goes real time。使用快取或多master設計可以降低client的資料請求壓力,以減少延時。還有就是對HDFS系統內部的修改,這就得權衡大吞吐量與低延時了,HDFS不是萬能的銀彈。

2)無法高效儲存大量小檔案

  因為Namenode把檔案系統的元資料放置在記憶體中,所以檔案系統所能容納的檔案數目是由Namenode的記憶體大小來決定。一般來說,每一個檔案、資料夾和Block需要佔據150位元組左右的空間,所以,如果你有100萬個檔案,每一個佔據一個Block,你就至少需要300MB記憶體。當前來說,數百萬的檔案還是可行的,當擴充套件到數十億時,對於當前的硬體水平來說就沒法實現了。還有一個問題就是,因為Map task的數量是由splits來決定的,所以用MR處理大量的小檔案時,就會產生過多的Maptask,執行緒管理開銷將會增加作業時間。舉個例子,處理10000M的檔案,若每個split為1M,那就會有10000個Maptasks,會有很大的執行緒開銷;若每個split為100M,則只有100個Maptasks,每個Maptask將會有更多的事情做,而執行緒的管理開銷也將減小很多。

  改進策略:要想讓HDFS能處理好小檔案,有不少方法。

利用SequenceFile、MapFile、Har等方式歸檔小檔案,這個方法的原理就是把小檔案歸檔起來管理,HBase就是基於此的。對於這種方法,如果想找回原來的小檔案內容,那就必須得知道與歸檔檔案的對映關係。

橫向擴充套件,一個Hadoop叢集能管理的小檔案有限,那就把幾個Hadoop叢集拖在一個虛擬伺服器後面,形成一個大的Hadoop叢集。google也是這麼幹過的。

多Master設計,這個作用顯而易見了。正在研發中的GFS II也要改為分散式多Master設計,還支援Master的Failover,而且Block大小改為1M,有意要調優處理小檔案啊。

附帶個Alibaba DFS的設計,也是多Master設計,它把Metadata的對映儲存和管理分開了,由多個Metadata儲存節點和一個查詢Master節點組成。

3)不支援多使用者寫入及任意修改檔案

  在HDFS的一個檔案中只有一個寫入者,而且寫操作只能在檔案末尾完成,即只能執行追加操作。目前HDFS還不支援多個使用者對同一檔案的寫操作,以及在檔案任意位置進行修改。

HDFS客戶端

(1)命令列客戶端

同一個Hadoop安裝包

(2)API客戶端

Java庫

非POSIX介面

封裝了通訊細節

元資料的持久化

NameNode裡使用兩個非常重要的本地檔案來儲存元資料資訊:

Fsimage:

儲存了檔案系統目錄樹資訊,儲存了檔案和塊的對應關係

edits

儲存檔案系統的更改記錄(journal),當客戶端對檔案進行寫操作(包括新建或移動)的時候,操作首先記入edits,成功後才會更改記憶體中的資料,並不會立刻更改硬碟上的fsimage

塊的位置資訊並不做持久化

元資料的載入和更新

NameNode啟動時:

通過fsimage讀取元資料,載入記憶體。執行edits中的記錄,在記憶體中生成最新的元資料

清空edits,儲存最新的元資料到fsimage。收集DataNode彙報的塊的位置資訊

NameNode執行時:

對檔案建立和寫操作,記錄到edits,更新記憶體中的元資料,收集DataNode彙報的塊的建立和複製資訊。

HDFS建立檔案流程

客戶端

客戶端請求NameNode在名稱空間中建立新的檔案元資訊。

如果不能建立檔案則 NameNode 會返回失敗。【檔案已存在,資源不足】

如建立成功,客戶端得到此檔案的防寫鎖。 NameNode

Namenode檢查叢集和檔案狀態。

建立防寫鎖來保證只有一個客戶端在操作該檔案。

建立該檔案的元資訊。

把建立檔案這個事件加入edits。

為該檔案分配塊,以及塊的冗餘備份位置。

HDFS的讀操作流程

客戶端與NameNode通訊獲取檔案的塊位置資訊,其中包括了塊的所有冗餘備份的位置資訊:DataNode的列表

客戶端獲取檔案位置資訊後直接同有檔案塊的DataNode通訊,讀取檔案。如果第一個DataNode無法連線,客戶端將自動聯絡下一個DataNode

如果塊資料的校驗值出錯,則客戶端需要向NameNode報告,並自動聯絡下一個DataNode,

Namenode並不參與資料實際傳輸

HDFS寫操作流程

客戶端寫一個檔案並不是直接寫到HDFS上,HDFS客戶端接收使用者資料,並把內容快取在本地,當本地快取收集足夠一個HDFS塊大小的時候,客戶端同NameNode通訊註冊一個新的塊。註冊塊成功後,NameNode會給客戶端返回一個DataNode的列表,列表中是該塊需要存放的位置,包括冗餘備份。

客戶端向列表中的第一個DataNode寫入塊,當完成時第一個DataNode 向列表中的下個DataNode傳送寫操作,並把資料已收到的確認資訊給客戶端,同時傳送確認資訊給NameNode,之後的DataNode重複之上的步驟,當列表中所有DataNode都接收到資料並且由最後一個DataNode校驗資料正確性完成後,返回確認資訊給客戶端。

收到所有DataNode的確認資訊後,客戶端刪除本地快取,客戶端繼續傳送下一個塊,重複以上步驟。當所有資料傳送完成後,寫操作完成。

(客戶端請求namenode建立新檔案客戶端將資料寫入DFSOutputStream建立pipetine依次將目標資料塊寫入各個datanode,建立多個副本)

HDFS追加寫(append)的操作流程

客戶端與NameNode通訊,獲得檔案的防寫鎖及檔案最後一個塊的位置(DataNode列表)

客戶端挑選一個DataNode作為主寫入節點,並對其餘節點上的該資料塊加鎖,開始寫入資料,與普通寫入流程類似,依次更新各個DataNode上的資料,更新時間戳和校驗和,最後一個塊寫滿,並且所有備份塊都完成寫入後,向NameNode申請下一個資料塊。

Secondary NameNode的執行過程

Secondary NameNode根據配置好的策略決定多久做一次合併

fs.checkpoint.period 和 fs.checkpoint.size

通知NameNode現在需要回滾edits日誌,此時NameNode的新操作將寫入新的edits 檔案

Secondary NameNode通過HTTP從NameNode取得fsimage 和 edits

Secondary NameNode將fsimage載入記憶體,執行所有edits中的操作,新建新的完整的fsimage

Secondary NameNode將新的fsimage傳回NameNode

NameNode替換為新的fsimage並且記錄此checkpoint的時間

2.1.3 hdfs高階特性

hdfs作用

1.提供分散式儲存機制,提供可線形增長的海量儲存能力

2.自動資料冗餘,無須使用raid,無須另行備份

3.為進一步分析計算提供資料基礎

(可以在任意節點上使用HDFS儲存,不用在namenode節點,datanode也可以)

hdfs設計基礎與目標

1.硬體錯誤是常態,因此需要冗餘(因為hadoop儲存都比較廉價)

2.流式資料訪問.即資料批量讀取而非隨機讀寫,hadoop擅長做的是資料分析而不是事務處理

3.大資料處理

4.簡單一致性模型,為了降低系統複雜度,對檔案採用一次性寫多次讀的邏輯設計,即檔案一經寫入, 關閉,就再也不能修改了.

5.程式採用"資料就近"分配節點執行

元資料目錄

[hadoop@h1 ~]$ cd/tmp/hadoop-hadoop/dfs/name
[hadoop@h1 name]$ ls
current image  in_use.lock  previous.checkpoint
[hadoop@h1 name]$ cd current/
[hadoop@h1 current]$ ls
edits fsimage  fstime  VERSION
[hadoop@h1 current]$ strings VERSION
#Thu Aug 27 13:13:39 CST 2015
namespaceID=1025320819  (名稱空間)
cTime=0  (建立的時間,升級後會改變)
storageType=NAME_NODE  (儲存型別)
layoutVersion=-19   (hdfs版本)
[hadoop@h1 current]$

fsimage(映像檔案,namenode主要在記憶體中工作,檢查點時候先到fsimage中)

edits (操作動作寫到edits 類似於redo檔案,fsimage寫一次 之前的的內容被清空)

[hadoop@h1 ~]$ cd/tmp/hadoop-hadoop/dfs/namesecondary/

[hadoop@h1 namesecondary]$ ls

current image in_use.lock

[hadoop@h1 namesecondary]$

檢視datanode節點

[

[hadoop@h2 ~]$  cd /tmp/hadoop-hadoop/dfs/
[hadoop@h2 dfs]$ ls
data
[hadoop@h2 dfs]$ cd data/current/
[hadoop@h2 current]$ ls
blk_5351326362188108637            dncp_block_verification.log.curr
blk_5351326362188108637_1154.meta  VERSION

【blk開頭的為資料塊】

HDFS負載均衡

[hadoop@h91 hadoop-0.20.2-cdh3u5]$bin/start-balancer.sh -threshold 10;

(如果節點間 資料使用量的偏差 小於10% 就認為正常 )

機架感知

新增節點(刪節點)

1)在新節點安裝好hadoop

2)把namenode的有關配置檔案複製到該節點

3)修改masters和staves檔案,增加該節點

4)設定ssh免密碼進出該節點

5)單獨啟動該節點上的datanode和tasktracker (hadoop-daemon.sh start datanode/tasktracker)

6)執行start-batancensh進行資料負載均衡

*是否要重啟叢集?

hadoop 壓縮

對Hadoop來說,有兩個地方需要用到壓縮:其一,在HDFS上儲存資料檔案,壓縮之後資料體積更小,有利儲存;其二,叢集間的通訊需要壓縮資料,這樣可以提高網路頻寬的利用率。如果用MapReduce處理壓縮檔案,那麼要求壓縮演算法能支援檔案分割,因為MapReduce的過程需要將檔案分割成多個切片,如果壓縮演算法不支援分割,就不能做切片了。

在Java裡,一切輸入輸出都用流的方式進行。一個可以讀取位元組序列的物件叫輸入流。檔案,網路連線,記憶體區域,都可以是輸入流。一個可以寫入位元組序列的物件叫輸出流。檔案,網路連線,記憶體區域,都可以是輸出流。

Hadoop如何壓縮?假設,輸入流是A,輸出流是B。A和B有很多種可能,可以是檔案,網路連線,記憶體區域,標準輸入,標準輸出的兩兩組合。做壓縮的話,先選擇壓縮演算法,然後根據壓縮演算法建立相應的壓縮器,然後用壓縮器和輸出流B建立壓縮輸出流C,最後,將資料從輸入流A複製到壓縮輸出流C即可進行壓縮並輸出結果。

如果是解壓縮,先選擇解壓縮演算法,然後根據解壓縮演算法建立相應的解壓縮器,然後用解壓縮器和輸入流A建立壓縮輸入流C,最後,將資料從輸入流C複製到輸出流B即可進行解壓縮並輸出結果。

1、編程式碼

viCprsF2F.java
importjava.net.URI;
importjava.io.InputStream;
importjava.io.OutputStream;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.io.compress.CompressionCodec;
importorg.apache.hadoop.io.IOUtils;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.fs.FileSystem;
importorg.apache.hadoop.util.ReflectionUtils;
publicclass CprsF2F{
  public static void main(String[] args) throwsException{
    if (args.length != 3){
      System.err.println("Usage: CprsF2Fcmps_name src target");
      System.exit(2);
    }
    Class<?> codecClass =Class.forName(args[0]);  //反射機制
    Configuration conf = new Configuration();
    CompressionCodec codec =(CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
    //重構物件,org.apache.hadoop.io.compress.GzipCodec 加入新的屬性,構成一個新的物件
    InputStream in = null;
    OutputStream out = null;
    FileSystem fs =FileSystem.get(URI.create(args[1]), conf);
    try{
      in = fs.open(new Path(args[1]));
      out =codec.createOutputStream(fs.create(new Path(args[2])));
      IOUtils.copyBytes(in, out, conf);
    }finally{
      IOUtils.closeStream(in);
     IOUtils.closeStream(out);
    }
  }
}

2、演示

[[email protected]]$ usr/jdk1.7.0_25/bin/javac CprsF2F.java[[email protected]]$ /usr/jdk1.7.0_25/bin/jar -cvf CprsF2F.jar CprsF2F.class
[[email protected]]$ echo "hellowrod">a.txt
[[email protected]]$ bin/hadoop fs -put a.txt a.txt
【壓縮a.txt  壓縮檔案為b.txt】
[[email protected]]$ bin/hadoop jar CprsF2F.jar CprsF2Forg.apache.hadoop.io.compress.GzipCodec /user/hadoop/a.txt /user/hadoop/b.txt
[[email protected]]$ bin/hadoop fs -cat b.txt
???/???Yμ-   (亂碼)
[[email protected]]$ bin/hadoop fs -cat b.txt|gunzip
hellowrod

2.1.4 HDFS可靠性

1)冗餘副本策略

[hadoop@h1 ~]$ cd/usr/local/hadoop-1.2.1/conf/

[hadoop@h1 conf]$ cat hdfs-site.xml

<value>2</value> 2為冗餘性2份(記錄塊時在不同的節點上儲存2份)

Datanode啟動時,遍歷本地檔案系統,產生一份hdfs資料塊和本地檔案的對應關係列 表(bbckreport)彙報給namenode

2)機架策略

叢集_般放在不同機架上,機架間頻寬要比機架內頻寬要小

HDFS的"機架感知"(設定同一機架的指令碼)

((RackAware.py
#!/usr/bin/python #-*-coding:UTF-8 importsys
rack ={"hadoop-node-31":"rackl",
"hadoop-node-32":"rackl",
"hadoop-node-49":"rack2",
"hadoop-node-50":"rack2",
"hadoop-node-51":"rack2",
"192.168.1.31":"rackl",
"192.168.1.32":"rackl",
"192.168.1.49":"rack2",
"192.168.1.50":"rack2",
"192.168.1.51":"rack2",
core-site.xml配置檔案
<property>
<name>topology.script.file.name</name>
<value>/opt/modules/hadoop/hadoop-1.0.3/bin/RackAware.py</value>
<!--機架感知指令碼路徑-->
</property>
<property>
<name>topology.script.number.args</name>
<value>20</value>
<!--機架伺服器數量,由於我寫了20個,所以這裡寫20-->
</property>
))

一般在本機架存放一個副本,在其它機架再存放別的副本,這樣可以防止機架失效時丟失資料,也可以提高頻寬利用率。

3)心跳機制

Namenode週期性從datanode接收心跳訊號和塊報告,Namenode根據塊報告驗證元資料沒有按時傳送心跳的datanode會被標記為宕機,不會再給它任何I/O請求,如果datanode失效造成副本數量下降,並且低於預先設定的閾值,namenode會檢測 出這些資料塊,並在合適的時機進行重新複製,引發重新複製的原因還包括資料副本本身損壞、磁碟錯誤,複製因子被增大等

4)安全模式

Namenode啟動時會先經過一個"安全模式〃階段 ,安全模式階段不會產生資料寫,在此階段Namenode收集各個datanode的報告,當資料塊達到最小副本數以上時,會被認為是"安全"的。,在一定比例(可設定)的資料塊被確定為"安全"後,再過若干時間,安全模式結束

當檢測到副本數不足的資料塊時,該塊會被複制直到達到最小副本數?

[hadoop@h1 hadoop-1.2.1]$ bin/hadoopdfsadmin -safemode enter  (進入安全模式)
Safe mode is ON
[hadoop@h1 input]$ cd /home/hadoop/input/
[hadoop@h1 input]$ touch ab
[hadoop@h1 input]$ cd/usr/local/hadoop-1.2.1/
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs-put /home/hadoop/input/abc ./in
 (傳遞檔案到hdfs中不能成功,因為處於安全模式)
put:org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot create/user/hadoop/in/abc. Name node is in safe mode.
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs -ls./in(檢視下,abc檔案沒有被拷入)
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs-rmr ./in/test1.txt
(刪除檔案也報錯,安全模式下不允許刪除)
[hadoop@h1 hadoop-1.2.1]$ bin/hadoopdfsadmin -safemode leave (退出安全模式)
Safe mode is OFF

5)校驗和

在檔案創立時,每個資料塊都產生校驗和,校驗和儲存在.meta檔案內 ,客戶端獲取資料時可以檢査校驗和是否相同,從而發現數據塊是否損壞。如果正在讀取的資料塊損壞,則可以繼續讀取其它副本。

[hadoop@h2 ~]$ cd /tmp/hadoop-hadoop/dfs(datanode節點)
[hadoop@h2 dfs]$ cd data/current/
[hadoop@h2 current]$ ls (會看到資料塊,拷貝到hdfs儲存的資料,分散到各個節點)
blk_7094582618848038271_1003.meta (.meta字尾的為儲存塊的校驗碼)

6)回收站

刪除檔案時,其實是放入回收站/trash。回收站裡的檔案可以快速恢復。可以設定一個時間閾值,當回收站裡檔案的存放時間超過這個閾值,就被徹底刪除, 並且釋放佔用的資料塊。

(開啟回收站功能)

[hadoop@h1 ~]$ cd/usr/local/hadoop-1.2.1/conf
[hadoop@h1 conf]$ vi core-site.xml  (新增下面一段,10080為保留時間,單位分鐘)
<property>
<name>fs.trash.interval</name>
<value>10080</value>
<description>
Number of minutes between trashcheckpoints.
If zero, the trash feature is disabted
</description>
</property>
[hadoop@h1 conf]$ cd ..
[hadoop@h1 hadoop-1.2.1]$ bin/stop-all.sh
[hadoop@h1 hadoop-1.2.1]$ bin/start-all.sh(重啟 回收站功能生效)
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs -ls./in
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs-rmr ./in/test1.txt (刪除test1.txt)
Moved to trash:hdfs://h1:9000/user/hadoop/in/test1.txt
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs -ls(發現多了個.Trash目錄)
drwxr-xr-x  - hadoop supergroup          0 2013-11-26 14:47 /user/hadoop/.Trash
drwxr-xr-x  - hadoop supergroup          02013-11-26 14:47 /user/hadoop/in
drwxr-xr-x  - hadoop supergroup          02013-11-26 12:19 /user/hadoop/out
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs -ls./.Trash/Current/user/hadoop/in (發現了我們刪除的test1.txt)
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs -mv./.Trash/Current/user/hadoop/in/test1.txt ./in (恢復檔案)
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs -ls./in (又回來了)
[hadoop@h1 hadoop-1.2.1]$ bin/hadoop fs-expunge(清空回收站)

7)元資料保護

映像檔案剛和事務日誌是Namenode的核心資料。可以配置為擁有多個副本,副本會降低Namenode的處理速度,但增加安全性,Namenode依然是單點,如果發生故障要手工切換。

8、Zookeeper

8.1

8.1.1 Zookeeper資料模型

zookeeper使用了一個類似⽂檔案系統的樹結構,資料可以掛在某個節點上,可以對這節點進行刪改。

(1)每個節點在 zookeeper 中叫做 znode,並且其有⼀一個唯⼀一的路徑標識,Znode 可以有⼦znode(臨時節點除外),並且 znode 裡可以存資料。

(2)Znode 中的資料可以有多個版本,比如某⼀一個路徑下存有多個數據版本,那麼查詢這個路徑下的資料就需要帶上版本。

(3)znode 可以被監控,包括這個目錄節點中儲存的資料的修改,⼦子節點目錄的變化等,一旦變化可以通知設定監控的客戶端。這個功能是 zookeeper 對於應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,叢集管理,分散式鎖等等。

(4)ZooKeeper 的資料結構, 與普通的⽂檔案系統極為類似.每個 znode 由 3 部分組成

stat. 此為狀態資訊, 描述該 znode 的版本, 許可權等資訊.

data. 與該 znode 關聯的資料.

children. 該 znode 下的⼦子節點.

8.1.2 ZooKeeper 命令

連線 server:zkCli.sh –server master

列出指定 node 的⼦子 node:ls /

建立 znode 節點, 並指定關聯資料:create/rongxin hello【建立節點/hello, 並將字串"world"關聯到該節點中.】

獲取 znode 的資料和狀態資訊:get /rongxin

刪除 znode: delete /rongxin

【delete 命令可以刪除指定 znode. 但當該 znode 擁有子 znode 時, 必須先刪除其所有⼦ znode, 否則操作將失敗。rmr 是⼀一個遞迴刪除命令, 如果發⽣生指定節點擁有⼦子節點時, rmr 命令會首先刪除⼦子節點.】

ls2(檢視當前節點資料並能看到更新次數等資料) ,

set(修改節點)

8.1.3 znode 節點

znode節點的狀態資訊

【使用 get 命令獲取指定節點的資料時, 同時也將返回該節點的狀態資訊, 稱為 Stat.】

czxid. 節點建立時的 zxid.

mzxid. 節點最新⼀一次更新發⽣生時的zxid.

ctime. 節點建立時的時間戳.

mtime. 節點最新⼀一次更新發⽣生時的時間戳.

dataVersion. 節點資料的更新次數.

cversion. 其⼦子節點的更新次數.

aclVersion. 節點 ACL(授權資訊)的更新次數.

ephemeralOwner. 如果該節點為ephemeral 節點,ephemeralOwner 值表示與該節點繫結的 session id. 如果該節點不是 ephemeral

節點, ephemeralOwner 值為 0. ⾄至於什麼是ephemeral 節點, 請看後面的講述.

dataLength. 節點資料的位元組數.

numChildren. ⼦子節點個數.

zxid

znode 節點的狀態資訊中包含 czxid 和 mzxid, 那麼什麼是 zxid 呢?

ZooKeeper 狀態的每一次改變, 都對應著一個遞增的Transaction id, 該 id 稱為 zxid. 由於 zxid 的遞增性質, 如果 zxid1 小於zxid2, 那麼 zxid1 肯定先於 zxid2 發⽣生. 建立任意節點, 或者更新任意節點的資料, 或者刪除任意節點, 都會導致 Zookeeper 狀態發⽣生改變, 從⽽而導致 zxid 的值增加。

session

在 client 和 server 通訊之前, 首先需要建立連線, 該連線稱為 session. 連線建立後, 如果發⽣生連線超時, 授權失敗, 或者顯式關閉連線, 連線便處於 CLOSED 狀態, 此時 session 結束.

節點型別【臨時、永久、有序】

永久節點:persistent 節點不和特定的 session 繫結, 不會隨著建立該節點的 session 的結束⽽而消失, ⽽而是⼀一直存在, 除非該節點被顯式刪除.

臨時節點:ephemeral節點是臨時性的, 如果建立該節點的 session 結束了, 該節點就會被自動刪除. ephemeral 節點不能擁有⼦子節點. 雖然 ephemeral 節點與建立它的session 繫結, 但只要該該節點沒有被刪除, 其他 session 就可以讀寫該節點中關聯的資料. 。【Zookeeper 的客戶端和伺服器通訊採用長連線⽅方式,每個客戶端和 伺服器通過⼼心跳來保持連線,這個連線狀態稱為 session,如果 znode 是臨時節點,這個session 失效,znode 也就刪除了】使用-e引數指定建立ephemeral節點,如create –e/rongxin/test hello

有序節點:sequence 並非節點型別中的一種 sequence 節點既可以是 ephemeral 的, 也可以是 persistent 的. 建立sequence 節點時, ZooKeeperserver 會在指定的節點名稱後加上⼀個數字序列, 該數字序列是遞增的. 因此可以多次建立相同的sequence 節點, ⽽而得到不同的節點.使用-s 引數指定建立 sequence 節點.如create –s /hello/item/word

watch監聽感興趣的事件

在命令⾏行中, 以幾個命令可以指定是否監聽相應的事件。

(1) ls 命令: ls /rongxin true

【ls 命令的第⼀一個引數指定 znode, 第⼆二個引數如果為 true, 則說明監聽該 znode 的⼦子節點的增減, 以及該 znode 本身的刪除事件。】

(2)get 命令:get /rongxin true

【get 命令的第⼀一個引數指定 znode, 第⼆二個引數如果為 true, 則說明監聽該 znode 的更新和刪除事件.】

(3)stat 命令

【stat 命令用於獲取 znode 的狀態資訊. 第⼀一個引數指定 znode, 如果第⼆二個引數為 true, 則監聽該 node 的更新和刪除事件.】

8.1.4 java 程式碼使用 zookeeper

Zookeeper 的使用主要是通過建立其jar 包下的 Zookeeper 例項,並且呼叫其接⼝口⽅方法進⾏行的,主要的操作就是對 znode 的增刪改操作,監聽 znode 的變化以及處理。

znode應用例項:實時更新伺服器列表

(1)場景描述

在分散式應用中, 我們經常同時啟動多個 server, 呼叫⽅方(client)選擇其中之一發起請求.分散式應用必須考慮⾼高可用性和可擴充套件性: server 的應用程序可能會崩潰, 或者 server 本身也可能會宕機. 當 server 不夠時, 也有可能增加 server 的數量. 總⽽而⾔言之, server 列表並非一成不變, ⽽而是一直處於動態的增減中.那麼 client 如何才能實時的更新 server 列表呢? 解決的⽅方案很多, 本⽂文將講述利用 ZooKeeper 的解決⽅方案.

(2)思路

啟動 server 時, 在 zookeeper 的某個znode(假設為/sgroup)下建立一個子節點. 所建立的⼦子節點的型別應該為 ephemeral,這樣一來, 如果 server 程序崩潰, 或者 server 宕機, 與 zookeeper 連線的 session 就結束了, 那麼其所建立的⼦子節點會被zookeeper自動刪除. 當崩潰的 server 恢復後, 或者新增 server 時, 同樣需要在/sgroup 節點下建立新的子節點.對於 client, 只需註冊/sgroup 子節點的監聽, 當/sgroup 下的子節點增加或減少時, zookeeper 會通知 client, 此時 client 更新server 列表.

9、Shuffle階段與mapreduce設定引數

官方對Shuffle過程的描述圖

(1)Shuffle 的大致範圍:怎樣把 maptask 的輸出結果有效地傳送到 reduce 端。也可以這樣理解,Shuffle 描述著資料從 map task 輸出到 reduce task 輸入的這段過程。

(2)在 Hadoop 這樣的叢集環境中,大部分map task 與 reduce task 的執行是在不同的節點上。當然很多情況下 Reduce執行時需要跨節點去拉取其它節點上的 map task 結果。如果叢集正在執行的job 有很多,那麼task的正常執行對叢集內部的網路資源消耗會很嚴重。這種網路消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於記憶體,磁碟 IO 對 job 完成時間的影響也是可觀的。

(3)我們對 Shuffle 過程的期望可以有:

完整地從 map task 端拉取資料到reduce 端。

在跨節點拉取資料時,儘可能地減少對頻寬的不必要消耗。

減少磁碟 IO 對task 執行的影響。

【能優化的地方主要在於減少拉取資料的量及儘量使用記憶體而不是磁碟。】

9.1 shuffle階段詳解

以 WordCount 為例,假設它有 8 個 map task 和 3 個 reduce task。Shuffle 過程橫跨 map與 reduce 兩端,所以下面我也會分兩部分來展開。

map端:

整個流程分了四步。簡單些可以這樣說,每個 map task 都有一個記憶體緩衝區,儲存著map 的輸出結果,當緩衝區快滿的時候需要將緩衝區的資料以一個臨時檔案的方式存放到磁碟,當整個 map task 結束後再對磁碟中這個 maptask 產生的所有臨時檔案做合併,生成最終的正式輸出檔案,然後等待 reducetask來拉資料。

1、map階段

1)input

在 map task 執行時,它的輸入資料來源於HDFS 的 block,當然在 MapReduce 概念中,map task 只讀取 split。Split 與 block 的對應關係可能是多對一,預設是一對一。在 WordCount 例子裡,假設 map 的輸入資料都是像“hello”這樣的字串。

2)partition

在經過 mapper 的執行後,我們得知mapper 的輸出是這樣一個 key/value 對: key 是“hello”,value 是數值 1。因為當前 map 端只做加 1 的操作,在 reduce task 裡才去合併結果集。前面我們知道這個job 有 3 個 reduce task,到底當前的“hello”應該交由哪個 reduce 去做呢,是需要現在決定的。

MapReduce提供 Partitioner 介面,它的作用就是根據 key 或 value 及 reduce 的數量來決定當前的這對輸出資料最終應該交由哪個reduce task 處理。預設對 key hash 後再以 reduce task 數量取模。預設的取模方式只是為了平均 reduce 的處理能力,如果使用者自己對Partitioner 有需求,可以訂製並設定到 job 上。

在我們的例子中,“hello”經過 Partitioner 後返回 0,也就是這對值應當交由第一個 reducer 來處理。接下來,需要將資料寫入記憶體緩衝區中,緩衝區的作用是批量收集map 結果,減少磁碟IO 的影響。我們的 key/value 對以及 Partition 的結果都會被寫入緩衝區。當然寫入之前,key 與 value 值都會被序列化成位元組陣列。整個記憶體緩衝區就是一個位元組陣列,它的位元組索引及 key/value 儲存結構我沒有研究過。

3)spill sort combiner

記憶體緩衝區是有大小限制的(io.sort.mb),預設是 100MB。當 map task 的輸出結果很多時,就可能會撐爆記憶體,所以需要在一定條件下將緩衝區中的資料臨時寫入磁碟,然後重新利用這塊緩衝區。【這個從記憶體往磁碟寫資料的過程被稱為 Spill,中文可譯為溢寫】這個溢寫是由單獨執行緒來完成,不影響往緩衝區寫 map 結果的執行緒。

溢寫執行緒啟動時不應該阻止map 的結果輸出,所以整個緩衝區有個溢寫的比例spill.percent(io.sort.spill.percent)。這個比例預設是 0.8,也就是當緩衝區的資料已經達到閾值(buffer size *spill percent = 100MB * 0.8 = 80MB),溢寫執行緒啟動,鎖定這 80MB 的記憶體,執行溢寫過程。Map task 的輸出結果還可以往剩下的 20MB 記憶體中寫,互不影響。當溢寫執行緒啟動後,需要對這 80MB 空間內的key 做排序(Sort)。排序是 MapReduce 模型預設的行為,這裡的排序也是對序列化的位元組做的排序。

因為 map task 的輸出是需要傳送到不同的 reduce 端去,而記憶體緩衝區沒有對將傳送到相同 reduce 端的資料做合併,那麼這種合併應該是體現是磁碟檔案中的。寫到磁碟中的溢寫檔案是對不同的 reduce 端的數值做過合併。所以溢寫過程一個很重要的細節在於,如果有很多個 key/value 對需要傳送到某個reduce 端去,那麼需要將這些 key/value 值拼接到一塊,減少與 partition 相關的索引記錄。

在針對每個 reduce 端而合併資料時,有些資料可能像這樣:"hello"/1,

“hello"/1。對於WordCount 例子,就是簡單地統計單詞出現的次數,如果在

同一個 map task 的結果中有很多個像“hello”一樣出現多次的 key,我們就應該把它們的值合併到一塊,這個過程叫 reduce 也叫 combiner【MapRed

uce 的術語中,reduce 只指 reduce 端執行從多個 map task 取資料做計算的

過程。除 reduce 外,非正式地合併資料只能算做 combine 了。MapReduce 中

將 Combiner 等同於 Reducer。】如果 client 設定過 Combiner,那麼現在就

是使用 Combiner 的時候了。將有相同 key 的 key/value 對的value 加起來

,減少溢寫到磁碟的資料量。

Combiner 會優化 MapReduce 的中間結果,所以它在整個模型中會多次使用。那哪些場景才能使用 Combiner 呢?從這裡分析,Combiner的輸出是 Reducer 的輸入,Combiner 絕不能改變最終的計算結果。所以從我的想法來看,Combiner只應該用於那種Reduce 的輸入key/value與輸出key/value 型別完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner 如果用好,它對 job 執行效率有幫助,反之會影響 reduce 的最終結果。

4)Merge

每次溢寫會在磁碟上生成一個溢寫檔案,如果 map 的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個溢寫檔案存在。當 map task 真正完成時,記憶體緩衝區中的資料也全部溢寫到磁碟中形成一個溢寫檔案。最終磁碟中會至少有一個這樣的溢寫檔案存在(如果 map 的輸出結果很少,當 map 執行完成時,只會產生一個溢寫檔案),因為最終的檔案只有一個,所以需要將這些溢寫檔案歸併到一起,這個過程就叫做 Merge。

Merge 是怎樣的?如前面的例子,“aaa”從某個 map task 讀取過來時值是 5,從另外一個 map 讀取時值是 8,因為它們有相同的 key,所以得 merge 成 group。什麼是 group。對於“hello”就是像這樣的:{“hello”, [5, 8, 2, …]},陣列中的值就是從不同溢寫檔案中讀取出來的,然後再把這些值加起來。請注意,因為 merge 是將多個溢寫檔案合併到一個檔案,所以可能也有相同的 key存在,在這個過程中如果 client 設定過 Combiner,也會使用 Combiner來合併相同的 key。至此,map 端的所有工作都已結束,最終生成的這個檔案也存放在 TaskTracker夠得著的某個本地目錄內。每個 reduce task 不斷地通過 RPC 從 JobTracker 那裡獲取map task 是否完成的資訊,如果reduce task得到通知,獲知某臺 TaskTracker 上的 map task 執行完成,Shuffle 的後半段過程開始啟動。

2、reduce階段

1)copy

Copy 過程,簡單地拉取資料。Reduce 程序啟動一些資料 copy 執行緒(Fetcher),通過 HTTP(jetty)方式請求 map task 所在的TaskTracker 獲取 map task 的輸出檔案。因為 map task 早已結束,這些檔案就歸TaskTracker管理在本地磁碟中。

2)merge

這裡的 merge 如 map 端的 merge 動作,只是陣列中存放的是不同map 端 copy 來的數值。Copy 過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比 map 端的更為靈活,它基於 JVM的 heap size 設定,因為 Shuffle 階段 Reducer 不執行,所以應該把絕大部分的記憶體都給 Shuffle 用。

merge 有三種形式:1)記憶體到記憶體 2)記憶體到磁碟 3)磁碟到磁碟。預設情況下第一種形式不啟用,讓人比較困惑,當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的 merge。與 map 端類似,這也是溢寫的過程,這個過程中如果你設定有 Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種 merge 方式一直在執行,直到沒有map 端的資料時才結束,然後啟動第三種磁碟到磁碟的 merge 方式生成最終的那個檔案。

3)input

Reducer 的輸入檔案。不斷地 merge 後,最後會生成一個“最終檔案”。為什麼加引號?因為這個檔案可能存在於磁碟上,也可能存在於記憶體中。對我們來說,當然希望它存放於記憶體中,直接作為 Reducer的輸入,但預設情況下,這個檔案是存放於磁碟中的。至於怎樣才能讓這個檔案出現在記憶體中,後面有時間我再說。當 Reducer 的輸入檔案已定,整個Shuffle 才最終結束。然後就是 Reducer 執行,把結果放到 HDFS 上。

9.2 mapreduce重要設定引數

記憶體管理

可以通過 mapred.child.ulimit 引數配置子程序可使用的最大虛擬記憶體。

注意:該屬性對單個程序設定最大限制,單位為 KB,值必須大於等於最大堆記憶體(通過-Xmx 設定)。

注意:mapred.child.java.opts 只對從 tasktracker 分配和管理的子 JVM 程序有效。其他 hadoop 守護程序記憶體引數配置詳見Configuring the Environment of the Hadoop Daemons。

mapred.task.maxvmem

int型別 以位元組為單位指定單個map 或 reduce 任務的最大虛擬記憶體。如果任務超過該值就被kill

mapred.task.maxpmem

int型別 以位元組為單位指定單個map 或 reduce 任務的最大 RAM。這個值被排程器(Jobtracer)參考作為分配 mapreduce 任務的依據,避免讓一個節點超RAM 負載使用。

Map 引數

io.sort.mb

int型別 預設100 以 MB 為單位設定序列化和元資料 buffer 的大小。

io.sort.spill.percent

float型別預設0.80 元資料和序列化資料 buffer 空間閥值。當兩者任何一個 buffer 空間達到該閥值,資料將被 spill 到磁碟。

【假設io.sort.record.percent=r,io.sort.mb=x,io.sort.spill.percent=q,那麼在 map 執行緒 spill 之前最大處理的記錄量為 r*x*q*2^16。

注意:較大的值可能降低spill 的次數甚至避免合併,但是也會增加 map 被阻塞的機率。通過精確估計 map 的輸出尺寸和減少 spill 次數可有效縮短 map 處理時間。】

io.sort.record.percent

float型別預設0.05 map 記錄序列化後資料元資料 buffer 所佔總 buffer 百分比值。為了加速排序,除了序列化後本身尺寸外每條序列化後的記錄需要 16 位元組的元資料io.sort.mb 值被佔用的百分比值超過設定值機會發生 spill。對輸出記錄較少的 map,值越高越可降低

spill 發生的次數。

Shuffle/Reduce 引數

io.sort.factor

int型別預設值 10 指定同時可合併的檔案片段數目。引數限制了開啟檔案的數目,壓縮解碼器。如果檔案數超過了該值,合併將分成多次。這個引數一般適用於 map 任務,大多數作業應該配置該項。

子 JVM重用

可以通過指定mapred.job.reuse.jvm.num.tasks 作業配置引數來啟用 jvm 重用。預設是 1,jvm 不會被重用(每個 jvm 只處理 1 個任務)。如果設定為-1,那麼一個 jvm 可以運行同一個作業的任意任務數目。使用者可以通過 JobConf.setNumTasksToExecutePerJvm(int)指定一個大於 1 的值。

-------------------------------------------------------------------------------------------------

附錄 1. Hadoop 1.0 安裝

1.a1 192.168.9.1 (master)

a2192.168.9.2 (slave1)

a3192.168.9.3 (slave2)

修改/etc/hosts

2.3臺機器 建立hadoop 使用者

hadoop 密碼:123

3.安裝JDK (3臺都安裝)

[root@a1 ~]# chmod 777jdk-6u38-ea-bin-b04-linux-i586-31_oct_2012-rpm.bin

[root@a1 ~]#./jdk-6u38-ea-bin-b04-linux-i586-31_oct_2012-rpm.bin

[root@a1 ~]# cd /usr/java/jdk1.6.0_38/

[root@a1 jdk]# vi /etc/profile

export JAVA_HOME=/usr/java/jdk1.7.0_25

export JAVA_BIN=/usr/java/jdk1.7.0_25/bin

export PATH=$PATH:$JAVA_HOME/bin

exportCLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export JAVA_HOME JAVA_BIN PATH CLASSPATH

重啟你的系統 或 source/etc/profile

[root@a1 ~]#/usr/java/jdk1.7.0_25/bin/java-version

java version "1.6.0_38-ea"

Java(TM) SE Runtime Environment (build1.6.0_38-ea-b04)

Java HotSpot(TM) Client VM (build20.13-b02, mixed mode, sharing)

4.安裝hadoop (3臺都安)

[root@a1 ~]# tar zxvf hadoop-0.20.2-cdh3u5.tar.gz-C /usr/local

編輯hadoop 配置檔案

[root@a1 ~]# cd/usr/local/hadoop-0.20.2-cdh3u5/conf/

[root@a1 conf]# vi hadoop-env.sh

新增

export JAVA_HOME=/usr/java/jdk1.7.0_25

設定namenode啟動埠

[root@a1 conf]# vi core-site.xml

新增

<configuration>

<property>

<name>fs.default.name</name>

<value>hdfs://h1:9000</value>

</property>

</configuration>

設定datanode節點數為2

[root@a1 conf]# vi hdfs-site.xml

新增

<configuration>

<property>

<name>dfs.replication</name>

<value>2</value>

</property>

</configuration>

設定jobtracker埠

[root@a1 conf]# vim mapred-site.xm

<configuration>

<property>

<name>mapred.job.tracker</name>

<value>h1:9001</value>

</property>

</configuration>

[root@a1 conf]# vi masters

改為 a1(主機名)

[root@a1 conf]# vi slaves

改為

a2

a3

拷貝到其他兩個節點

[root@a1 conf]# cd /usr/local/

[root@a1 local]# scp -r./hadoop-0.20.2-cdh3u5/ h2:/usr/local/

[root@a1 local]# scp -r./hadoop-0.20.2-cdh3u5/ h3:/usr/local/

在所有節點上執行以下操作,把/usr/local/hadoop-0.20.2-cdh3u5的所有者,所有者組改為hadoop並su成該使用者

[root@a1 ~]# chown hadoop.hadoop/usr/local/hadoop-0.20.2-cdh3u5/ -R

[root@a2 ~]# chown hadoop.hadoop/usr/local/hadoop-0.20.2-cdh3u5/ -R

[root@a3 ~]# chown hadoop.hadoop/usr/local/hadoop-0.20.2-cdh3u5/ -R

[root@a1 ~]# su - hadoop

[root@a2 ~]# su - hadoop

[root@a3 ~]# su - hadoop

所有節點上建立金鑰

[hadoop@a1 ~]$ ssh-keygen -t rsa

[hadoop@a2 ~]$ ssh-keygen -t rsa

[hadoop@a3 ~]$ ssh-keygen -t rsa

[hadoop@a1 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a1

[hadoop@a1 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a2

[hadoop@a1 ~]$ ssh-copy-id -i /home/hadoop/.ssh/id_rsa.puba3

[hadoop@a2 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a1

[hadoop@a2 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a2

[hadoop@a2 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a3

[hadoop@a3 ~]$ ssh-copy-id -i /home/hadoop/.ssh/id_rsa.puba1

[hadoop@a3 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a2

[hadoop@a3 ~]$ ssh-copy-id -i/home/hadoop/.ssh/id_rsa.pub a3

格式化 namenode

[hadoop@a1 ~]$ cd/usr/local/hadoop-0.20.2-cdh3u5/

[hadoop@a1 hadoop-0.20.2-cdh3u5]$ bin/hadoopnamenode -format

開啟

[hadoop@a1 hadoop-0.20.2-cdh3u5]$bin/start-all.sh

在所有節點檢視程序狀態驗證啟動

[hadoop@a1 hadoop-0.20.2-cdh3u5]$ jps

8602 JobTracker

8364 NameNode

8527 SecondaryNameNode

8673 Jps

[hadoop@a2 hadoop-0.20.2-cdh3u5]$ jps

10806 Jps

10719 TaskTracker

10610 DataNode

[hadoop@a3 hadoop-0.20.2-cdh3u5]$ jps

7605 Jps

7515 TaskTracker

7405 DataNode

[hadoop@a1 hadoop-0.20.2-cdh3u5]$bin/hadoop dfsadmin -report

2. HDFS 操作與基本api編寫

(1)操作

建立目錄

[hadoop@h101 hadoop-1.2.1]$ hadoop fs-mkdir output

列出檔案

[hadoop@h101 ~]$ hadoop fs -ls

列出某個資料夾中的檔案

[hadoop@h101 ~]$ hadoop fs -ls output

上傳abc檔案,並命名為test

[hadoop@h101 ~]$ hadoop fs -put /home/hadoop/abc test

複製檔案到本地 檔案系統

[hadoop@h101 ~]$ hadoop fs -get test /home/hadoop/cba

刪除HDFS 中的檔案

[hadoop@h101 ~]$ hadoop fs -rmr test

檢視HDFS 中的檔案

[hadoop@h101 ~]$ hadoop fs -cat test

報告HDFS 基本統計資訊

[hadoop@h101 ~]$ hadoop dfsadmin –report

【安全模式:

NameNode在啟動的時候首先進入安全模式,如果 datanode 丟失的block達到一定的比例(1-dfs.safemode.threshold.pct),則系統會一直處於安全模式狀態即只讀狀態。 dfs.safemode.threshold.pct(預設值0.999f)表示HDFS啟動的時候,如果DataNode上報的block個數達到了元資料記錄的block個數的0.999倍才可以離開安全模式,否則一直是這種只讀模式。如果設為1則HDFS永遠是處於SafeMode。

退出安全模式

[hadoop@h101 ~]$ hadoop dfsadmin -safemodeleave

進入安全模式

[hadoop@h101 ~]$ hadoop dfsadmin -safemodeenter

(2)api

1) 環境準備:

拷貝hadoop 的jar包到java下

[root@h101 ~]# cd/usr/jdk1.7.0_25/jre/lib/ext/

[root@h101 ext]# cp/home/hadoop/hadoop-1.2.1/lib/*.jar .

[root@h101 ext]# cp/home/hadoop/hadoop-1.2.1/*.jar .

[root@h101 ~]# chmod -R 777/usr/jdk1.7.0_25/

另幾個節點同樣配置

把core-site.xml和hdfs-site.xml放到自己編寫的java檔案所在目錄下

[hadoop@h101 jdk1.7.0_25]$ cp/home/hadoop/hadoop-1.2.1/conf/core-site.xml .

[hadoop@h101 jdk1.7.0_25]$ cp/home/hadoop/hadoop-1.2.1/conf/hdfs-site.xml .

2)hdfs api

A、上傳檔案到 hdfs中

vi CopyFile.java
importorg.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyFile {
   public static void main(String[] args) throws Exception {
       Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       Pathsrc =new Path("/home/hadoop/bbb/b1");
Path dst =newPath("hdfs://h101:9000/user/hadoop");
       hdfs.copyFromLocalFile(src, dst);
       System.out.println("Uploadto"+conf.get("fs.default.name"));
       FileStatus files[]=hdfs.listStatus(dst);
       for(FileStatus file:files){
           System.out.println(file.getPath());
       }
    }
}

B、 hdfs中建立檔案

vi CreateFile.java
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateFile {

   public static void main(String[] args) throws Exception {
        Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       byte[] buff="hello hadoop world!n".getBytes();
       Path dfs=new Path("hdfs://h101:9000/user/hadoop/hellow.txt");
       FSDataOutputStream outputStream=hdfs.create(dfs);
       outputStream.write(buff,0,buff.length);
    }
}

C、 建立HDFS目錄

vi CreateDir.java
importorg.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateDir {
   public static void main(String[] args) throws Exception{
       Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       Path dfs=new Path("hdfs://h101:9000/user/hadoop/TestDir");
       hdfs.mkdirs(dfs);
    }

}

D、 hdfs 重新命名檔案

vi Rename.java
importorg.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Rename{
   public static void main(String[] args) throws Exception {
       Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       Path frpaht=new Path("hdfs://h101:9000/user/hadoop/b1");    //舊的檔名
       Path topath=newPath("hdfs://h101:9000/user/hadoop/bb111");    //新的文名
       booleanisRename=hdfs.rename(frpaht, topath);
       String result=isRename?"成功":"失敗";
       System.out.println("檔案重新命名結果為:"+result);
    }
}

D、刪除hdfs 上的檔案

vi DeleteFile.java
importorg.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class DeleteFile {
   public static void main(String[] args) throws Exception {
       Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       Path delef=new Path("hdfs://h101:9000/user/hadoop/bb111");
       boolean isDeleted=hdfs.delete(delef,false);
       //遞迴刪除
       //boolean isDeleted=hdfs.delete(delef,true);
       System.out.println("Delete?"+isDeleted);
    }
}

E、檢視檔案是否存在

vi CheckFile.java
importorg.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; 
public class CheckFile {
   public static void main(String[] args) throws Exception {
       Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       Path findf=newPath("hdfs://h101:9000/user/hadoop/hellow.txt");
       boolean isExists=hdfs.exists(findf);
       System.out.println("Exist?"+isExists);
    }
}

F、檢視HDFS檔案最好修改時間

vi GetLTime.java
importorg.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetLTime {
   public static void main(String[] args) throws Exception {
       Configuration conf=new Configuration();
       FileSystem hdfs=FileSystem.get(conf);
       Path fpath =newPath("hdfs://h101:9000/user/hadoop/hellow.txt");
       FileStatus fileStatus=hdfs.getFileStatus(fpath);
       long modiTime=fileStatus.getModificationTime();
       System.out.println("file1.txt的修改時間是"+modiTime);
    }
}

file1.txt的修改時間是1406749398648

****時間格式:Coordinated Universal Time(CUT) 協調世界時

G、 檢視 hdfs檔案

vi URLcat.java
import java.io.InputStream;
import java.net.URL;
importorg.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
public class URLcat{
       static {
               URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
       }
       public static void main(String[] args) throws Exception {
                InputStream in = null;
                try {
                        in = newURL(args[0]).openStream();
                        IOUtils.copyBytes(in,System.out, 4086, false);
                } finally {
                       IOUtils.closeStream(in);
                }
       }
}

[hadoop@h101 jdk1.7.0_25]$/usr/jdk1.7.0_25/bin/javac URLcat.java

[hadoop@h101 jdk1.7.0_25]$

/usr/jdk1.7.0_25/bin/java URLcathdfs://h101:9000/user/hadoop/test.txt

(3)客戶端和HDFS伺服器端配置檔案的關係

客戶端的配置檔名與伺服器端相同,欄位名也相同

客戶端不會從HDFS叢集端同步配置檔案

客戶端只使用部分配置資訊

fs.default.name

dfs.block.size

dfs.replication

如果客戶端沒有配置資訊,則使用客戶端Hadoop程式包裡的預設值

而不是伺服器端的值

(4)HDFS的安全性和使用者認證

預設情況下,Hadoop不啟用認證

採用客戶端系統的登入使用者名稱

或可以通過API設定

從而,雖然HDFS有許可權控制,但並沒有安全性可言

可以在NameNode上啟用使用者認證

目前只支援Kerberos

可以與LDAP整合

3.Zookeeper 的主流應用場景實現思路

配置管理

集中式的配置管理在應用叢集中是非常常見的,⼀般商業公司內部都會實現一套集中的配置管理中⼼心,應對不同的應用叢集對於共享各自配置的需求,並且在配置變更時能夠通知到叢集中的每一個機器。

Zookeeper 很容易實現這種集中式的配置管理:

(1)將 APP1 的所有配置配置到/APP1 znode 下。

(2)APP1 所有機器一啟動就對/APP1 這個節點進⾏行監控(zk.exist("/APP1",true)),並且實現回撥⽅方法 Watcher。

(3)在 zookeeper 上/APP1 znode 節點下資料發⽣生變化的時候,每個機器都會收到通知,Watcher ⽅方法將會被執⾏。

(4)那麼應用再取下資料即可(zk.getData("/APP1",false,null))。

【以上這個例⼦子只是簡單的粗顆粒度配置監控,細顆粒度的資料可以進⾏行分層級監控,這⼀一切都是可以設計和控制的。】

叢集管理

應用叢集中,我們常常需要讓每⼀一個機器知道叢集中(或依賴的其他某一個叢集)哪些機器是活著的,並且在叢集機器因為宕機,⽹網路斷鏈等原因能夠不在人⼯介⼊的情況下迅速通知到每⼀個機器。

Zookeeper 同樣很容易實現這個功能:

(1)我在 zookeeper 伺服器端有⼀一個 znode 叫/APP1SERVERS,那麼

(2)叢集中每⼀一個機器啟動的時候都去這個節點下建立⼀一個 EPHEMERAL 型別的節點,比如 server1 建立/APP1SERVERS/SERVER1(可以使用 ip,保證不重複),server2 建立/APP1SERVERS/SERVER2,

(3)SERVER1 和 SERVER2 都 watch /APP1SERVERS 這個父節點,那麼也就是這個父節點下資料或者子節點變化都會通知對該節點進行 watch 的客戶端。

【因為 EPHEMERAL 型別節點有一個很重要的特性,就是客戶端和伺服器端連線斷掉或者 session 過期就會使節點消失,那麼在某一個機器掛掉或者斷鏈的時候,其對應的節點就會消失,然後叢集中所有對/APP1SERVERS 進⾏行 watch 的客戶端都會收到通知,然後取得最新列表即可。】

叢集選 master

一旦master 掛掉能夠馬上能從 slave 中選出一個master,實現步驟和前者⼀一樣

(1)機器在啟動的時候在 APP1SERVERS 建立的節點型別變為EPHEMERAL

_SEQUENTIAL型別,這樣每個節點會自動被編號,【例如 :zookeeperTest2.java】

(2) 我們預設規定編號最小的為 master。

所以當我們對/APP1SERVERS節點做監控的時候,得到伺服器列表,只要所有叢集機器邏輯認為最小編號節點為master,那麼 master 就被選出,⽽而這個 master 宕機的時候,相應的 znode 會消失,然後新的伺服器列表就被推送到客戶端,然後每個節點邏輯認為最小編號節點為 master,這樣就做到動態master 選舉。

4.Mapreduce實現WordCount演算法