hadoop2.6.0原始碼剖析-客戶端(第二部分--DFSClient)下(HA代理)
我們繼續進入到org.apache.hadoop.hdfs.NameNodeProxies的函式
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)中,開始分析HA代理部分程式碼,程式碼如下:
// HA case Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); Text dtService; if (failoverProxyProvider.useLogicalURI()) { dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, HdfsConstants.HDFS_URI_SCHEME); } else { dtService = SecurityUtil.buildTokenService( NameNode.getAddress(nameNodeUri)); } return new ProxyAndInfo<T>(proxy, dtService, NameNode.getAddress(nameNodeUri));
我們在繼續講解上面的程式碼前,我們先看看failoverProxyProvider的建立過程,這個變數為dfs.client.failover.proxy.provider加上nameNodeUri.host()類物件,程式碼如下:
/** Creates the Failover proxy provider instance*/ //這個函式用來建立一個故障轉移代理類例項 //conf為配置類物件 //nameNodeUri物件中包含了NameNode伺服器url資訊 //xface為ClientProtocol.class //boolean為false @VisibleForTesting public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort, AtomicBoolean fallbackToSimpleAuth) throws IOException { Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; AbstractNNFailoverProxyProvider<T> providerNN; //如果xface與NamenodeProtocols不是同一個類或者同一個介面,且xface不是NamenodeProtocols的 //父類或父介面,那麼就丟擲異常。 Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { // Obtain the class of the proxy provider //返回dfs.client.failover.proxy.provider加上nameNodeUri.host()對應類的Class類物件 failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri); if (failoverProxyProviderClass == null) { return null; } // Create a proxy provider instance. //獲取對應的建構函式,建構函式有三個引數 Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); //根據建構函式建立類物件 FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface); // If the proxy provider is of an old implementation, wrap it. //provider是否是AbstractNNFailoverProxyProvider類的例項或者是 //AbstractNNFailoverProxyProvider子類例項,如果不是,那麼採用 //WrappedFailoverProxyProvider類封裝,否則直接返回 if (!(provider instanceof AbstractNNFailoverProxyProvider)) { providerNN = new WrappedFailoverProxyProvider<T>(provider); } else { providerNN = (AbstractNNFailoverProxyProvider<T>)provider; } } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { LOG.debug(message, e); } if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); } else { throw new IOException(message, e); } } // Check the port in the URI, if it is logical. //檢查埠號 if (checkPort && providerNN.useLogicalURI()) { int port = nameNodeUri.getPort(); //如果埠號不為8020,那麼就拋異常 if (port > 0 && port != NameNode.DEFAULT_PORT) { // Throwing here without any cleanup is fine since we have not // actually created the underlying proxies yet. throw new IOException("Port " + port + " specified in URI " + nameNodeUri + " but host '" + nameNodeUri.getHost() + "' is a logical (HA) namenode" + " and does not use port information."); } } providerNN.setFallbackToSimpleAuth(fallbackToSimpleAuth); return providerNN; }
我們分析failoverProxyProviderClass = getFailoverProxyProviderClass(conf,nameNodeUri);getFailoverProxyProviderClass函式程式碼如下:
/** Gets the configured Failover proxy provider's class */ @VisibleForTesting public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri) throws IOException { if (nameNodeUri == null) { return null; } //獲取域名 String host = nameNodeUri.getHost(); //DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX值為dfs.client.failover.proxy.provider String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host; try { @SuppressWarnings("unchecked") //返回類名稱為configKey對應的Class類物件,同時這個類要與FailoverProxyProvider是同一個類 //或者同一個介面,或者這個類是FailoverProxyProvider的子類或子介面 Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf .getClass(configKey, null, FailoverProxyProvider.class); return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { throw new IOException("Could not load failover proxy provider class " + conf.get(configKey) + " which is configured for authority " + nameNodeUri, e); } else { throw e; } } }
我們分析getClass函式,待如下:
/**
* Get the value of the <code>name</code> property as a <code>Class</code>
* implementing the interface specified by <code>xface</code>.
*
* If no such property is specified, then <code>defaultValue</code> is
* returned.
*
* An exception is thrown if the returned class does not implement the named
* interface.
*
* @param name the class name.
* @param defaultValue default value.
* @param xface the interface implemented by the named class.
* @return property value as a <code>Class</code>,
* or <code>defaultValue</code>.
*/
public <U> Class<? extends U> getClass(String name,
Class<? extends U> defaultValue,
Class<U> xface) {
try {
//獲取類名稱為name對應的Class類物件,如果沒有那麼採用預設值defaultValue
Class<?> theClass = getClass(name, defaultValue);
//如果Class類物件不為null且xface與theClass不是相同的類或介面,且xface不是theClass的父類
//或父介面,那麼就丟擲異常。
if (theClass != null && !xface.isAssignableFrom(theClass))
throw new RuntimeException(theClass+" not "+xface.getName());
else if (theClass != null)
return theClass.asSubclass(xface);
else
return null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
我們分析getClass函式,程式碼如下:
/**
* Get the value of the <code>name</code> property as a <code>Class</code>.
* If no such property is specified, then <code>defaultValue</code> is
* returned.
*
* @param name the class name.
* @param defaultValue default value.
* @return property value as a <code>Class</code>,
* or <code>defaultValue</code>.
*/
public Class<?> getClass(String name, Class<?> defaultValue) {
//將字串前後的空格符去掉
String valueString = getTrimmed(name);
//如果字串為null,那麼就返回defaultValue
if (valueString == null)
return defaultValue;
try {
//通過valueString獲取對應的Class物件
return getClassByName(valueString);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
getClassByName函式最終會呼叫函式public Class<?> getClassByNameOrNull(String name),程式碼如下:
在org.apache.hadoop.conf.Configuration類中
/**
* Load a class by name, returning null rather than throwing an exception
* if it couldn't be loaded. This is to avoid the overhead of creating
* an exception.
*
* @param name the class name
* @return the class object, or null if it could not be found.
*/
public Class<?> getClassByNameOrNull(String name) {
Map<String, WeakReference<Class<?>>> map;
//採用同步的方式
synchronized (CACHE_CLASSES) {
//CACHE_CLASSES的建立:private static final Map<ClassLoader, Map<String,
//WeakReference<Class<?>>>>
//CACHE_CLASSES = new WeakHashMap<ClassLoader, Map<String, WeakReference<Class<?
//>>>>();
//可以看出CACHE_CLASSES是一個map,key為載入類物件,value也為一個map,key為名稱,value為
//WeakReference類物件,這個物件中包含了Class類物件
map = CACHE_CLASSES.get(classLoader);
if (map == null) {
//如果map為null,說明沒有找到key為相應載入器類物件的元素,那麼此時就建立一個
map = Collections.synchronizedMap(
new WeakHashMap<String, WeakReference<Class<?>>>());
CACHE_CLASSES.put(classLoader, map);
}
}
Class<?> clazz = null;
WeakReference<Class<?>> ref = map.get(name);
if (ref != null) {
//如果找到了對應name的WeekReference類物件,那麼就獲取相應的Class類物件
clazz = ref.get();
}
//如果為null
if (clazz == null) {
try {
//使用載入類物件classLoader載入名稱為name的Class類物件,true表示載入的時候會執行類名稱
//為name的類裡面的靜態區程式碼
clazz = Class.forName(name, true, classLoader);
} catch (ClassNotFoundException e) {
// Leave a marker that the class isn't found
//如果載入失敗,那麼就儲存一個標識,表示該類不存在
map.put(name, new WeakReference<Class<?>>(NEGATIVE_CACHE_SENTINEL));
return null;
}
// two putters can race here, but they'll put the same class
//將Class類物件儲存到map中,供下次再次使用,並返回Class類物件。
map.put(name, new WeakReference<Class<?>>(clazz));
return clazz;
} else if (clazz == NEGATIVE_CACHE_SENTINEL) {
return null; // not found
} else {
// cache hit
return clazz;
}
}
到此,failoverProxyProvider就講解完了,該類中包含了一個NameNode的代理類接下來我們繼續往下分析。
我們進入到static <T> Object create(Class<T> iface,FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy)函式中,程式碼如下:
/**
* Create a proxy for an interface of implementations of that interface using
* the given {@link FailoverProxyProvider} and the same retry policy for each
* method in the interface.
*
* @param iface the interface that the retry will implement
* @param proxyProvider provides implementation instances whose methods should be retried
* @param retryPolicy the policy for retrying or failing over method call failures
* @return the retry proxy
*/
public static <T> Object create(Class<T> iface,
FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
//直接呼叫Java動態代理構造方法,返回ClientProtocol的代理物件,RetryInvocationHandler實現了
//InvocationHandler介面,裡面將proxyProvider和retryPolicy傳入到物件中
return Proxy.newProxyInstance(
proxyProvider.getInterface().getClassLoader(),
new Class<?>[] { iface },
new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
);
}
上面程式碼中的proxyProvider其實是org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider類物件。回到org.apache.hadoop.hdfs.NameNodeProxies的函式createProxy中,將物件返回,進入到DFSClient類的DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,Configuration conf, FileSystem.Statistics stats)函式中,剩下的程式碼在此就忽略,到此整個DFSClient建構函式就結束了,當然裡面還涉及到很多知識點,後面我會不斷更新文章,把遺漏的知識點補上,敬請期待。