1. 程式人生 > >hdfs硬碟中dfs.data.dir相關和一些說明

hdfs硬碟中dfs.data.dir相關和一些說明

HDFS 通過 dfs.data.dir 欄位在配置檔案中查詢 DFS 的資料在本地檔案系統中的存放位置。如果在伺服器上配置了多塊硬碟(假設都已經掛載到本地檔案系統中),我們希望 HDFS 能儘量均衡、充分的利用磁碟。理論上 HDFS 也確實能勝任這項工作。在 HDFS 中,這樣的一個存放資料的本地檔案系統中的目錄被稱為 volume
直接定位到 Datanode.java 中的程式碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
    public static DataNode createDataNode(String args[],  Configuration conf)
throws IOException {
        DataNode dn = instantiateDataNode(args, conf);
        runDatanodeDaemon(dn);
        return dn;
    }

    public static DataNode instantiateDataNode(String args[], Configuration conf) throws IOException {
        //...
       String[] dataDirs = conf.getStrings("dfs.data.dir"
);
       dnThreadName = "DataNode: [" +
                            StringUtils.arrayToString(dataDirs) + "]";
       return makeInstance(dataDirs, conf);
    }

在真正例項化之前,程式碼會先拿到配置檔案中定義的 dfs.data.dir 對應的字串 dataDirs。然後在 makeInstance(dataDirs, conf) 方法中檢查 dataDirs 在本地檔案系統中是否存在、可用。只要有一個 DIR 可用,就會 new 一個 DataNode 出來。
建構函式 DataNode() 直接呼叫 startDataNode(conf, dataDirs) 方法。這其中跟資料相關的程式碼如下:

1
2
3
4
5
6
7
8
9
10
11
    startDataNode(){
        //…
        storage = new DataStorage();
        //…
        // read storage info, lock data dirs and transition fs state if necessary
        storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
        // adjust
        this.dnRegistration.setStorageInfo(storage);
        // initialize data node internal structure
        this.data = new FSDataset(storage, conf);
    }

在 storage.recoverTransitionRead(nsInfo, dataDirs, startOpt) 中還會對 dataDirs 做檢查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
      File dataDir = it.next();
      StorageDirectory sd = new StorageDirectory(dataDir);
      StorageState curState;
      try {
        curState = sd.analyzeStorage(startOpt);
        // sd is locked but not opened
        switch(curState) {
        case NORMAL:
          break;
        case NON_EXISTENT:
          // ignore this storage
          LOG.info("Storage directory " + dataDir + " does not exist.");
          it.remove();
          continue;
        case NOT_FORMATTED: // format
          LOG.info("Storage directory " + dataDir + " is not formatted.");
          LOG.info("Formatting ...");
          format(sd, nsInfo);
          break;
        default:  // recovery part is common
          sd.doRecover(curState);
        }
      } catch (IOException ioe) {
        sd.unlock();
        throw ioe;
      }
      // add to the storage list
      addStorageDir(sd);
      dataDirStates.add(curState);
    }

在 startDataNode() 中跟 volume 直接相關的程式碼就是最後一行

10
this.data = new FSDataset(storage, conf);

FSDataset.java 檔案定義了 DFS 的很多資料結構,如 FSDir, FSVolume, FSVolumeSet。

1
2
3
4
5
6
7
8
9
10
11
  public FSDataset(DataStorage storage, Configuration conf) throws IOException {
    this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
    FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
    }
    volumes = new FSVolumeSet(volArray);
    volumeMap = new HashMap<Block, DatanodeBlockInfo>();
    volumes.getVolumeMap(volumeMap);
    registerMBean(storage.getStorageID());
  }

在這個建構函式中,volumeMap 儲存了 HDFS 中每一個 Block 和 一個 DatanodeBlockInfo 的對應關係,而 DatanodeBlockInfo 維護了一個 Block 到 它的 metada 的對映:

1
2
3
4
5
6
class DatanodeBlockInfo {
  private FSVolume volume;       // volume where the block belongs
  private File     file;         // block file
  private boolean detached;      // copy-on-write done for block
  //...
}

而通過 volumes.getVolumeMap(volumeMap),便遞迴的完成每個 volume 下面已經存在的 block 的對映關係的維護。

至此,HDFS 便基本上完成本地檔案系統上的檔案與 DFS 上的檔案/block 的對映。其中 FSDataset 是非常重要的類。下一篇 blog 將要講述的修改 HDFS 以便讓一個 SequenceFile 被建立在指定的 volume 上就需要挖掘這裡的很多方法。比如:

1
2
3
4
5
6
7
8
9
10
11
    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
      int startVolume = curVolume;
      while (true) {
        FSVolume volume = volumes[curVolume];
        curVolume = (curVolume + 1) % volumes.length;
        if (volume.getAvailable() > blockSize) { return volume; }
        if (curVolume == startVolume) {
          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
        }
      }
    }

這個方法保證 HDFS 能‘均衡’的使用配置的每個 volume。