Hadoop原始碼分析之FileSystem
新建了一個Configuration物件之後,在呼叫Configuration.get()獲取配置鍵值對時,如果Configuration物件的properities為null,就會預設載入CLASSPATH上的預設配置檔案(參見 Hadoop原始碼分析之Configuration),所以在得到一個Configuration物件之後就可以利用這個物件來新建一個FileSystem物件。
Hadoop抽象檔案系統
為了為不同的檔案系統提供一個統一的介面,Hadoop提供了一個抽象的檔案系統,而Hadoop分散式檔案系統(Hadoop Distributed File System, HDFS)只是這個抽象檔案系統的一個具體實現。Hadoop抽象檔案系統介面主要由抽象類org.apache.hadoop.fs.FileSystem提供,其繼承的層次結構如下圖:
從上圖可以看出,Hadoop發行包中包含了不同的FileSystem子類,以滿足不同得到資料訪問需求。比較典型的是訪問HDFS上的資料,但有些時候也可訪問儲存在其他檔案系統上,如Amazon的S3系統。此外,使用者還可以根據特定的需求,自己實現特定網路存粗服務的具體檔案系統。
Hadoop抽象檔案系統為使用者提供了一個訪問不同的檔案系統的統一介面,大部分介面都在FileSystem這個抽象類中。其主要提供的方法可以分為兩部分:一部分用於處理檔案和目錄相關的事務;另一部分用於讀寫檔案資料。其中處理檔案和目錄主要是指建立檔案,建立目錄,刪除檔案,刪除目錄等操作,讀寫資料檔案主要是指讀檔案資料,寫入檔案資料等操作。這一些列操作大部分與Java的檔案系統介面相似,如FileSystem.mkdirs(Path f, FsPermission permission)方法在FileSystem物件所代表的檔案系統中建立目錄,Java.io.File.mkdirs()也是建立目錄的方法。FileSystem.delete(Path f)方法用於刪除檔案或目錄,Java.io.File.delete()方法也用於刪除檔案或目錄。FileSystem中需要子類實現的抽象方法大部分都是見名知意的方法,下面整理了FileSystem中的抽象方法:
上面的方法基本上覆蓋了Hadoop抽象檔案系統具體實現所需要實現的方法。此外,Hadoop抽象檔案系統基於以上方法,提供了一些工具方法,方便使用者呼叫。如listStatus()方法等。/**獲取檔案系統URI**/ public abstract URI getUri(); /**開啟一個檔案,並返回一個輸入流**/ public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException; /**建立一個檔案,並返回一個輸入流**/ public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException; /**在一個已經存在的檔案中追加資料**/ public abstract FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException; /**修改檔名或目錄名**/ public abstract boolean rename(Path src, Path dst) throws IOException; /**刪除檔案**/ public abstract boolean delete(Path f, boolean recursive) throws IOException; /**如果Path是一個目錄,讀取一個目錄下的所有專案和專案屬性,如果Path是一個檔案獲取該檔案的屬性**/ public abstract FileStatus[] listStatus(Path f) throws IOException; /**設定當前的工作目錄**/ public abstract void setWorkingDirectory(Path new_dir); /**獲取當前的工作目錄**/ public abstract Path getWorkingDirectory(); /**建立資料夾**/ public abstract boolean mkdirs(Path f, FsPermission permission ) throws IOException; /**獲取檔案或目錄屬性**/ public abstract FileStatus getFileStatus(Path f) throws IOException;
獲取FileSystem物件
下面是關於客戶端獲取FileSystem的DistributedFileSystem物件
客戶端要構造FileSystem物件可以使用FileSystem.get()方法,該方法有3個過載方法,分別是
/** Returns the configured filesystem implementation.
* 獲取具體檔案系統
* */
public static FileSystem get(Configuration conf) throws IOException {
return get(getDefaultUri(conf), conf);
}
/** Returns the FileSystem for this URI's scheme and authority. The scheme
* of the URI determines a configuration property name,
* <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
* The entire URI is passed to the FileSystem instance's initialize method.
*/
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();//獲得URI的模式
String authority = uri.getAuthority();//鑑權資訊
//URI模式為空,並且鑑權資訊為空,返回預設檔案系統
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
//鑑權資訊為空
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的檔案系統
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
}
public static FileSystem get(final URI uri, final Configuration conf,
final String user)
throws IOException, InterruptedException {
UserGroupInformation ugi;
if (user == null) {
ugi = UserGroupInformation.getCurrentUser();
} else {
ugi = UserGroupInformation.createRemoteUser(user);
}
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
}
但是都是呼叫有兩個形參的FileSystem.get()方法獲取FileSystem物件 。
在Hadoop原始碼分析之開篇給出的程式碼中獲取FileSystem物件的那行程式碼是
FileSystem in = FileSystem.get(conf);
其中conf是一個Configuration物件。執行這行程式碼後就進入到FileSystem.get(Configuration conf)方法中,可以看到,在這個方法中先通過getDefaultUri()方法獲取檔案系統對應的的URI,該URI儲存了與檔案系統對應的協議和授權資訊,如:hdfs://localhost:9000。這個URI又是如何得到的呢?是在CLASSPATH中的配置檔案中取得的,看getDefaultUri()方法中有conf.get(FS_DEFAULT_NAME_KEY,
"file:///") 這麼一個實參,在筆者專案的CLASSPATH中的core-site.xml檔案中有這麼一個配置:
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
而常量FS_DEFAULT_NAME_KEY對應的值是fs.default.name,所以conf.get(FS_DEFAULT_NAME_KEY, "file:///")得到的值是hdfs://localhost:9000。
URI建立完成之後就進入到FileSystem.get(final URI uri, final Configuration conf)方法。在這個方法中,先執行一些檢查,檢查URI的協議和授權資訊是否為空,然後再直接或簡介呼叫該方法,最重要的是執行
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {//是否使用被Cache的檔案系統
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
常量CACHE用於快取已經開啟的、可共享的檔案系統,它是FileSystem類的靜態內部類FileSystem.Cache的物件,在其內部使用一個Map儲存檔案系統
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
這個鍵值對對映的鍵是FileSystem.Cache.Key型別,它有三個成員變數:
/**URI模式**/
final String scheme;
/**URI的授權部分**/
final String authority;
/**儲存了開啟具體檔案系統的本地使用者資訊,不同本地使用者開啟的具體檔案系統也是不能共享的**/
final UserGroupInformation ugi;
由於FileSystem.Cache表示可共享的檔案系統,所以這個Key就用於區別不同的檔案系統物件,如一個一個檔案系統物件可共享,那麼FileSystem.Cache.Key的三個成員變數相等,在這個類中重寫了hashCode()方法和equals()方法,就是用於判斷這三個變數是否相等。根據《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》這本書的介紹,在Hadoop1。0版本中FileSystem.Cache.Key類還有一個unique欄位,這個欄位表示,如果其他3個欄位相等的情況,下如果使用者不想共享這個檔案系統,就設定這個值(預設為0),但是不知道現在為什麼去除了,還沒搞清楚,有哪位同學知道的話麻煩告知,謝謝。
回到FileSystem.get(final URI uri, final Configuration conf)方法的最後一行語句return CACHE.get(uri, conf),呼叫了FileSystem.Cahce.get()方法獲取具體的檔案系統物件,該方法程式碼如下:
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
FileSystem fs = null;
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}
fs = createFileSystem(uri, conf);
synchronized (this) { // refetch the lock again
FileSystem oldfs = map.get(key);
if (oldfs != null) { // a file system is created while lock is releasing
fs.close(); // close the new file system
return oldfs; // return the old file system
}
// now insert the new file system into the map
if (map.isEmpty() && !clientFinalizer.isAlive()) {
Runtime.getRuntime().addShutdownHook(clientFinalizer);
}
fs.key = key;
map.put(key, fs);
return fs;
}
}
在這個方法中先檢視已經map中是否已經快取了要獲取的檔案系統物件,如果已經有了,直接從集合中去除,如果沒有才進行建立,由於FileSystem.CACHE為static型別,所以在同一時刻可能有多個執行緒在訪問,所以需要在Cache類的方法中使用同步的操作來取值和設定值。這個方法比較簡單,最核心的就是 fs = createFileSystem(uri, conf);
這行語句,它執行了具體的檔案系統物件的建立的功能。createFileSystem()方法是FileSystem的一個私有方法,其程式碼如下:
private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
LOG.debug("Creating filesystem for " + uri);
if (clazz == null) {
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
}
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
fs.initialize(uri, conf);
return fs;
}
其實現就是先從配置檔案取得URI對應的類,如在core-default.xml檔案中屬性(鍵)fs.hdfs.impl對應的值是org.apache.hadoop.hdfs.DistributedFileSystem,相應的XML程式碼如下:
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
<description>The FileSystem for hdfs: uris.</description>
</property>
所以若uri對應fs.hdfs.impl,那麼createFileSystem中的clazz就是org.apache.hadoop.hdfs.DistributedFileSystem的Class物件。然後再利用反射,建立org.apache.hadoop.hdfs.DistributedFileSystem的物件fs。然後執行fs.initialize(uri, conf);初始化fs物件。DistributedFileSystem是Hadoop分散式檔案系統的實現類,實現了Hadoop檔案系統的介面,提供了處理HDFS檔案和目錄的相關事務。
執行DistributedFileSystem.initialize()方法之後,FileSystem物件就建立成功。
Reference:
《Hadoop技術內幕:深入解析Hadoop Common和HDFS架構設計與實現原理》