1. 程式人生 > >HDFS集中式的緩存管理原理與代碼剖析

HDFS集中式的緩存管理原理與代碼剖析

邏輯 dna 取數 datanode 文件的 組成 上層 data 讀取

轉載自:http://www.infoq.com/cn/articles/hdfs-centralized-cache/

HDFS集中式的緩存管理原理與代碼剖析

Hadoop 2.3.0已經發布了,其中最大的亮點就是集中式的緩存管理(HDFS centralized cache management)。這個功能對於提升Hadoop系統和上層應用的執行效率與實時性有很大幫助,本文從原理、架構和代碼剖析三個角度來探討這一功能。

主要解決了哪些問題

  1. 用戶可以根據自己的邏輯指定一些經常被使用的數據或者高優先級任務對應的數據,讓他們常駐內存而不被淘汰到磁盤。例如在Hive或Impala構建的數據倉庫應用中fact表會頻繁地與其他表做JOIN,顯然應該讓fact常駐內存,這樣DataNode在內存使用緊張的時候也不會把這些數據淘汰出去,同時也實現了對於 mixed workloads的SLA。
  2. centralized cache是由NameNode統一管理的,那麽HDFS client(例如MapReduce、Impala)就可以根據block被cache的分布情況去調度任務,做到memory-locality。
  3. HDFS原來單純靠DataNode的OS buffer cache,這樣不但沒有把block被cache的分布情況對外暴露給上層應用優化任務調度,也有可能會造成cache浪費。例如一個block的三個replica分別存儲在三個DataNote 上,有可能這個block同時被這三臺DataNode的OS buffer cache,那麽從HDFS的全局看就有同一個block在cache中存了三份,造成了資源浪費。
  4. 加快HDFS client讀速度。過去NameNode處理讀請求時只根據拓撲遠近決定去哪個DataNode讀,現在還要加入speed的因素。當HDFS client和要讀取的block被cache在同一臺DataNode的時候,可以通過zero-copy read直接從內存讀,略過磁盤I/O、checksum校驗等環節。
  5. 即使數據被cache的DataNode節點宕機,block移動,集群重啟,cache都不會受到影響。因為cache被NameNode統一管理並被被持久化到FSImage和EditLog,如果cache的某個block的DataNode宕機,NameNode會調度其他存儲了這個replica的DataNode,把它cache到內存。

基本概念

cache directive: 表示要被cache到內存的文件或者目錄。
cache pool: 用於管理一系列的cache directive,類似於命名空間。同時使用UNIX風格的文件讀、寫、執行權限管理機制。命令例子:

hdfs cacheadmin -addDirective -path /user/hive/warehouse/fact.db/city -pool financial -replication 1

以上代碼表示把HDFS上的文件city(其實是Hive上的一個fact表)放到HDFS centralized cache的financial這個cache pool下,而且這個文件只需要被緩存一份。

系統架構與原理

技術分享

用戶可以通過hdfs cacheadmin命令行或者HDFS API顯式指定把HDFS上的某個文件或者目錄放到HDFS centralized cache中。這個centralized cache由分布在每個DataNode節點的off-heap內存組成,同時被NameNode統一管理。每個DataNode節點使用mmap/mlock把存儲在磁盤文件中的HDFS block映射並鎖定到off-heap內存中。

DFSClient讀取文件時向NameNode發送getBlockLocations RPC請求。NameNode會返回一個LocatedBlock列表給DFSClient,這個LocatedBlock對象裏有這個block的replica所在的DataNode和cache了這個block的DataNode。可以理解為把被cache到內存中的replica當做三副本外的一個高速的replica。

註:centralized cachedistributed cache的區別:

distributed cache將文件分發到各個DataNode結點本地磁盤保存,並且用完後並不會被立即清理的,而是由專門的一個線程根據文件大小限制和文件數目上限周期性進行清理。本質上distributed cache只做到了disk locality,而centralized cache做到了memory locality

實現邏輯與代碼剖析

HDFS centralized cache涉及到多個操作,其處理邏輯非常類似。為了簡化問題,以addDirective這個操作為例說明。

1.NameNode處理邏輯

技術分享

NameNode內部主要的組件如圖所示。FSNamesystem裏有個CacheManager是centralized cache在NameNode端的核心組件。我們都知道BlockManager負責管理分布在各個DataNode上的block replica,而CacheManager則是負責管理分布在各個DataNode上的block cache。

DFSClient給NameNode發送名為addCacheDirective的RPC, 在ClientNamenodeProtocol.proto這個文件中定義相應的接口。

NameNode接收到這個RPC之後處理,首先把這個需要被緩存的Path包裝成CacheDirective加入CacheManager所管理的directivesByPath中。這時對應的File/Directory並沒有被cache到內存。

一旦CacheManager那邊添加了新的CacheDirective,觸發CacheReplicationMonitor.rescan()來掃描並把需要通知DataNode做cache的block加入到CacheReplicationMonitor. cachedBlocks映射中。這個rescan操作在NameNode啟動時也會觸發,同時在NameNode運行期間以固定的時間間隔觸發。

Rescan()函數主要邏輯如下:

rescanCacheDirectives()->rescanFile():依次遍歷每個等待被cache的directive(存儲在CacheManager. directivesByPath裏),把每個等待被cache的directive包含的block都加入到CacheReplicationMonitor.cachedBlocks集合裏面。

rescanCachedBlockMap():調用CacheReplicationMonitor.addNewPendingCached()為每個等待被cache的block選擇一個合適的DataNode去cache(一般是選擇這個block的三個replica所在的DataNode其中的剩余可用內存最多的一個),加入對應的DatanodeDescriptor的pendingCached列表。

2.NameNode與DataNode的RPC邏輯

DataNode定期向NameNode發送heartbeat RPC用於表明它還活著,同時DataNode還會向NameNode定期發送block report(默認6小時)和cache block(默認10秒)用於同步block和cache的狀態。

NameNode會在每次處理某一DataNode的heartbeat RPC時順便檢查該DataNode的pendingCached列表是否為空,不為空的話發送DatanodeProtocol.DNA_CACHE命令給具體的DataNode去cache對應的block replica。

3.DataNode處理邏輯

技術分享

DataNode內部主要的組件如圖所示。DataNode啟動的時候只是檢查了一下dfs.datanode.max.locked.memory是否超過了OS的限制,並沒有把留給Cache使用的內存空間鎖定。

在DataNode節點上每個BlockPool對應有一個BPServiceActor線程向NameNode發送heartbeat、接收response並處理。如果接收到來自NameNode的RPC裏面的命令是DatanodeProtocol.DNA_CACHE,那麽調用FsDatasetImpl.cacheBlock()把對應的block cache到內存。

這個函數先是通過RPC傳過來的blockId找到其對應的FsVolumeImpl (因為執行cache block操作的線程cacheExecutor是綁定在對應的FsVolumeImpl裏的);然後調用FsDatasetCache.cacheBlock()把這個block封裝成MappableBlock加入到mappableBlockMap裏統一管理起來,然後向對應的FsVolumeImpl.cacheExecutor線程池提交一個CachingTask異步任務(cache的過程是異步執行的)。

FsDatasetCache有個成員mappableBlockMap(HashMap)管理著這臺DataNode的所有的MappableBlock及其狀態(caching/cached/uncaching)。目前DataNode中”哪些block被cache到內存裏了”也是只保存了soft state(和NameNode的block map一樣),是DataNode向NameNode 發送heartbeat之後從NameNode那問回來的,沒有持久化到DataNode本地硬盤。

CachingTask的邏輯: 調用MappableBlock.load()方法把對應的block從DataNode本地磁盤通過mmap映射到內存中,然後通過mlock鎖定這塊內存空間,並對這個映射到內存的block做checksum檢驗其完整性。這樣對於memory-locality的DFSClient就可以通過zero-copy直接讀內存中的block而不需要校驗了。

4.DFSClient讀邏輯:

HDFS的讀主要有三種: 網絡I/O讀 -> short circuit read -> zero-copy read。網絡I/O讀就是傳統的HDFS讀,通過DFSClient和Block所在的DataNode建立網絡連接傳輸數據。

當DFSClient和它要讀取的block在同一臺DataNode時,DFSClient可以跨過網絡I/O直接從本地磁盤讀取數據,這種讀取數據的方式叫short circuit read。目前HDFS實現的short circuit read是通過共享內存獲取要讀的block在DataNode磁盤上文件的file descriptor(因為這樣比傳遞文件目錄更安全),然後直接用對應的file descriptor建立起本地磁盤輸入流,所以目前的short circuit read也是一種zero-copy read。

增加了Centralized cache的HDFS的讀接口並沒有改變。DFSClient通過RPC獲取LocatedBlock時裏面多了個成員表示哪個DataNode把這個block cache到內存裏面了。如果DFSClient和該block被cache的DataNode在一起,就可以通過zero-copy read大大提升讀效率。而且即使在讀取的過程中該block被uncache了,那麽這個讀就被退化成了本地磁盤讀,一樣能夠獲取數據。

對上層應用的影響

對於HDFS上的某個目錄已經被addDirective緩存起來之後,如果這個目錄裏新加入了文件,那麽新加入的文件也會被自動緩存。這一點對於Hive/Impala式的應用非常有用。

HBase in-memory table:可以直接把某個HBase表的HFile放到centralized cache中,這會顯著提高HBase的讀性能,降低讀請求延遲。

和Spark RDD的區別:多個RDD的之間的讀寫操作可能完全在內存中完成,出錯就重算。HDFS centralized cache中被cache的block一定是先寫到磁盤上的,然後才能顯式被cache到內存。也就是說只能cache讀,不能cache寫。

目前的centralized cache不是DFSClient讀了誰就會把誰cache,而是需要DFSClient顯式指定要cache誰,cache多長時間,淘汰誰。目前也沒有類似LRU的置換策略,如果內存不夠用的時候需要client顯式去淘汰對應的directive到磁盤。

現在還沒有跟YARN整合,需要用戶自己調整好留給DataNode用於cache的內存和NodeManager的內存使用。

HDFS集中式的緩存管理原理與代碼剖析