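hadoop3.0.0 Source Code Reading, Part 1: The IPC Client
I have been reading the Hadoop source code for a while and plan to write up a proper summary. This post is a placeholder for now: it publishes the code I annotated earlier. If anything is wrong, please point it out.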
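I. Basic RPC Concepts
1.1 Basic concepts of RPC
RPC stands for Remote Procedure Call.
(1) It lets a program on one computer call a subroutine on another computer without caring about the underlying network communication details, which stay transparent to the caller. For that reason it is widely used in distributed network communication.
The RPC protocol assumes that some transport protocol, such as TCP or UDP, exists to carry data between the communicating programs. In the OSI network model, RPC spans the transport layer and the application layer. RPC makes it easier to develop applications, including distributed multi-program network applications.
(2)
It is therefore fair to say that Hadoop runs on top of RPC.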
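1.2 Notable characteristics of RPC
(1) Transparency: calling a program on another machine looks to the user just like calling a local method.
(2) High performance: the RPC server can handle requests from multiple clients concurrently.
(3) Controllability: the JDK already ships an RPC framework, RMI, but it is too heavyweight and offers few points of control, so Hadoop implements its own RPC framework.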
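(1) RPC uses a client/server (C/S) model;
(2
(3) After the server receives the request, it calls the corresponding program with the arguments that were sent, then sends the computed result back to the client;
(4) The client receives the result and continues executing.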
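1.4 The RPC mechanism in Hadoop (IPC)
Like other RPC frameworks, Hadoop RPC is divided into four parts:
(1) Serialization layer: the information passed between client and server uses the serialization classes provided by Hadoop or custom Writable types;
(2) Function call layer: Hadoop RPC implements function calls through dynamic proxies and Java reflection;
(3) Network transport layer: Hadoop RPC is based on
(4) Server-side framework layer: the RPC server uses Java NIO and an event-driven I/O model to improve its concurrent processing capacity.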
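The serialization layer in item (1) can be pictured with a small stand-in. This is only a sketch: `SimpleWritable` and `EchoRequest` are hypothetical names, and the interface merely mirrors the two-method shape of Hadoop's `Writable` (`write(DataOutput)` and `readFields(DataInput)`) using JDK classes alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableSketch {
    /** Hypothetical stand-in with the same two-method shape as Hadoop's Writable. */
    interface SimpleWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    /** Illustrative request type: a method name plus one int argument. */
    static class EchoRequest implements SimpleWritable {
        String method = "";
        int arg;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(method);   // 2-byte length followed by the encoded string
            out.writeInt(arg);      // 4-byte big-endian int
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            method = in.readUTF();  // fields are read back in the order they were written
            arg = in.readInt();
        }
    }

    /** Serializes the object into a byte array, as a client would before sending. */
    static byte[] toBytes(SimpleWritable w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Fills an existing object from bytes, as a server would after receiving. */
    static void fromBytes(SimpleWritable w, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            w.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EchoRequest sent = new EchoRequest();
        sent.method = "echo";
        sent.arg = 42;
        EchoRequest received = new EchoRequest();
        fromBytes(received, toBytes(sent));
        System.out.println(received.method + "(" + received.arg + ")");
    }
}
```

The receiver creates an empty object first and then calls `readFields` on it, which is the same pattern the client code later in this post uses when it reads a response value.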
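1.5 Techniques used in the Hadoop RPC design
(1) Dynamic proxy
A dynamic proxy provides access to another object while hiding the details of the real object from the client. The Java SDK supports dynamic proxies, but only for interfaces.
(2) Reflection: dynamic class loading
(3) Serialization
(4) Non-blocking asynchronous I/O (NIO)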
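Items (1) and (2) can be tried out with the JDK's own `java.lang.reflect.Proxy`. The sketch below is illustrative only: `EchoProtocol`, `EchoServer`, and `RecordingInvoker` are made-up names, and the "remote" call is replaced by a local reflective call so the example stays self-contained.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class ProxySketch {
    /** Hypothetical protocol; the JDK proxy can only implement interfaces. */
    interface EchoProtocol {
        String echo(String message);
    }

    /** Stand-in for the server-side implementation, reached only via reflection. */
    static class EchoServer implements EchoProtocol {
        @Override
        public String echo(String message) {
            return "echo:" + message;
        }
    }

    /** Every call made on the proxy is funneled into invoke(). */
    static class RecordingInvoker implements InvocationHandler {
        final List<String> calledMethods = new ArrayList<>();
        private final Object target;

        RecordingInvoker(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            calledMethods.add(method.getName());
            // A real RPC client would serialize the method name and arguments and
            // send them over the network. This sketch looks the method up by name
            // on the target and calls it reflectively instead.
            Method serverMethod = target.getClass()
                    .getMethod(method.getName(), method.getParameterTypes());
            return serverMethod.invoke(target, args);
        }
    }

    /** Builds a proxy object that implements EchoProtocol through the invoker. */
    static EchoProtocol newProxy(RecordingInvoker invoker) {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),
                new Class<?>[] {EchoProtocol.class},
                invoker);
    }

    public static void main(String[] args) {
        RecordingInvoker invoker = new RecordingInvoker(new EchoServer());
        EchoProtocol client = newProxy(invoker);
        System.out.println(client.echo("hi"));
        System.out.println(invoker.calledMethods);
    }
}
```

The caller only ever sees `EchoProtocol`; where and how the method really runs is decided inside `invoke()`, which is the hook an RPC client uses to turn a method call into a network request.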
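RPC is something you have to care about in a distributed system. Whenever code on one machine needs to call a function on another machine, RPC makes that call look like a local function call, so you do not have to worry about how the lower layers work. It is the same idea as TCP: the upper layer calls in without caring about the lower-layer implementation.
The overall Client flow is all in the code below. The background you need is (1) dynamic proxies and (2) Java NIO.
Every RPC request is redirected to a single invoke() method. Each request becomes a Call object, the Call is added to a send queue, and a thread then picks up the Call and transfers the data over NIO. See the code comments for details.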
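Before reading the real code, the "request becomes a Call, a thread completes it" idea can be seen in miniature. This is a simplified sketch, not Hadoop's implementation: the `Call` class here is a hypothetical stand-in, the network is replaced by a string reply, and one short-lived thread is started per call, whereas the real client uses one receiver thread per connection.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CallTableSketch {
    /** Simplified stand-in for a pending request and its eventual result. */
    static class Call {
        final int id;
        final String request;
        private String response;
        private boolean done;

        Call(int id, String request) {
            this.id = id;
            this.request = request;
        }

        synchronized void setResponse(String value) {
            response = value;
            done = true;
            notifyAll();                 // wake the caller blocked in await()
        }

        synchronized String await() throws InterruptedException {
            while (!done) {              // loop guards against spurious wakeups
                wait();
            }
            return response;
        }
    }

    private final AtomicInteger nextId = new AtomicInteger();
    private final Map<Integer, Call> calls = new ConcurrentHashMap<>();

    /** Caller side: register the call by id, then block until it is completed. */
    String call(String request) {
        Call call = new Call(nextId.getAndIncrement(), request);
        calls.put(call.id, call);
        Thread receiver = new Thread(() -> receive(call.id));
        receiver.setDaemon(true);
        receiver.start();
        try {
            return call.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Receiver side: find the call by id, remove it from the table, complete it. */
    private void receive(int callId) {
        Call call = calls.remove(callId);
        if (call != null) {
            call.setResponse("reply to " + call.request);
        }
    }

    int pendingCalls() {
        return calls.size();
    }

    public static void main(String[] args) {
        CallTableSketch client = new CallTableSketch();
        System.out.println(client.call("ping"));
        System.out.println("pending: " + client.pendingCalls());
    }
}
```

Matching responses to requests by call id is what lets many caller threads share one connection: each caller waits on its own `Call` object while a single reader dispatches whatever arrives.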
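/*
Required background: 1. dynamic proxies 2. Java NIO
*/
/*
Every method call on the client side is redirected to Invoker.invoke(),
so the analysis of IPC connection setup and method invocation starts from the Invoker class.
All IPC proxies end up calling this invoke() method.
@proxy the protocol being proxied
@method the method to be invoked over IPC
@args the arguments
*/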
// ProtobufRpcEngine.invoke()
public Object invoke(Object proxy, final Method method, Object[] args)throws ServiceException {
long startTime = 0;
if (args.length != 2) { // RpcController + Message
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ args.length);
}
if (args[1] == null) {
throw new ServiceException("null param while calling Method: ["
+ method.getName() + "]");
}
// if Tracing is on then start a new span for this rpc.
// guard it in the if statement to make sure there isn't
// any extra string manipulation.
Tracer tracer = Tracer.curThreadTracer();
TraceScope traceScope = null;
if (tracer != null) {
traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
}
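// Build the IPC request header, which includes the method name and the protocol
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
// The request message (the parameters of the call)
Message theRequest = (Message) args[1];
// The return value of the IPC call
final RpcResponseWrapper val;
try {
/*
* Send the RPC request and wait for the result
* */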
val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
} finally {
if (traceScope != null) traceScope.close();
}
if (Client.isAsynchronousMode()) {
final AsyncGet<RpcResponseWrapper, IOException> arr
= Client.getAsyncRpcResponse();
final AsyncGet<Message, Exception> asyncGet
= new AsyncGet<Message, Exception>() {
@Override
public Message get(long timeout, TimeUnit unit) throws Exception {
return getReturnMessage(method, arr.get(timeout, unit));
}
@Override
public boolean isDone() {
return arr.isDone();
}
};
ASYNC_RETURN_MESSAGE.set(asyncGet);
return null;
} else {
return getReturnMessage(method, val);
}
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
// Client.call()
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
fallbackToSimpleAuth);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @param fallbackToSimpleAuth - set to true or false during this method to
* indicate if a secure client falls back to simple auth
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
/*
* Create a call, pass rpcRequest to the IPC server identified by remoteId, and return the rpc response
*
* */
// Client.call()
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException {
// Create the Call object that represents this request
final Call call = createCall(rpcKind, rpcRequest);
// Get a connection; this includes the handshake, which sends some basic information
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
checkAsyncCall();
try {
connection.sendRpcRequest(call); // send the rpc request
} // catch clauses omitted in this excerpt
} // catch clause omitted in this excerpt
if (isAsynchronousMode()) {
final AsyncGet<Writable, IOException> asyncGet
= new AsyncGet<Writable, IOException>() {
@Override
public Writable get(long timeout, TimeUnit unit)
throws IOException, TimeoutException{
boolean done = true;
try {
final Writable w = getRpcResponse(call, connection, timeout, unit);
if (w == null) {
done = false;
throw new TimeoutException(call + " timed out "
+ timeout + " " + unit);
}
return w;
} finally {
if (done) {
releaseAsyncCall();
}
}
}
@Override
public boolean isDone() {
synchronized (call) {
return call.done;
}
}
};
ASYNC_RPC_RESPONSE.set(asyncGet);
return null;
} else {
// Synchronous mode: wait for the rpc response and return it
return getRpcResponse(call, connection, -1, null);
}
}
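The asynchronous branch above returns null and parks a getter in a thread-local so the caller can collect the result later. The sketch below imitates that pattern under loud assumptions: it uses the JDK's `CompletableFuture` in place of Hadoop's `AsyncGet`, a string reply in place of a real response, and all names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class AsyncModeSketch {
    // Per-thread switch, standing in for the client's asynchronous-mode flag.
    private static final ThreadLocal<Boolean> ASYNC_MODE =
            ThreadLocal.withInitial(() -> Boolean.FALSE);
    // Per-thread slot holding the pending result of the last asynchronous call.
    private static final ThreadLocal<CompletableFuture<String>> ASYNC_RESPONSE =
            new ThreadLocal<>();

    static void setAsynchronousMode(boolean async) {
        ASYNC_MODE.set(async);
    }

    /** Returns the pending result left behind by the last asynchronous call. */
    static CompletableFuture<String> getAsyncResponse() {
        return ASYNC_RESPONSE.get();
    }

    /** Returns the reply in synchronous mode, or null in asynchronous mode. */
    static String call(String request) {
        // Pretend the request is sent and answered on another thread.
        CompletableFuture<String> pending =
                CompletableFuture.supplyAsync(() -> "reply to " + request);
        if (ASYNC_MODE.get()) {
            ASYNC_RESPONSE.set(pending);   // the caller picks it up afterwards
            return null;
        }
        return pending.join();             // synchronous mode blocks here
    }

    public static void main(String[] args) {
        System.out.println(call("a"));                     // blocks, prints the reply
        setAsynchronousMode(true);
        System.out.println(call("b"));                     // returns immediately
        System.out.println(getAsyncResponse().join());     // collect the reply later
    }
}
```

Keeping the method signature the same for both modes is the point of the thread-local: the proxy interface does not change, and only callers that opted into asynchronous mode look in the side channel.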
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
// Client.getConnection()
private Connection getConnection(ConnectionId remoteId,Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)throws IOException {
Connection connection;
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
while (true) {
// These lines below can be shortened with computeIfAbsent in Java8
connection = connections.get(remoteId);
if (connection == null) {
// Initialize the connection information
connection = new Connection(remoteId, serviceClass);
// putIfAbsent
/**
* If the specified key is not already associated
* with a value, associate it with the given value.
* This is equivalent to
* <pre>
* if (!map.containsKey(key))
* return map.put(key, value);
* else
* return map.get(key);</pre>
* except that the action is performed atomically.
* .....
*/
// Put the connection into the pool atomically (thread-safe)
Connection existing = connections.putIfAbsent(remoteId, connection);
if (existing != null) {
connection = existing;
}
}
// Add the call to this connection (thread-safe)
if (connection.addCall(call)) {
break;
} else {
// This connection is closed, should be removed. But other thread could
// have already known this closedConnection, and replace it with a new
// connection. So we should call conditional remove to make sure we only
// remove this closedConnection.
connections.remove(remoteId, connection);
}
}
// If the server happens to be slow, the method below will take longer to
// establish a connection.
// Set up the I/O streams of the connection
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
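The comment in getConnection() notes that the get / putIfAbsent pair could be shortened with computeIfAbsent. A minimal sketch of that variant follows, assuming a hypothetical `Conn` class that only tracks whether it is closed; it keeps the retry loop and the conditional remove from the original.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ConnectionPoolSketch {
    /** Hypothetical stand-in for a pooled connection; only tracks open or closed. */
    static class Conn {
        final String remoteId;
        private boolean closed;

        Conn(String remoteId) {
            this.remoteId = remoteId;
        }

        /** Accepts the call unless the connection has already been closed. */
        synchronized boolean addCall(int callId) {
            return !closed;
        }

        synchronized void close() {
            closed = true;
        }
    }

    private final ConcurrentMap<String, Conn> connections = new ConcurrentHashMap<>();

    Conn getConnection(String remoteId, int callId) {
        while (true) {
            // Atomically reuse the pooled connection or create and store a new one.
            Conn conn = connections.computeIfAbsent(remoteId, Conn::new);
            if (conn.addCall(callId)) {
                return conn;
            }
            // The pooled connection is closed. Remove it only if the map still
            // holds this exact instance, since another thread may have already
            // replaced it with a fresh connection.
            connections.remove(remoteId, conn);
        }
    }

    public static void main(String[] args) {
        ConnectionPoolSketch pool = new ConnectionPoolSketch();
        Conn first = pool.getConnection("server-a", 1);
        Conn second = pool.getConnection("server-a", 2);
        System.out.println("reused: " + (first == second));
        first.close();
        Conn third = pool.getConnection("server-a", 3);
        System.out.println("replaced after close: " + (third != first));
    }
}
```

The two-argument `remove(key, value)` matters: an unconditional `remove(key)` could evict a healthy connection that a faster thread has just put in place of the closed one.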
/**
* Add a call to this connection's call queue and notify
* a listener; synchronized.
* Returns false if called during shutdown.
* @param call to add
* @return true if the call was added.
*/
/*
Add a call to this connection and wake up the Connection thread waiting in run()
*/
// Client.Connection.addCall()
private synchronized boolean addCall(Call call) {
if (shouldCloseConnection.get())
return false;
// Add the call to the table of pending calls
calls.put(call.id, call);
// Wake up the waiting thread
notify();
return true;
}
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
*/
/*
* Set up the I/O streams for this socket
* */
// Client.Connection.setupIOstreams()
private synchronized void setupIOstreams(AtomicBoolean fallbackToSimpleAuth) {
try {
Span span = Tracer.getCurrentSpan();
if (span != null) {
span.addTimelineAnnotation("IPC client connecting to " + server);
}
short numRetries = 0;
Random rand = null;
while (true) {
// Establish the socket connection
setupConnection();
// Get the input stream of this socket connection
InputStream inStream = NetUtils.getInputStream(socket);
// Get the output stream
OutputStream outStream = NetUtils.getOutputStream(socket);
// Write the RPC connection header
/**
* Write the connection header - this is sent when connection is established
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
* | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+
* | AuthProtocol (1 byte) |
* +----------------------------------+
*/
// First write: the connection header
writeConnectionHeader(outStream);
/*
private void writeConnectionHeader(OutputStream outStream)
throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
// Write out the header, version and authentication method
// “hrpc”
out.write(RpcConstants.HEADER.array());
//version="9"
out.write(RpcConstants.CURRENT_VERSION);
out.write(serviceClass);
out.write(authProtocol.callId);
out.flush();
}
*/
// If ping is enabled, wrap the input stream so that a read timeout sends a ping to the server
if (doPing) {
inStream = new PingInputStream(inStream);
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
// SASL may have already buffered the stream
if (!(outStream instanceof BufferedOutputStream)) {
outStream = new BufferedOutputStream(outStream);
}
this.out = new DataOutputStream(outStream);
// Send the connection context to the server
// Second write
writeConnectionContext(remoteId, authMethod);
// update last activity time
touch();
span = Tracer.getCurrentSpan();
if (span != null) {
span.addTimelineAnnotation("IPC client connected to " + server);
}
// start the receiver thread after the socket connection has been set
// up
// Start the connection thread; while the calls table holds pending calls, it receives their responses
start();
return;
}
} // catch clause omitted in this excerpt
}
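The 7-byte connection header drawn in the diagram above can be reproduced with plain JDK streams. This is a sketch under assumptions: version 9 comes from the comment in the code above, while the service class and auth protocol values of 0 are illustrative defaults chosen for the example, and `connectionHeader` is a hypothetical helper name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class ConnectionHeaderSketch {
    /** Builds "hrpc" + version + service class + auth protocol, one byte each. */
    static byte[] connectionHeader(int version, int serviceClass, int authProtocol) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.write("hrpc".getBytes(StandardCharsets.US_ASCII)); // 4-byte magic
            out.write(version);       // write(int) emits only the low 8 bits
            out.write(serviceClass);
            out.write(authProtocol);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        // Service class 0 and auth protocol 0 are assumed values for illustration.
        byte[] header = connectionHeader(9, 0, 0);
        System.out.println(header.length + " bytes: " + Arrays.toString(header));
    }
}
```

Because the header has a fixed size and starts with a magic string, the server can reject a non-RPC peer after reading only a few bytes.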
/*
Send the RPC request: the third write
*/
// Client.Connection.sendRpcRequest()
public void sendRpcRequest(final Call call)
throws InterruptedException, IOException {
if (shouldCloseConnection.get()) {
return;
}
// Serialize the call to be sent. This is done from the actual
// caller thread, rather than the sendParamsExecutor thread,
// so that if the serialization throws an error, it is reported
// properly. This also parallelizes the serialization.
//
// Format of a call on the wire:
// 0) Length of rest below (1 + 2)
// 1) RpcRequestHeader - is serialized Delimited hence contains length
// 2) RpcRequest
//
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
/*
* call.rpcKind: the kind of RPC engine
*
* */
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);
// Synchronize on sendRpcRequestLock
synchronized (sendRpcRequestLock) {
// Submit the actual write to the sender executor
Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
// Send the serialized call to the server
@Override
public void run() {
try {
synchronized (Connection.this.out) {
if (shouldCloseConnection.get()) {
return;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
// Get the serialized data and its length
byte[] data = d.getData();
int totalLength = d.getLength();
out.writeInt(totalLength); // Total Length
out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
out.flush();
}
} catch (IOException e) {
// exception at this point would leave the connection in an
// unrecoverable state (eg half a call left on the wire).
// So, close the connection, killing any outstanding calls
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
});
try {
senderFuture.get();
} // catch clause omitted in this excerpt
}
}
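The wire format described in the comments of sendRpcRequest() (a 4-byte total length, then a length-delimited header, then the request) can be sketched without protobuf. The code below is an approximation for illustration: the header and request are opaque byte arrays, and `writeVarint32` is a hand-written version of the base-128 varint that protobuf uses as the delimiter.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

class RequestFrameSketch {
    /** Writes a non-negative int as a base-128 varint, low 7 bits first. */
    static void writeVarint32(OutputStream out, int value) throws IOException {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);  // high bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value);
    }

    /** Frames one call: [total length][varint header length][header][request]. */
    static byte[] frame(byte[] header, byte[] request) {
        try {
            // Serialize everything into an in-memory buffer first, so the
            // total length is known before anything goes on the wire.
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            writeVarint32(body, header.length);
            body.write(header);
            body.write(request);

            ByteArrayOutputStream framed = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(framed);
            out.writeInt(body.size());         // 4-byte big-endian total length
            body.writeTo(out);
            out.flush();
            return framed.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[] {1, 2, 3}, new byte[] {10, 20, 30, 40, 50});
        System.out.println("frame length: " + framed.length);
        System.out.println("declared total: " + framed[3]);
    }
}
```

Buffering the call before writing mirrors the original code, where serialization happens in the caller thread and only the final write runs under the lock on the output stream.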
//Client.Connection.run()
public void run() {
try {
// Wait until there is a response to receive
while (waitForWork()) {//wait here for work - read or close connection
receiveRpcResponse();
}
} // catch clause omitted in this excerpt
close();
}
private void receiveRpcResponse() {
if (shouldCloseConnection.get()) {
return;
}
// Update the last-activity time
touch();
try {
// Read the total length of the response
int totalLen = in.readInt();
/**
* Protobuf type {@code hadoop.common.RpcResponseHeaderProto}
*
* <pre>
**
* Rpc Response Header
* +------------------------------------------------------------------+
* | Rpc total response length in bytes (4 bytes int) |
* | (sum of next two parts) |
* +------------------------------------------------------------------+
* | RpcResponseHeaderProto - serialized delimited ie has len |
* +------------------------------------------------------------------+
* | if request is successful: |
* | - RpcResponse - The actual rpc response bytes follow |
* | the response header |
* | This response is serialized based on RpcKindProto |
* | if request fails : |
* | The rpc response header contains the necessary info |
* +------------------------------------------------------------------+
*
* Note that rpc response header is also used when connection setup fails.
* Ie the response looks like a rpc response with a fake callId.
* </pre>
*/
RpcResponseHeaderProto header =
RpcResponseHeaderProto.parseDelimitedFrom(in);
checkResponse(header);
int headerLen = header.getSerializedSize();
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
// Get the call id
int callId = header.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + callId);
// The status of the RPC reply
RpcStatusProto status = header.getStatus();
// Branch on the status of the returned rpc response
if (status == RpcStatusProto.SUCCESS) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
// Remove the call with this id from the pending table
final Call call = calls.remove(callId);
// Set the return value
call.setRpcResponse(value);
// verify that length was correct
// only for ProtobufEngine where len can be verified easily
if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
ProtobufRpcEngine.RpcWrapper resWrapper =
(ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
if (totalLen != headerLen + resWrapper.getLength()) {
throw new RpcClientException(
"RPC response length mismatch on rpc success");
}
}
} else { // Rpc Request failed
// Verify that length was correct
if (totalLen != headerLen) {
throw new RpcClientException(
"RPC response length mismatch on rpc error");
}
final String exceptionClassName = header.hasExceptionClassName() ?
header.getExceptionClassName() :
"ServerDidNotSetExceptionClassName";
final String errorMsg = header.hasErrorMsg() ?
header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
final RpcErrorCodeProto erCode =
(header.hasErrorDetail() ? header.getErrorDetail() : null);
if (erCode == null) {
LOG.warn("Detailed error code not set by server on rpc error");
}
RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
if (status == RpcStatusProto.ERROR) {
final Call call = calls.remove(callId);
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
markClosed(re);
}
}
} catch (IOException e) {
markClosed(e);
}
}
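The length checks in receiveRpcResponse() rely on the total length being the sum of the varint delimiter, the header, and the response body. The sketch below parses such a frame with JDK streams only. It is an approximation: header and body are opaque bytes, the body length is derived from the declared total instead of being compared after protobuf parsing, and all names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

class ResponseFrameSketch {
    /** Number of bytes a non-negative int occupies as a base-128 varint. */
    static int varint32Size(int value) {
        int size = 1;
        while ((value & ~0x7F) != 0) {
            size++;
            value >>>= 7;
        }
        return size;
    }

    /** Reads a base-128 varint, low 7 bits first. */
    static int readVarint32(DataInputStream in) throws IOException {
        int result = 0;
        for (int shift = 0; shift < 32; shift += 7) {
            int b = in.readUnsignedByte();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IOException("malformed varint");
    }

    /** Returns the response body, or throws if the declared lengths disagree. */
    static byte[] readBody(byte[] frame) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame))) {
            int totalLen = in.readInt();          // 4-byte total length
            int headerLen = readVarint32(in);     // delimiter in front of the header
            // total = size of the delimiter + header bytes + body bytes
            int bodyLen = totalLen - varint32Size(headerLen) - headerLen;
            if (headerLen < 0 || bodyLen < 0) {
                throw new IllegalStateException("RPC response length mismatch");
            }
            byte[] header = new byte[headerLen];
            in.readFully(header);                 // header content is ignored here
            byte[] body = new byte[bodyLen];
            in.readFully(body);
            return body;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // total = 6: one delimiter byte, a 2-byte header, a 3-byte body
        byte[] frame = {0, 0, 0, 6, 2, 0xA, 0xB, 1, 2, 3};
        System.out.println(Arrays.toString(readBody(frame)));
    }
}
```

A mismatch means the reader has lost its place in the byte stream, which is why the original code treats it as an error on the connection as a whole and not as a failure of one call.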