1. 程式人生 > >Hadoop原始碼分析之FileSystem

Hadoop原始碼分析之FileSystem

新建了一個Configuration物件之後,在呼叫Configuration.get()獲取配置鍵值對時,如果Configuration物件的properities為null,就會預設載入CLASSPATH上的預設配置檔案(參見 Hadoop原始碼分析之Configuration),所以在得到一個Configuration物件之後就可以利用這個物件來新建一個FileSystem物件。

Hadoop抽象檔案系統

為了為不同的檔案系統提供一個統一的介面,Hadoop提供了一個抽象的檔案系統,而Hadoop分散式檔案系統(Hadoop Distributed File System, HDFS)只是這個抽象檔案系統的一個具體實現。Hadoop抽象檔案系統介面主要由抽象類org.apache.hadoop.fs.FileSystem提供,其繼承的層次結構如下圖:


從上圖可以看出,Hadoop發行包中包含了不同的FileSystem子類,以滿足不同得到資料訪問需求。比較典型的是訪問HDFS上的資料,但有些時候也可訪問儲存在其他檔案系統上,如Amazon的S3系統。此外,使用者還可以根據特定的需求,自己實現特定網路存粗服務的具體檔案系統。

Hadoop抽象檔案系統為使用者提供了一個訪問不同的檔案系統的統一介面,大部分介面都在FileSystem這個抽象類中。其主要提供的方法可以分為兩部分:一部分用於處理檔案和目錄相關的事務;另一部分用於讀寫檔案資料。其中處理檔案和目錄主要是指建立檔案,建立目錄,刪除檔案,刪除目錄等操作,讀寫資料檔案主要是指讀檔案資料,寫入檔案資料等操作。這一些列操作大部分與Java的檔案系統介面相似,如FileSystem.mkdirs(Path f, FsPermission permission)方法在FileSystem物件所代表的檔案系統中建立目錄,Java.io.File.mkdirs()也是建立目錄的方法。FileSystem.delete(Path f)方法用於刪除檔案或目錄,Java.io.File.delete()方法也用於刪除檔案或目錄。FileSystem中需要子類實現的抽象方法大部分都是見名知意的方法,下面整理了FileSystem中的抽象方法:

	/**獲取檔案系統URI**/
	public abstract URI getUri();
	/**開啟一個檔案,並返回一個輸入流**/
	public abstract FSDataInputStream open(Path f, int bufferSize)
		    throws IOException;
	/**建立一個檔案,並返回一個輸入流**/
	public abstract FSDataOutputStream create(Path f,
		      FsPermission permission,
		      boolean overwrite,
		      int bufferSize,
		      short replication,
		      long blockSize,
		      Progressable progress) throws IOException;
	/**在一個已經存在的檔案中追加資料**/	  
	public abstract FSDataOutputStream append(Path f, int bufferSize,
		      Progressable progress) throws IOException;
	/**修改檔名或目錄名**/
	public abstract boolean rename(Path src, Path dst) throws IOException;
	/**刪除檔案**/
	public abstract boolean delete(Path f, boolean recursive) throws IOException;
	/**如果Path是一個目錄,讀取一個目錄下的所有專案和專案屬性,如果Path是一個檔案獲取該檔案的屬性**/
	public abstract FileStatus[] listStatus(Path f) throws IOException;
	/**設定當前的工作目錄**/
	public abstract void setWorkingDirectory(Path new_dir);
	/**獲取當前的工作目錄**/
	public abstract Path getWorkingDirectory();
	/**建立資料夾**/
	public abstract boolean mkdirs(Path f, FsPermission permission
		      ) throws IOException;
	/**獲取檔案或目錄屬性**/
	public abstract FileStatus getFileStatus(Path f) throws IOException;
上面的方法基本上覆蓋了Hadoop抽象檔案系統具體實現所需要實現的方法。此外,Hadoop抽象檔案系統基於以上方法,提供了一些工具方法,方便使用者呼叫。如listStatus()方法等。

獲取FileSystem物件

下面是關於客戶端獲取FileSystem的DistributedFileSystem物件

客戶端要構造FileSystem物件可以使用FileSystem.get()方法,該方法有3個過載方法,分別是

  /** Returns the configured filesystem implementation.
   * 獲取具體檔案系統
   * */
  public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
  }

/** Returns the FileSystem for this URI's scheme and authority.  The scheme
   * of the URI determines a configuration property name,
   * <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
   * The entire URI is passed to the FileSystem instance's initialize method.
   */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();//獲得URI的模式
    String authority = uri.getAuthority();//鑑權資訊
    //URI模式為空,並且鑑權資訊為空,返回預設檔案系統
    if (scheme == null && authority == null) {     // use default FS
      return get(conf);
    }
    //鑑權資訊為空
    if (scheme != null && authority == null) {     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }
    
    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的檔案系統
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

public static FileSystem get(final URI uri, final Configuration conf, 
      final String user)
  throws IOException, InterruptedException {
    UserGroupInformation ugi;
    if (user == null) {
      ugi = UserGroupInformation.getCurrentUser();
    } else {
      ugi = UserGroupInformation.createRemoteUser(user);
    }
    return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
      public FileSystem run() throws IOException {
        return get(uri, conf);
      }
    });
  }
但是都是呼叫有兩個形參的FileSystem.get()方法獲取FileSystem物件  。

Hadoop原始碼分析之開篇給出的程式碼中獲取FileSystem物件的那行程式碼是

FileSystem in = FileSystem.get(conf);
其中conf是一個Configuration物件。執行這行程式碼後就進入到FileSystem.get(Configuration conf)方法中,可以看到,在這個方法中先通過getDefaultUri()方法獲取檔案系統對應的的URI,該URI儲存了與檔案系統對應的協議和授權資訊,如:hdfs://localhost:9000。這個URI又是如何得到的呢?是在CLASSPATH中的配置檔案中取得的,看getDefaultUri()方法中有conf.get(FS_DEFAULT_NAME_KEY, "file:///") 這麼一個實參,在筆者專案的CLASSPATH中的core-site.xml檔案中有這麼一個配置:
	<property>
		<name>fs.default.name</name>
		<value>hdfs://localhost:9000</value>
	</property>
	<property>
而常量FS_DEFAULT_NAME_KEY對應的值是fs.default.name,所以conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是hdfs://localhost:9000

URI建立完成之後就進入到FileSystem.get(final URI uri, final Configuration conf)方法。在這個方法中,先執行一些檢查,檢查URI的協議和授權資訊是否為空,然後再直接或簡介呼叫該方法,最重要的是執行

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的檔案系統
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
常量CACHE用於快取已經開啟的、可共享的檔案系統,它是FileSystem類的靜態內部類FileSystem.Cache的物件,在其內部使用一個Map儲存檔案系統
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
這個鍵值對對映的鍵是FileSystem.Cache.Key型別,它有三個成員變數:
/**URI模式**/
      final String scheme;
      /**URI的授權部分**/
      final String authority;
      /**儲存了開啟具體檔案系統的本地使用者資訊,不同本地使用者開啟的具體檔案系統也是不能共享的**/
      final UserGroupInformation ugi;
由於FileSystem.Cache表示可共享的檔案系統,所以這個Key就用於區別不同的檔案系統物件,如一個一個檔案系統物件可共享,那麼FileSystem.Cache.Key的三個成員變數相等,在這個類中重寫了hashCode()方法和equals()方法,就是用於判斷這三個變數是否相等。根據《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》這本書的介紹,在Hadoop1。0版本中FileSystem.Cache.Key類還有一個unique欄位,這個欄位表示,如果其他3個欄位相等的情況,下如果使用者不想共享這個檔案系統,就設定這個值(預設為0),但是不知道現在為什麼去除了,還沒搞清楚,有哪位同學知道的話麻煩告知,謝謝。

回到FileSystem.get(final URI uri, final Configuration conf)方法的最後一行語句return CACHE.get(uri, conf),呼叫了FileSystem.Cahce.get()方法獲取具體的檔案系統物件,該方法程式碼如下:

    FileSystem get(URI uri, Configuration conf) throws IOException{
      Key key = new Key(uri, conf);
      FileSystem fs = null;
      synchronized (this) {
        fs = map.get(key);
      }
      if (fs != null) {
        return fs;
      }
      
      fs = createFileSystem(uri, conf);
      synchronized (this) {  // refetch the lock again
        FileSystem oldfs = map.get(key);
        if (oldfs != null) { // a file system is created while lock is releasing
          fs.close(); // close the new file system
          return oldfs;  // return the old file system
        }

        // now insert the new file system into the map
        if (map.isEmpty() && !clientFinalizer.isAlive()) {
          Runtime.getRuntime().addShutdownHook(clientFinalizer);
        }
        fs.key = key;
        map.put(key, fs);
        return fs;
      }
    }
在這個方法中先檢視已經map中是否已經快取了要獲取的檔案系統物件,如果已經有了,直接從集合中去除,如果沒有才進行建立,由於FileSystem.CACHE為static型別,所以在同一時刻可能有多個執行緒在訪問,所以需要在Cache類的方法中使用同步的操作來取值和設定值。這個方法比較簡單,最核心的就是
      fs = createFileSystem(uri, conf);
這行語句,它執行了具體的檔案系統物件的建立的功能。createFileSystem()方法是FileSystem的一個私有方法,其程式碼如下:
private static FileSystem createFileSystem(URI uri, Configuration conf
      ) throws IOException {
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
    LOG.debug("Creating filesystem for " + uri);
    if (clazz == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf);
    return fs;
  }
其實現就是先從配置檔案取得URI對應的類,如在core-default.xml檔案中屬性(鍵)fs.hdfs.impl對應的值是org.apache.hadoop.hdfs.DistributedFileSystem,相應的XML程式碼如下:
<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>

所以若uri對應fs.hdfs.impl,那麼createFileSystem中的clazz就是org.apache.hadoop.hdfs.DistributedFileSystem的Class物件。然後再利用反射,建立org.apache.hadoop.hdfs.DistributedFileSystem的物件fs。然後執行fs.initialize(uri, conf);初始化fs物件。DistributedFileSystem是Hadoop分散式檔案系統的實現類,實現了Hadoop檔案系統的介面,提供了處理HDFS檔案和目錄的相關事務。

執行DistributedFileSystem.initialize()方法之後,FileSystem物件就建立成功。

Reference:

《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》