Hadoop Source Code Analysis: The FileSystem Class
1. The org.apache.hadoop.conf package
The org.apache.hadoop.conf package lives in the hadoop-common module.
1.1 The Configurable interface
package org.apache.hadoop.conf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** Something that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Configurable {
/** Set the configuration to be used by this object. */
void setConf(Configuration conf);
/** Return the configuration used by this object. */
Configuration getConf();
}
1.2 The Configured class
package org.apache.hadoop.conf;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** Base class for things that may be configured with a {@link Configuration}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configured implements Configurable {
private Configuration conf;
/** Construct a Configured. */
public Configured() {
this(null);
}
/** Construct a Configured. */
public Configured(Configuration conf) {
setConf(conf);
}
// inherit javadoc
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
// inherit javadoc
@Override
public Configuration getConf() {
return conf;
}
}
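The two types are meant to be used through inheritance: a class extends Configured and gets setConf()/getConf() for free, which is how FileSystem obtains its Configuration. The sketch below mirrors that pattern without any Hadoop dependency. SimpleConf, SimpleConfigurable, SimpleConfigured and WordCountTool are hypothetical names used only for illustration, not Hadoop classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Configuration: a plain key/value map.
class SimpleConf {
    private final Map<String, String> props = new HashMap<>();

    // Trims the name before storing it, as Configuration.set does.
    void set(String name, String value) {
        props.put(name.trim(), value);
    }

    String get(String name, String defaultValue) {
        return props.getOrDefault(name, defaultValue);
    }
}

// Mirrors the Configurable interface.
interface SimpleConfigurable {
    void setConf(SimpleConf conf);

    SimpleConf getConf();
}

// Mirrors Configured: it only stores the configuration it is given.
class SimpleConfigured implements SimpleConfigurable {
    private SimpleConf conf;

    SimpleConfigured() {
        this(null);
    }

    SimpleConfigured(SimpleConf conf) {
        setConf(conf);
    }

    @Override
    public void setConf(SimpleConf conf) {
        this.conf = conf;
    }

    @Override
    public SimpleConf getConf() {
        return conf;
    }
}

// Hypothetical subclass: inherits setConf/getConf the way FileSystem does from Configured.
class WordCountTool extends SimpleConfigured {
    WordCountTool(SimpleConf conf) {
        super(conf);
    }

    String defaultFs() {
        return getConf().get("fs.defaultFS", "file:///");
    }
}
```

Calling new WordCountTool(conf).defaultFs() reads fs.defaultFS from whatever configuration the object was constructed with and falls back to file:///, the same default FileSystem uses for DEFAULT_FS.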
1.3 The Configuration class
package org.apache.hadoop.conf;
import ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Configuration implements Iterable<Map.Entry<String,String>>,
Writable {
...
private static class Resource {
private final Object resource;
private final String name;
public Resource(Object resource) {
this(resource, resource.toString());
}
public Resource(Object resource, String name) {
this.resource = resource;
this.name = name;
}
public String getName(){
return name;
}
public Object getResource() {
return resource;
}
@Override
public String toString() {
return name;
}
}
/**
* Set the <code>value</code> of the <code>name</code> property. If
* <code>name</code> is deprecated or there is a deprecated name associated to it,
* it sets the value to both names. Name will be trimmed before put into
* configuration.
*
* @param name property name.
* @param value property value.
*/
public void set(String name, String value) {
set(name, value, null);
}
/**
* Add a configuration resource.
*
* The properties of this resource will override properties of previously
* added resources, unless they were marked <a href="#Final">final</a>.
*
* @param name resource to be added, the classpath is examined for a file
* with that name.
*/
public void addResource(String name) {
addResourceObject(new Resource(name));
}
...
}
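The addResource() javadoc describes a layering rule: properties from a later resource override those of earlier resources, except for properties an earlier resource marked final. A minimal sketch of that rule, assuming each resource is an in-memory map plus a set of final property names (Hadoop itself reads the final flag from `<final>true</final>` in XML resources); LayeredConf is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of layered configuration resources with "final" properties.
class LayeredConf {
    // One resource: its properties plus the names it declares final.
    private static final class Resource {
        final Map<String, String> props;
        final Set<String> finals;

        Resource(Map<String, String> props, Set<String> finals) {
            this.props = new HashMap<>(props);
            this.finals = new HashSet<>(finals);
        }
    }

    private final List<Resource> resources = new ArrayList<>();

    void addResource(Map<String, String> props, Set<String> finals) {
        resources.add(new Resource(props, finals));
    }

    // Merge resources in the order they were added.
    Map<String, String> resolve() {
        Map<String, String> merged = new HashMap<>();
        Set<String> locked = new HashSet<>();
        for (Resource r : resources) {
            for (Map.Entry<String, String> e : r.props.entrySet()) {
                String name = e.getKey();
                if (locked.contains(name)) {
                    continue; // an earlier resource marked it final: ignore the override
                }
                merged.put(name, e.getValue());
                if (r.finals.contains(name)) {
                    locked.add(name);
                }
            }
        }
        return merged;
    }
}
```

If a defaults resource is added first and a site resource second, the site value wins for ordinary keys, while a key the first resource declared final keeps its original value.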
2. The org.apache.hadoop.fs package
The org.apache.hadoop.fs package also lives in the hadoop-common module.
2.1 FileSystem
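Hadoop has an abstract notion of a file system, of which HDFS is only one implementation. This abstract file system is defined by the abstract class org.apache.hadoop.fs.FileSystem, which extends org.apache.hadoop.conf.Configured and implements java.io.Closeable. The class provides a rich set of methods for operating on a file system, such as creating directories, deleting files, and renaming.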
package org.apache.hadoop.fs;
import ...
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable {
//"fs.defaultFS"
public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
// "file:///"
public static final String DEFAULT_FS = CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT;
/**
* Call {@link #mkdirs(Path, FsPermission)} with default permission.
*/
public boolean mkdirs(Path f) throws IOException {
return mkdirs(f, FsPermission.getDirDefault());
}
/** create a directory with the provided permission
* The permission of the directory is set to be the provided permission as in
* setPermission, not permission&~umask
*
* @see #create(FileSystem, Path, FsPermission)
*
* @param fs file system handle
* @param dir the name of the directory to be created
* @param permission the permission of the directory
* @return true if the directory creation succeeds; false otherwise
* @throws IOException
*/
public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
throws IOException {
// create the directory using the default permission
boolean result = fs.mkdirs(dir);
// set its permission to be the supplied one
fs.setPermission(dir, permission);
return result;
}
}
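The static mkdirs(fs, dir, permission) helper differs from mkdirs(path, permission) in how the umask is treated: its javadoc says the result is the provided permission "as in setPermission, not permission&~umask". The arithmetic on octal permission bits can be sketched as follows. PermissionMath is a hypothetical helper; 0777 is the value FsPermission.getDirDefault() represents, and 022 is assumed as the umask (the usual default of fs.permissions.umask-mode).

```java
// Hypothetical helper working on plain octal permission bits.
final class PermissionMath {
    // FsPermission.getDirDefault() represents rwxrwxrwx, i.e. 0777.
    static final int DIR_DEFAULT = 0777;

    private PermissionMath() {}

    // Bits a directory gets on the ordinary mkdirs path: requested & ~umask.
    static int applyUmask(int requested, int umask) {
        return requested & ~umask;
    }

    // Static FileSystem.mkdirs(fs, dir, perm): mkdirs(dir) applies the umask to the
    // default permission, then setPermission(dir, perm) overwrites the bits with perm.
    // Returns {bits after mkdirs(dir), bits after setPermission(dir, requested)}.
    static int[] staticMkdirsSteps(int requested, int umask) {
        int afterCreate = applyUmask(DIR_DEFAULT, umask);
        int afterSetPermission = requested; // setPermission does not consult the umask
        return new int[] {afterCreate, afterSetPermission};
    }

    // Renders the 9 permission bits as rwx characters, e.g. 0755 -> rwxr-xr-x.
    static String toSymbolic(int perm) {
        String chars = "rwxrwxrwx";
        StringBuilder sb = new StringBuilder(9);
        for (int i = 0; i < 9; i++) {
            int bit = 1 << (8 - i);
            sb.append((perm & bit) != 0 ? chars.charAt(i) : '-');
        }
        return sb.toString();
    }
}
```

With umask 022, a directory requested as 0777 ends up as 0755 (rwxr-xr-x) on the ordinary path, whereas the static helper first creates it that way and then setPermission replaces the bits with exactly 0777.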
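2.2 DistributedFileSystem, a subclass of FileSystem
DistributedFileSystem is the subclass of the abstract FileSystem class that implements the distributed file system (DFS); end-user code interacts with HDFS through this class.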
package org.apache.hadoop.hdfs;
import ...
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop
* DistributedFileSystem.
*
*****************************************************************/
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem {
private Path workingDir;
private URI uri;
// "/user"
private String homeDirPrefix = DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
...
}
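2.3 How a FileSystem object is created
Hadoop supports many kinds of file systems, so how does Hadoop get from a reference of type FileSystem to the actual DistributedFileSystem? Below we walk through this creation process step by step in the source code.
(1) A small program that creates a FileSystem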
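import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyToHdfs { // class name is illustrative
public static void main(String[] args) throws Exception {
// local file path
String local="D:\\word2.txt";
// destination path on HDFS
String dest="hdfs://192.168.80.131:9000/user/root/input/word2.txt";
Configuration cfg=new Configuration();
// obtain the file system for dest's URI, acting as user "root"
FileSystem fs= FileSystem.get(URI.create(dest),cfg,"root");
fs.copyFromLocalFile(new Path(local), new Path(dest));
fs.close();
}
}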
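(2) Start from the get() method in this program
Stepping into FileSystem's get(final URI uri, final Configuration conf, final String user) method, we find that it obtains a UserGroupInformation for the given user (taking the Kerberos ticket cache path from conf into account) and then calls get(URI uri, Configuration conf) inside ugi.doAs(), so the lookup runs as that user.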
/**
* Get a filesystem instance based on the uri, the passed
* configuration and the user
* @param uri of the filesystem
* @param conf the configuration to use
* @param user to perform the get as
* @return the filesystem instance
* @throws IOException
* @throws InterruptedException
*/
public static FileSystem get(final URI uri, final Configuration conf,
final String user) throws IOException, InterruptedException {
String ticketCachePath =
conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
UserGroupInformation ugi =
UserGroupInformation.getBestUGI(ticketCachePath, user);
return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws IOException {
return get(uri, conf);
}
});
}
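ugi.doAs() runs the action with the given user as the current caller, so the nested get(uri, conf), and the cache Key built inside it, sees that identity. The following sketch imitates the idea with a ThreadLocal. UserContext and LookupAsUser are hypothetical names, and this is not how UserGroupInformation works internally (it is built on JAAS Subject.doAs); the sketch only shows the "set identity, run, restore" shape.

```java
import java.util.function.Supplier;

// Hypothetical illustration of running an action as a given user.
final class UserContext {
    private static final ThreadLocal<String> CURRENT =
            ThreadLocal.withInitial(() -> System.getProperty("user.name"));

    private UserContext() {}

    static String currentUser() {
        return CURRENT.get();
    }

    // Makes 'user' the current user while 'action' runs, then restores the previous one.
    static <T> T doAs(String user, Supplier<T> action) {
        String previous = CURRENT.get();
        CURRENT.set(user);
        try {
            return action.get();
        } finally {
            CURRENT.set(previous);
        }
    }
}

// Hypothetical stand-in for the nested get(uri, conf): it only reports which
// identity it sees, which is what the cache Key records.
final class LookupAsUser {
    private LookupAsUser() {}

    static String cacheKeyFor(String uri) {
        return UserContext.currentUser() + "@" + uri;
    }
}
```

Calling UserContext.doAs("root", () -> LookupAsUser.cacheKeyFor(uri)) yields a key tied to root, and the previous identity is restored afterwards even if the action throws.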
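(3) Step into get(URI uri, Configuration conf)
The code below shows that get does not create a new FileSystem object on every call: unless caching is disabled for the URI's scheme, it fetches the object from a cache.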
/** Returns the FileSystem for this URI's scheme and authority. The scheme
* of the URI determines a configuration property name,
* <tt>fs.<i>scheme</i>.class</tt> whose value names the FileSystem class.
* The entire URI is passed to the FileSystem instance's initialize method.
*/
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
// No scheme and no authority: use the default file system named by fs.defaultFS (file:/// if it is not set)
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
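// Scheme but no authority: if the scheme matches that of fs.defaultFS (file:/// if unset) and the default URI has an authority, use the default URI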
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
// disableCacheName is the per-scheme switch fs.<scheme>.impl.disable.cache; if it is true, the cache is bypassed and createFileSystem() builds a new instance
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
return createFileSystem(uri, conf);
}
// conf in our example does not set this property (it defaults to false), so the cached path CACHE.get() is taken
return CACHE.get(uri, conf);
}
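The branching above can be restated as a pure function, which makes it easy to check which URI is finally used and whether the cache is consulted. This is a hedged sketch: the configuration is modeled as a Map<String, String>, FsLookup and Decision are hypothetical names, the recursive calls stand in for get(conf) and get(defaultUri, conf), and the scheme check in defaultUri() is an addition so the sketch cannot recurse forever.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical restatement of the branches in FileSystem.get(URI, Configuration).
final class FsLookup {
    // The URI finally used, and whether CACHE.get() (true) or createFileSystem() (false) runs.
    static final class Decision {
        final URI uri;
        final boolean cached;

        Decision(URI uri, boolean cached) {
            this.uri = uri;
            this.cached = cached;
        }
    }

    private FsLookup() {}

    // Like getDefaultUri(conf): fs.defaultFS, or file:/// when unset.
    static URI defaultUri(Map<String, String> conf) {
        URI uri = URI.create(conf.getOrDefault("fs.defaultFS", "file:///"));
        if (uri.getScheme() == null) { // guard so the recursion below always terminates
            throw new IllegalArgumentException("fs.defaultFS has no scheme: " + uri);
        }
        return uri;
    }

    static Decision lookup(URI uri, Map<String, String> conf) {
        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        if (scheme == null && authority == null) {     // like get(conf)
            return lookup(defaultUri(conf), conf);
        }
        if (scheme != null && authority == null) {     // no authority
            URI def = defaultUri(conf);
            if (scheme.equals(def.getScheme()) && def.getAuthority() != null) {
                return lookup(def, conf);               // like get(defaultUri, conf)
            }
        }
        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        boolean disabled = Boolean.parseBoolean(conf.getOrDefault(disableCacheName, "false"));
        return new Decision(uri, !disabled);
    }
}
```

With fs.defaultFS set to hdfs://nn:9000, both /user/root/a.txt and hdfs:///tmp/x resolve to hdfs://nn:9000 and go through the cache, while setting fs.hdfs.impl.disable.cache to true sends an hdfs:// URI that has an authority straight to createFileSystem().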
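(4) Step into CACHE.get(URI uri, Configuration conf)
CACHE is a static field of FileSystem whose type, Cache, is a static nested class of FileSystem. In this get() method, uri and conf are used to build a Key, which holds the user's identity and the file system being accessed.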
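In the Hadoop source, Key records the URI's scheme and authority (lower-cased, empty when absent) together with the current UserGroupInformation, plus a unique counter that FileSystem.newInstance() uses to force a fresh entry. The path part of the URI is not part of the key. A simplified sketch with the user reduced to a plain string and without the unique counter; CacheKey is a hypothetical class.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

// Hypothetical simplified version of FileSystem.Cache.Key.
final class CacheKey {
    final String scheme;
    final String authority;
    final String user;

    CacheKey(URI uri, String user) {
        // Only scheme and authority matter; the path is ignored.
        this.scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
        this.authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase(Locale.ROOT);
        this.user = user;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CacheKey)) return false;
        CacheKey k = (CacheKey) o;
        return scheme.equals(k.scheme) && authority.equals(k.authority)
                && Objects.equals(user, k.user);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority, user);
    }

    @Override
    public String toString() {
        return "(" + user + ")@" + scheme + "://" + authority;
    }
}
```

Two files on the same NameNode accessed by the same user therefore share one cached FileSystem, while a different user or a different NameNode address gets its own instance.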
/** Caching FileSystem objects */
static class Cache {
private final ClientFinalizer clientFinalizer = new ClientFinalizer();
private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
private final Set<Key> toAutoClose = new HashSet<Key>();
/** A variable that makes all objects in the cache unique */
private static AtomicLong unique = new AtomicLong(1);
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
}
...
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
FileSystem fs;
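// This is the caching mechanism: the first time a user enters this method the map is empty; when the same user enters again and accesses the same URI,
// the file system is taken straight from the map, skipping initialization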
synchronized (this) {
fs = map.get(key);
}
if (fs != null) {
return fs;
}
// Core code that creates the file system (runs without holding the lock)
fs = createFileSystem(uri, conf);
synchronized (this) { // refetch the lock again
FileSystem oldfs = map.get(key);
if (oldfs != null) { // a file system is created while lock is releasing
fs.close(); // close the new file system
return oldfs; // return the old file system
}
// now insert the new file system into the map
if (map.isEmpty()
&& !ShutdownHookManager.get().isShutdownInProgress()) {
ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
}
fs.key = key;
// On the user's first visit the map is empty, so the key built in the previous method and the new file system are stored here as a key-value pair
map.put(key, fs);
if (conf.getBoolean("fs.automatic.close", true)) {
toAutoClose.add(key);
}
return fs;
}
}
...
}
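(5) Step into getInternal(URI uri, Configuration conf, Key key)
This method also belongs to the nested class Cache, and its code is shown above. It first looks up the key in map while holding the lock and returns the cached instance on a hit. On a miss it calls createFileSystem() without holding the lock, then re-acquires the lock: if another thread has inserted an instance for the same key in the meantime, it closes the one it just created and returns the existing one; otherwise it registers the ClientFinalizer shutdown hook if the map was empty, stores the new instance in map, marks it for automatic close when fs.automatic.close is true (the default), and returns it.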
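The lookup, create-outside-the-lock, re-check sequence in getInternal can be sketched as a small generic cache. InstanceCache is a hypothetical class; it leaves out the shutdown hook and the auto-close bookkeeping and keeps only the concurrency pattern.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the lookup / create / re-check pattern used by Cache.getInternal.
final class InstanceCache<K, V extends Closeable> {
    private final Map<K, V> map = new HashMap<>();
    private final Function<K, V> factory;

    InstanceCache(Function<K, V> factory) {
        this.factory = factory;
    }

    V get(K key) {
        V value;
        synchronized (this) {            // fast path: already cached
            value = map.get(key);
        }
        if (value != null) {
            return value;
        }
        // Create outside the lock so a slow initialization does not block other lookups.
        value = factory.apply(key);
        synchronized (this) {
            V existing = map.get(key);
            if (existing != null) {      // another thread inserted one while we were creating
                closeDuplicate(value);   // discard the instance we just built
                return existing;
            }
            map.put(key, value);
            return value;
        }
    }

    synchronized int size() {
        return map.size();
    }

    private static void closeDuplicate(Closeable duplicate) {
        try {
            duplicate.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Creating outside the lock keeps a slow initialization (such as connecting to a NameNode) from blocking unrelated lookups. The price is that two threads may both build an instance for the same key, which is why the second synchronized block re-checks the map and closes the loser.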
(6) Step into createFileSystem(URI uri, Configuration conf)
This method is responsible for creating the concrete file system instance.
private static FileSystem createFileSystem(URI uri, Configuration conf
) throws IOException {
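// Find the FileSystem implementation class for the URI's scheme (configured via fs.<scheme>.impl or registered as a service); for hdfs:// this is DistributedFileSystem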
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
// Instantiate that class via reflection
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
// The new fs object is not usable yet; initialize() must be called to set it up
fs.initialize(uri, conf);
return fs;
}
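The "look up a class by scheme, instantiate it reflectively, then initialize it" sequence can be sketched without Hadoop. SketchFs, SketchLocalFs, SketchDistributedFs and SketchFsFactory are hypothetical names, and the static map stands in for the fs.<scheme>.impl configuration and service lookup performed by getFileSystemClass.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the abstract FileSystem class.
abstract class SketchFs {
    private URI uri;

    // Plays the role of FileSystem.initialize(uri, conf).
    void initialize(URI uri) {
        this.uri = uri;
    }

    URI getUri() {
        return uri;
    }
}

// Hypothetical implementations, one per scheme.
class SketchLocalFs extends SketchFs {}

class SketchDistributedFs extends SketchFs {}

final class SketchFsFactory {
    // Stands in for the fs.<scheme>.impl / service lookup in getFileSystemClass.
    private static final Map<String, Class<? extends SketchFs>> IMPLS = new HashMap<>();

    static {
        IMPLS.put("file", SketchLocalFs.class);
        IMPLS.put("hdfs", SketchDistributedFs.class);
    }

    private SketchFsFactory() {}

    static SketchFs create(URI uri) {
        Class<? extends SketchFs> clazz = IMPLS.get(uri.getScheme());
        if (clazz == null) {
            throw new IllegalArgumentException("No file system registered for scheme: " + uri.getScheme());
        }
        SketchFs fs;
        try {
            // Reflection: only the Class object is needed, not a compile-time reference.
            fs = clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
        fs.initialize(uri); // the instance is empty until initialize() runs
        return fs;
    }
}
```

The design lets hadoop-common create a DistributedFileSystem without depending on the HDFS module at compile time: the concrete class is chosen at runtime from the scheme.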
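(7) Step into initialize(URI uri, Configuration conf)
Note that since the object being initialized here is a DistributedFileSystem, the method to read is DistributedFileSystem's own initialize(URI uri, Configuration conf), which overrides the one in FileSystem.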
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
setConf(conf);
// Get the NameNode host name
String host = uri.getHost();
if (host == null) {
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
homeDirPrefix = conf.get(
DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY, //"dfs.user.home.dir.prefix"
DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);// "/user"
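// Initialize dfs, uri and workingDir
// The most important one is dfs, a DFSClient: as the name suggests, it is a client that talks to the NameNode;
// internally it holds an RPC proxy that fetches information from the NameNode remotely. It is a complex object.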
this.dfs = new DFSClient(uri, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}
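The last two assignments normalize the URI to scheme://authority and derive the working directory. In the Hadoop source, DistributedFileSystem.getHomeDirectory() builds homeDirPrefix + "/" + the current user's short name and qualifies it with this file system's URI. A string-based sketch of the same computation; HomeDirs is a hypothetical helper and the user name is passed in explicitly instead of coming from UserGroupInformation.

```java
import java.net.URI;

// Hypothetical helper mirroring the end of DistributedFileSystem.initialize().
final class HomeDirs {
    private HomeDirs() {}

    // Keep only scheme and authority, as in URI.create(scheme + "://" + authority).
    static URI fsUri(URI uri) {
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Incomplete HDFS URI, no host: " + uri);
        }
        return URI.create(uri.getScheme() + "://" + uri.getAuthority());
    }

    // Home directory = <homeDirPrefix>/<user>, qualified with the file system URI.
    static String homeDirectory(URI uri, String homeDirPrefix, String user) {
        return fsUri(uri) + homeDirPrefix + "/" + user;
    }
}
```

For the program in step (1), with user root and the default prefix /user, the working directory becomes hdfs://192.168.80.131:9000/user/root, and relative paths passed to fs are resolved against it.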