HDFS原始碼分析資料塊校驗之DataBlockScanner
DataBlockScanner是執行在資料節點DataNode上的一個後臺執行緒。它為所有的塊池管理塊掃描。針對每個塊池,一個BlockPoolSliceScanner物件將會被建立,其執行在一個單獨的執行緒中,為該塊池掃描、校驗資料塊。當一個BPOfferService服務變成活躍或死亡狀態,該類中的blockPoolScannerMap將會更新。
我們先看下DataBlockScanner的成員變數,如下:
首先是由建構函式確定的三個成員變數:所屬資料節點DataNode例項datanode、所屬儲存FsDatasetSpi例項dataset、配置資訊Configuration例項conf,對應建構函式如下:// 所屬資料節點DataNode例項 private final DataNode datanode; // 所屬儲存FsDatasetSpi例項 private final FsDatasetSpi<? extends FsVolumeSpi> dataset; // 配置資訊Configuration例項 private final Configuration conf; // 執行緒休眠週期,5s static final int SLEEP_PERIOD_MS = 5 * 1000; /** * Map to find the BlockPoolScanner for a given block pool id. This is updated * when a BPOfferService becomes alive or dies. * 儲存塊池ID到對應BlockPoolScanner例項的對映。 * 當一個BPOfferService服務變成活躍或死亡狀態,blockPoolScannerMap將會隨之更新。 */ private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = new TreeMap<String, BlockPoolSliceScanner>(); // 資料塊掃描執行緒 Thread blockScannerThread = null;
然後設定了一個靜態變數,5s的執行緒休眠週期,即SLEEP_PERIOD_MS,另外兩個重要的成員變數是:// 建構函式 DataBlockScanner(DataNode datanode, FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; this.conf = conf; }
1、TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap
儲存塊池ID到對應BlockPoolScanner例項的對映。當一個BPOfferService服務變成活躍或死亡狀態,blockPoolScannerMap將會隨之更新。
2、Thread blockScannerThread
資料塊掃描執行緒。
既然DataBlockScanner實現了Runnable介面,那麼它肯定是作為一個執行緒在DataNode節點上執行的,我們看下DataNode是如何對其進行構造及啟動的,程式碼如下:
/**
* See {@link DataBlockScanner}
*/
private synchronized void initDataBlockScanner(Configuration conf) {
// 如果blockScanner不為null,直接返回
if (blockScanner != null) {
return;
}
// 資料塊校驗功能無法開啟的原因
String reason = null;
assert data != null;
// 如果引數dfs.datanode.scan.period.hours未配置,或者配置為0,說明資料塊校驗功能已關閉
if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
reason = "verification is turned off by configuration";
// SimulatedFSDataset不支援資料塊校驗
} else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
reason = "verifcation is not supported by SimulatedFSDataset";
}
// 如果資料塊校驗功能無法開啟的原因為null,構造DataBlockScanner例項,並呼叫其start()方法啟動該執行緒
if (reason == null) {
blockScanner = new DataBlockScanner(this, data, conf);
blockScanner.start();
} else {
// 否則在日誌檔案中記錄週期性資料塊校驗掃描無法啟用的原因
LOG.info("Periodic Block Verification scan disabled because " + reason);
}
}
首先,如果blockScanner不為null,直接返回,說明之前已經初始化並啟動了,然後,確定資料塊校驗功能無法開啟的原因reason:1、如果引數dfs.datanode.scan.period.hours未配置,或者配置為0,說明資料塊校驗功能已關閉;
2、SimulatedFSDataset不支援資料塊校驗;
如果資料塊校驗功能無法開啟的原因為null,構造DataBlockScanner例項,並呼叫其start()方法啟動該執行緒,否則在日誌檔案中記錄週期性資料塊校驗掃描無法啟用的原因。
DataBlockScanner執行緒啟動的start()方法如下:
public void start() {
// 基於DataBlockScanner例項建立一個執行緒blockScannerThread
blockScannerThread = new Thread(this);
// 將執行緒blockScannerThread設定為後臺執行緒
blockScannerThread.setDaemon(true);
// 啟動執行緒blockScannerThread
blockScannerThread.start();
}
實際上它是基於DataBlockScanner例項建立一個執行緒blockScannerThread,將執行緒blockScannerThread設定為後臺執行緒,然後啟動執行緒blockScannerThread。DataBlockScanner執行緒已建立,並啟動,那麼我們看下它是如何工作的,接下來看下它的run()方法,程式碼如下:
// 執行緒核心run()方法
@Override
public void run() {
// 當前塊池ID,預設為空
String currentBpId = "";
// 第一次執行標誌,預設當然應該為true
boolean firstRun = true;
// 如果所屬資料節點DataNode例項datanode正常執行,且當前執行緒沒有被中斷
while (datanode.shouldRun && !Thread.interrupted()) {
//Sleep everytime except in the first iteration.
// 如果不是第一次執行,執行緒休眠5s
if (!firstRun) {
try {
Thread.sleep(SLEEP_PERIOD_MS);
} catch (InterruptedException ex) {
// Interrupt itself again to set the interrupt status
// 如果發生InterruptedException異常,中斷blockScannerThread執行緒,然後跳過,繼續下一輪迴圈
blockScannerThread.interrupt();
continue;
}
} else {
// 第一次執行時先將firstRun標誌設定為false
firstRun = false;
}
// 獲取下一個塊池切片掃描器BlockPoolSliceScanner例項bpScanner
BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
// 如果bpScanner為null,跳過,繼續下一輪迴圈
if (bpScanner == null) {
// Possible if thread is interrupted
continue;
}
// 設定當前塊池ID,即currentBpId,從塊池切片掃描器BlockPoolSliceScanner例項bpScanner中獲取
currentBpId = bpScanner.getBlockPoolId();
// If BPOfferService for this pool is not alive, don't process it
// 如果當前塊池對應的心跳服務BPOfferService不是活躍的,不對它進行處理,呼叫removeBlockPool()方法從blockPoolScannerMap中移除資料,
// 並關閉對應BlockPoolSliceScanner,然後跳過,執行下一輪迴圈
if (!datanode.isBPServiceAlive(currentBpId)) {
LOG.warn("Block Pool " + currentBpId + " is not alive");
// Remove in case BP service died abruptly without proper shutdown
removeBlockPool(currentBpId);
continue;
}
// 呼叫塊池切片掃描器BlockPoolSliceScanner例項bpScanner的scanBlockPoolSlice()方法,
// 掃描對應塊池裡的資料塊,進行資料塊校驗
bpScanner.scanBlockPoolSlice();
}
// Call shutdown for each allocated BlockPoolSliceScanner.
// 退出迴圈後,遍歷blockPoolScannerMap中的每個BlockPoolSliceScanner例項bpss,
// 挨個呼叫對應shutdown()方法,停止塊池切片掃描器BlockPoolSliceScanner
for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
bpss.shutdown();
}
}
run()方法邏輯比較清晰,大體如下:1、首先初始化當前塊池ID,即currentBpId,預設為空,再確定第一次執行標誌firstRun,預設當然應該為true;
2、接下來進入一個while迴圈,迴圈的條件是如果所屬資料節點DataNode例項datanode正常執行,且當前執行緒沒有被中斷:
2.1、處理第一次執行標誌位firstRun:
2.1.1、如果不是第一次執行,執行緒休眠5s:即firstRun為false,這時如果發生InterruptedException異常,中斷blockScannerThread執行緒,然後跳過,繼續下一輪迴圈;
2.1.2、第一次執行時先將firstRun標誌設定為false;
2.2、獲取下一個塊池切片掃描器BlockPoolSliceScanner例項bpScanner,通過呼叫getNextBPScanner()方法,傳入當前塊池ID,即currentBpId來實現,首次迴圈,currentBpId為空,後續會傳入之前處理的值,下面會對其進行更新;
2.3、如果bpScanner為null,跳過,繼續下一輪迴圈;
2.4、設定當前塊池ID,即currentBpId,從塊池切片掃描器BlockPoolSliceScanner例項bpScanner中獲取;
2.5、如果當前塊池對應的心跳服務BPOfferService不是活躍的,不對它進行處理,呼叫removeBlockPool()方法從blockPoolScannerMap中移除資料,並關閉對應BlockPoolSliceScanner,然後跳過,執行下一輪迴圈;
2.6、呼叫塊池切片掃描器BlockPoolSliceScanner例項bpScanner的scanBlockPoolSlice()方法,掃描對應塊池裡的資料塊,進行資料塊校驗;
3、退出迴圈後,遍歷blockPoolScannerMap中的每個BlockPoolSliceScanner例項bpss,挨個呼叫對應shutdown()方法,停止塊池切片掃描器BlockPoolSliceScanner。
我們接下來看下比較重要的getNextBPScanner()方法,程式碼如下:
/**
* Find next block pool id to scan. There should be only one current
* verification log file. Find which block pool contains the current
* verification log file and that is used as the starting block pool id. If no
* current files are found start with first block-pool in the blockPoolSet.
* However, if more than one current files are found, the one with latest
* modification time is used to find the next block pool id.
* 尋找下一個塊池ID以進行scan。
* 此時應該只有一個當前驗證日誌檔案。
*/
private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
String nextBpId = null;
// 如果所屬資料節點DataNode例項datanode正常執行,且當前blockScannerThread執行緒沒有被中斷
while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
// 等待初始化
waitForInit();
synchronized (this) {
// 當blockPoolScannerMap大小大於0,即存在BlockPoolSliceScanner例項時,做以下處理:
if (getBlockPoolSetSize() > 0) {
// Find nextBpId by the minimum of the last scan time
// lastScanTime用於記錄上次瀏覽時間
long lastScanTime = 0;
// 遍歷blockPoolScannerMap集合,取出每個塊池ID,即bpid
for (String bpid : blockPoolScannerMap.keySet()) {
// 根據塊池ID,即bpid,取出其對應BlockPoolSliceScanner例項的上次瀏覽時間t
final long t = getBPScanner(bpid).getLastScanTime();
// 如果t不為0,且如果塊池ID為null,或者t小於lastScanTime,則將t賦值給lastScanTime,bpid賦值給nextBpId
// 也就是計算最早的上次瀏覽時間lastScanTime,和對應塊池ID,即nextBpId
if (t != 0L) {
if (bpid == null || t < lastScanTime) {
lastScanTime = t;
nextBpId = bpid;
}
}
}
// nextBpId can still be null if no current log is found,
// find nextBpId sequentially.
// 如果對應塊池ID,即nextBpId為null,則取比上次處理的塊池currentBpId高的key作為nextBpId,
// 如果還不能取出的話,那麼取第一個塊池ID,作為nextBpId
if (nextBpId == null) {
nextBpId = blockPoolScannerMap.higherKey(currentBpId);
if (nextBpId == null) {
nextBpId = blockPoolScannerMap.firstKey();
}
}
// 如果nextBpId不為空,那麼從blockPoolScannerMap中獲取其對應BlockPoolSliceScanner例項返回
if (nextBpId != null) {
return getBPScanner(nextBpId);
}
}
}
// 記錄warn日誌,No block pool is up, going to wait,然後等待
LOG.warn("No block pool is up, going to wait");
try {
// 執行緒休眠5s
Thread.sleep(5000);
} catch (InterruptedException ex) {
LOG.warn("Received exception: " + ex);
blockScannerThread.interrupt();
return null;
}
}
return null;
}
它的主要作用就是尋找下一個塊池ID以進行scan,其存在一個整體的while迴圈,迴圈的條件為如果所屬資料節點DataNode例項datanode正常執行,且當前blockScannerThread執行緒沒有被中斷,迴圈內做以下處理:1、呼叫waitForInit()方法等待初始化;
2、當前物件上使用synchronized進行同步,當blockPoolScannerMap大小大於0,即存在BlockPoolSliceScanner例項時,做以下處理:
2.1、設定lastScanTime用於記錄上次瀏覽時間,預設值為0;
2.2、遍歷blockPoolScannerMap集合,取出每個塊池ID,即bpid,計算最早的上次瀏覽時間lastScanTime,和對應塊池ID,即nextBpId:
2.2.1、根據塊池ID,即bpid,取出其對應BlockPoolSliceScanner例項的上次瀏覽時間t;
2.2.2、如果t不為0,且如果塊池ID為null,或者t小於lastScanTime,則將t賦值給lastScanTime,bpid賦值給nextBpId,也就是計算最早的上次瀏覽時間lastScanTime,和對應塊池ID,即nextBpId;
2.3、如果對應塊池ID,即nextBpId為null,則取比上次處理的塊池currentBpId高的key作為nextBpId,如果還不能取出的話,那麼取第一個塊池ID,作為nextBpId;
2.4、如果nextBpId不為空,那麼從blockPoolScannerMap中獲取其對應BlockPoolSliceScanner例項返回;
3、如果blockPoolScannerMap大小等於0,或者上述2找不到的話,記錄warn日誌,No block pool is up, going to wait,然後等待5s後繼續下一輪迴圈;
最後,實在找不到就返回null。
可見,getNextBPScanner()方法優先選取最早處理過的塊池,找不到的話再按照之前處理過的塊池ID增長的順序,找下一個塊池ID,按照塊池ID大小順序到尾部的話,再折回取第一個。
其中等待初始化的waitForInit()方法比較簡單,程式碼如下:
// Wait for at least one block pool to be up
private void waitForInit() {
// 如果BlockPoolSliceScanner的個數小於資料節點所有BpOS個數,或者BlockPoolSliceScanner的個數小於1,一直等待
// BpOS你可以理解為DataNode上每個塊池或名稱空間對應的一個例項,它處理該名稱空間到對應活躍或備份狀態NameNode的心跳。
while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
|| (getBlockPoolSetSize() < 1)) {
try {
// 執行緒休眠5s
Thread.sleep(SLEEP_PERIOD_MS);
} catch (InterruptedException e) {
// 如果發生InterruptedException異常,中斷blockScannerThread執行緒,然後返回
blockScannerThread.interrupt();
return;
}
}
}
它本質上是等所有塊池都被上報至blockPoolScannerMap集合後,才認為已完成初始化,然後再挑選塊池ID,否則執行緒休眠5s,繼續等待。程式碼註釋比較詳細,這裡不再贅述!獲取到塊池ID,並獲取到其對應的塊池切片掃描器BlockPoolSliceScanner例項bpScanner了,接下來就是呼叫bpScanner的scanBlockPoolSlice()方法,掃描該塊池的資料塊,並做資料塊校驗工作了。這方面的內容,請閱讀《HDFS原始碼分析資料塊校驗之BlockPoolSliceScanner》一文,這裡不再做介紹。
到了這裡,各位看官可能有個疑問,選取塊池所依賴的blockPoolScannerMap集合中的資料是哪裡來的呢?答案就在處理資料節點心跳的BPServiceActor執行緒中,在完成資料塊彙報、處理來自名位元組點NameNode的相關命令等操作後,有如下程式碼被執行:
// Now safe to start scanning the block pool.
// If it has already been started, this is a no-op.
// 現在可以安全地掃描塊池,如果它已經啟動,這是一個空操作。
if (dn.blockScanner != null) {
dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
}
很簡單,資料節點彙報資料塊給名位元組點,並執行來自名位元組點的相關命令後,就可以通過資料節點DataNode中成員變數blockScanner的addBlockPool()方法,新增塊池,程式碼如下: public synchronized void addBlockPool(String blockPoolId) {
// 如果blockPoolScannerMap集合中存在塊池blockPoolId,直接返回
if (blockPoolScannerMap.get(blockPoolId) != null) {
return;
}
// 根據塊池blockPoolId、資料節點datanode、儲存dataset、配置資訊conf等構造BlockPoolSliceScanner例項bpScanner
BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
datanode, dataset, conf);
// 將塊池blockPoolId與bpScanner的對映關係儲存到blockPoolScannerMap中
blockPoolScannerMap.put(blockPoolId, bpScanner);
// 記錄日誌資訊
LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
+ blockPoolScannerMap.size());
}
邏輯很簡單,首先需要看看blockPoolScannerMap集合中是否存在塊池blockPoolId,存在即返回,否則根據塊池blockPoolId、資料節點datanode、儲存dataset、配置資訊conf等構造BlockPoolSliceScanner例項bpScanner,將塊池blockPoolId與bpScanner的對映關係儲存到blockPoolScannerMap中,最後記錄日誌資訊。我們在上面也提到了如果當前塊池對應的心跳服務BPOfferService不是活躍的,那麼會呼叫removeBlockPool()方法,移除對應的塊池,程式碼如下:
public synchronized void removeBlockPool(String blockPoolId) {
// 根據塊池blockPoolId,從blockPoolScannerMap中移除資料,並得到對應BlockPoolSliceScanner例項bpss
BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
// 呼叫bpss的shutdown()方法,關閉bpss
if (bpss != null) {
bpss.shutdown();
}
// 記錄日誌資訊
LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
}
程式碼很簡單,不再贅述。總結
DataBlockScanner是執行在資料節點DataNode上的一個後臺執行緒,它負責管理所有塊池的資料塊掃描工作。當資料節點DataNode傳送心跳給名位元組點NameNode進行資料塊彙報並執行完返回的命令時,會在DataBlockScanner的內部集合blockPoolScannerMap中註冊塊池ID與為此新建立的BlockPoolSliceScanner物件的關係,然後DataBlockScanner內部執行緒blockScannerThread週期性的挑選塊池currentBpId,並獲取塊池切片掃描器BlockPoolSliceScanner例項bpScanner,繼而呼叫其scanBlockPoolSlice()方法,掃描對應塊池裡的資料塊,進行資料塊校驗。塊池選擇的主要依據就是優先選擇掃描時間最早的,也就是自上次掃描以來最長時間沒有進行掃描的,按照這一依據選擇不成功的話,則預設按照塊池ID遞增的順序迴圈選取塊池。