hadoop(2.7.3) 原始碼分析--RPC部分
hadoop(2.7.3) 原始碼分析–RPC部分
序列化
hadoop 自帶了Writable序列化方法,可序列化的物件需實現 Writable 介面。
Hadoop common下org.apache.hadoop.io 大量的可序列化物件,他們都實現了Writable 介面。
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
其中最關鍵的是ObjectWritable,他儲存了一個可以在RPC上傳輸的物件和物件的型別資訊,是萬能的。它會往流裡會寫如下資訊:物件類名,物件自己的序列化結果,程式碼見ObjectWritable.java
UTF8.writeString(out, declaredClass.getName()); // always write declared
if (declaredClass.isArray()) { // non-primitive or non-compact array
int length = Array.getLength(instance);
out.writeInt(length);
for (int i = 0; i < length; i++) {
writeObject(out, Array.get(instance, i),
declaredClass.getComponentType(), conf, allowCompactArrays);
}
} else if (declaredClass == ArrayPrimitiveWritable.Internal.class) {
((ArrayPrimitiveWritable.Internal) instance).write(out);
} else if (declaredClass == String.class) { // String
UTF8.writeString(out, (String)instance);
}
通訊部分
既然是RPC,當然就有客戶端和伺服器,當然,org.apache.hadoop.ipc也就有了類Client和類Server。但是類Server是一個抽象類,類RPC封裝了Server,利用反射,把某個物件的方法開放出來,發成RPC中的伺服器。
由於Client可能和多個Server通訊,典型的一次HDFS讀,需要和NameNode打交道,也需要和某個/某些DataNode通訊。這就意味著某一個Client需要維護多個連線。同時,為了減少不必要的連線,現在Client的做法是拿ConnectionId來做為Connection 的ID。ConnectionId 包括一個InetSocketAddress 和UserGroupInformation(ticket)及protocol,同一個使用者與同一個InetSocketAddress 的通訊將共享同一個連線。
private Hashtable<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>();
為了區分在同一個Connection 上的不同調用,每個呼叫都有唯一的id。呼叫是否結束也需要一個標記,所有的這些都體現在物件Client.Call 中。Connection 物件通過一個Hash 表,維護在某個連線上的所有Call
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
每個Client.Connection 繼承Thread,所以會開啟一個執行緒,不斷去讀取socket,並將收結果解包,找出對應的Call,設定Call 並通知結果已獲取。
Hadoop 的Server 採用了Java 的NIO,返樣的話就不需要為每一個socket 連線建立一個執行緒,讀取socket 上的資料。在Server 中,Listener及其巢狀類Reader 負責accept 新的連線請求和讀取socket 上的資料。
Responder負責在NIO可寫時,傳送rpc的結果。
RPC部分
客戶端和服務端呼叫時,都需要用的Call類,表示一次請求。在客戶端Client.Call 類裡,包括rpcRequest和rpcResponse,在Server.Call,除了剛才的rpcRequest和rpcResponse,還主要包括一下屬性:
connection 是該Call 來自的連線,當然,當請求處理結束時,相應的結果會通過相同的connection,傳送給客戶端。
timestamp 是請求到達的時間戳,如果請求長時間沒被處理,對應的連線會被關閉,客戶端也就知道出錯了
rpc部分的關鍵是RpcEngine,預設為WritableRpcEngine。主要包括Invocation、Invoker和WritableRpcInvoker三部分。
Invocation封裝了一個迖程呼叫的所有相關資訊,它的主要屬性有: methodName,呼叫方法名,parameterClasses,呼叫方法引數的型別列表和parameters,呼叫方法引數。注意,它實現了Writable介面,可以序列化。
@Override
@SuppressWarnings("deprecation")
public void write(DataOutput out) throws IOException {
out.writeLong(rpcVersion);
UTF8.writeString(out, declaringClassProtocolName);
UTF8.writeString(out, methodName);
out.writeLong(clientVersion);
out.writeInt(clientMethodsHash);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
conf, true);
}
}
Invoker實現了InvocationHandler的invoke方法(invoke方法也是InvocationHandler的唯一方法)。Invoker會通過Invocation把所有跟這次呼叫相關的呼叫方法名,引數型別列表,引數列表打包,然後引用前面我們分析過的Client,通過socket傳遞到伺服器端。就是說,你在proxy類上的任何呼叫,都通過Client傳送到對方的伺服器上。
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = Time.now();
}
TraceScope traceScope = null;
if (Trace.isTracing()) {
traceScope = Trace.startSpan(RpcClientUtil.methodToTraceString(method));
}
ObjectWritable value;
try {
value = (ObjectWritable)
client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
remoteId, fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime;
LOG.debug("Call: " + method.getName() + " " + callTime);
}
return value.get();
}
WritableRpcInvoker在服務端的被呼叫,主要功能包括獲取傳過來的Invocation物件,呼叫程式碼並將返回值返回。