Hadoop RPC Server基於Reactor模式和Java NIO 的架構和原理
Hadoop RPC遠端過程呼叫的高效能和高併發性是Hadoop高效能、高併發性的根本保證。尤其是作為Master/Slave結構的Hadoop設計,比如HDFS NameNode 或者 Yarn ResourceManager這種master型別的節點,它們以RPC Server的身份,需要併發處理大量的RPC Client請求,比如,Yarn的ResourceManager,需要處理來自NodeManager、ApplicationMaster的基於各種協議的RPC請求,這些請求併發、隨機且請求量巨大,ResourceManager必須做到高併發和穩定性。那麼,ResourceManager基於怎樣的設計,才達到了這樣的需求呢?
Hadoop的RPC服務端的核心實現是ipc.Server, 這是一個抽象類 ,但是已經實現了RPC Server的所有執行角色,唯一抽象方法是call(),用來進行最後的請求處理,顯然,實際的處理需要交付給具體的ipc.Server的實現類進行處理,各個請求處理方式不同。
ipc.Server基於Reactor設計模式,是RPC Server高效的根本原因。
1.Reactor設計模式概覽
先來看看標準Reactor設計模式的構成:
Reactor模式的基本組成:
- Reactor:I/O事件的派發者
- Acceptor:接收來自Client的連線,建立與Client對應的Handler,並向Reactor註冊Handler
- Handler:與Client進行通訊的通訊實體,按照一定的過程實現業務處理。Handler內部往往會有更進一步的層次劃分,用來抽象reader、decode、compute、encode、send等過程。由於業務處理流程可能會被分散的I/O過程打破,所以Handler需要有適當的機制儲存上下文,並在下一次I/O 到來的時候恢復上下文。
- Reader/Sender:為了加速資料處理,Reactor設計模式會構建一個存放資料處理執行緒的執行緒池。資料讀出以後,立即扔給執行緒池即可。因此, Handler中的讀和寫兩個事件被單獨分離出來, 由對應的Reader和Sender進行單獨處理。
這是Reactor模式的通用角色,在ipc.Server中的Reactor模式的具體實現與之非常相近:
- Listener執行緒:單執行緒,負責建立伺服器監聽,即負責處理SelectionKey.OP_ACCEPT事件,一旦對應事件發生,就呼叫doAccept()方法對事件進行處理,處理方法其實只是將對應的channel封裝成Connection,Reader.getReader()負責選出一個Reader執行緒,然後把這個新的請求交付給這個Reader物件(新增到這個物件的pendingConnections佇列)。getReader()選擇Reader執行緒的方式為簡單輪詢。
- Reader執行緒:多執行緒,由Listener執行緒建立並管理,通過doRunLoop()方法,反覆從自己的pendingConnections 中取出連線通道,註冊到自己的readSelector,處理SelectionKey.OP_READ事件,一旦對應事件發生,則呼叫doRead()進行處理。doRead()的實際工作,是從請求頭中提取諸如callId、retry、rpcKind,生成對應的Call物件,放入callQueue中,callQueue 佇列將由Handler進行處理。
- Call物件:封裝了RPC請求的資訊,包括callId、retryCount、rpcKink(RPC.rpcKind)、clientId、connection資訊。Reader執行緒建立了Call物件,封裝了請求資訊,交付給下面的Handler執行緒。此後,資訊在Reator的不同角色之間的傳遞都封裝在了Call物件中,包括請求、響應。
- Handler執行緒:Handler的總體職責是取出Call物件中的使用者請求,對請求進行處理並拿到response,然後將response封裝在Call中,交付給Responder進行響應。
- Responder執行緒:單執行緒,內部有一個Selector物件,負責監聽writeSelector上的SelectionKey.OP_WRITE,將response通過對應的連線返回給客戶端。後面我會詳細介紹到,並不是所有的寫都是Responder進行的,有一部分是Handler直接進行的:Handler在將響應交付給Responder之前,會檢查當前連線上的響應是否只有當前一個,如果是,就會嘗試在自己的當前執行緒中直接把響應傳送出去,如果發現響應很多,或者這個響應無法完全傳送給遠端客戶端,才會將剩餘任務交付給Responder進行。
為了更好了解各個不同角色的分工,我們從原始碼入手,來分析各個角色都幹了什麼。
2.RPC總服務啟動
我在多篇部落格中都提到了Hadoop的服務化設計思想,即把某些功能模組抽象為服務,進而抽象出init()、start()、stop()等方法,同時,某個服務還有多個子服務,某個服務啟動的標記,是所有子服務啟動完畢。ipc.Server
也被抽象為服務,通過start()
方法啟動服務,即啟動Responder子服務、Listener子服務和Handler子服務:
/** Starts the service. Must be called before any calls will be handled. */
public synchronized void start() {
//Responder、Listener和Handler都是執行緒,start就是呼叫Thread.start()啟動執行緒
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
3.Listener
Listener直接定義為ipc.Server
的內部類,因為這個類只會被ipc.Server所使用到。
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
//將channel繫結到固定到ip和埠號
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
// Register accepts on the server socket with the selector.
//在當前這個server socket上的selector註冊accept事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
從程式碼裡面可以看到,Hadoop RPC的網路通訊基於java NIO構建。NIO的顯著特性,就是用有限的或者很少的執行緒,實現大量的網路請求的同時處理,網路請求處理的效率很高。
Listener的構造方法主要負責RPC客戶端的建立連線請求 ,建立請求通道,讓selector在這個channel上 註冊SelectionKey.OP_ACCEPT事件,也就是建立連線請求都會被Listener執行緒處理。Listener是一個Thread , run()方法為:
public void run() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable()) //一個新的socket連線請求是否被接受
doAccept(key);//執行ACCEPT對應的處理邏輯
}
} catch (IOException e) {
//.....
}
key = null;
}
} catch (OutOfMemoryError e) {
//......
} catch (Exception e) {
closeCurrentConnection(key, e);
}
}
LOG.info("Stopping " + Thread.currentThread().getName());
//....
//關閉連線操作
}
}
迴圈監聽這個通道上的OP_ACCEPT事件,如果是建立連線請求(
SelectionKey.isAcceptable()),就交付給
doAccept()`進行處理:
/**
* 執行接受新的socket的連線請求的邏輯
*/
void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
//非關鍵程式碼 略
Reader reader = getReader(); //採用輪詢方式在眾多的reader中取出一個reader進行處理
Connection c = connectionManager.register(channel);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
IOUtils.cleanup(null, channel);
}
continue;
}
//將這個封裝了對應的SocketChannel的Connection物件attatch到當前這個SelectionKey物件上
//這樣,如果這個SelectionKey物件對應的Channel有讀寫事件,就可以從這個SelectionKey上取出
//Connection,獲取到這個Channel的相關資訊
key.attach(c); // so closeCurrentConnection can get the object
//將當前的connection新增給reader的connection佇列,reader將會依次從佇列中取出連線進行處理
reader.addConnection(c);
}
}
doAccept()
方法,從自己管理的多個Reader中通過Round Robin方式獲取一個Reader來處理,通過reader.addConnection(c)
將這個Connection物件新增到Reader物件所維護的一個連線佇列pendingConnections
中,Listener此次任務即可結束。此後,這個channel上的讀與寫任務將一直固定由這個分派給自己的Reader直接負責
,而不會被其它Reader執行緒處理。注意,Connection是對NIO SocketChannel的封裝,它們一一對應。
4.Reader
Reader是Listener的內部類,在Listener的建構函式中可以看到:
readers = new Reader[readThreads];//readThreads個Reader進行處理
for (int i = 0; i < readThreads; i++) {
Reader reader = new Reader(
"Socket Reader #" + (i + 1) + " for port " + port);
readers[i] = reader;
reader.start();
}
Listener會建立一個Reader 執行緒的陣列。上面已經說過,收到ACCEPT
請求以後,其實是通過Round-Robin選出一個Reader進行處理。來看Reader 的處理方式:
private synchronized void doRunLoop() {
while (running) {
SelectionKey key = null;
try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);//向Selector註冊OP_READ
}
readSelector.select();
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid()) {
if (key.isReadable()) {
doRead(key);
}
}
key = null;
}
} catch (InterruptedException e) {
//....
}
}
}
從pendingConnections中取出Listener交付給自己的連線請求,從請求中取出通道,將自己的readSelector註冊到通道上,並監聽SelectionKey.OP_READ
。這樣,Reader就可以開始處理該通道上的SelectionKey.OP_READ
事件,即客戶端已經可以通過這個RPC連線,向伺服器端傳送訊息。Reader.doRead()
方法負責處理訊息:
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment();
//.....
try {
count = c.readAndProcess();
} catch (InterruptedException ieo) {
//.....
}
//....
}
/**
* 處理當前的連線請求
*/
public int readAndProcess()
throws WrappedRpcServerException, IOException, InterruptedException {
while (true) {
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
count = channelRead(channel, dataLengthBuffer);
/**
* 正常情況下dataLengthBuffer.reamaining()應該剛好為0,也就是讀取到的剛好是四個位元組的head RpcConstant.HEADER()
* 如果count < 0 || dataLengthBuffer.remaining() > 0,則已經出現異常,直接返回
*/
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
if (!connectionHeaderRead) { //如果還沒有讀到連線的header資訊,第一次進入迴圈,肯定是false
//Every connection is expected to send the header.
if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3);//分配空閒ByteBuffer
}
count = channelRead(channel, connectionHeaderBuf);//從channel中讀取Header資訊到connectionHeaderBuf
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;//如果ByteBuffer還有剩餘,說明讀取出現了異常情況,退出
}
int version = connectionHeaderBuf.get(0);//第一個位元組,版本資訊
// TODO we should add handler for service class later
this.setServiceClass(connectionHeaderBuf.get(1));//第二個位元組,serviceClass
dataLengthBuffer.flip();//準備開始讀取dataLengthBuffer中的資訊
//檢測使用者錯誤地往這個ipd地址上傳送了一個get請求
if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
setupHttpRequestOnIpcPortResponse();
return -1;
}
//一個合法的RPC請求的請求頭應該是hrpc四個位元組,VERSION= 9
if (!RpcConstants.HEADER.equals(dataLengthBuffer)
|| version != CURRENT_VERSION) {
//請求不合法,返回異常,程式碼略
}
// this may switch us into SIMPLE
//獲取授權型別,none或者SALS
authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));
dataLengthBuffer.clear(); //clear方法並不清除資料,而是將position 設定為0,capacity和limit都設定為capacity
connectionHeaderBuf = null;
connectionHeaderRead = true;
continue;//如果當前讀取到的是header,則繼續while迴圈,讀取到的應該是資料長度欄位
}
//開始讀取資料長度欄位
if (data == null) {
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
checkDataLength(dataLength);
//根據資料長度初始化data,用來裝載資料本身
data = ByteBuffer.allocate(dataLength);
}
//讀取資料到data中
count = channelRead(channel, data);
//由於data是按照訊息頭中的資料長度描述值建立的大小,因此當data.remaining() == 0,則已經讀取完了所有的資料,可以開始進行處理了
if (data.remaining() == 0) {
dataLengthBuffer.clear();
data.flip();
boolean isHeaderRead = connectionContextRead;
processOneRpc(data.array());//開始解析RPC請求,將請求交付給具體的處理器類
data = null;
if (!isHeaderRead) {
continue;
}
}
return count;
}
}
readAndProcess()
方法負責對RPC請求頭進行提取、分析、校驗和處理,這裡,我們做一下詳細分析,有助於我們理解基於protobuf協議的RPC的一些執行機制。
RPC訊息頭欄位的含義如下:
注意,這裡將header分為兩個:
- RPC Header:RPC協議本身的頭資訊,與具體業務無關。RPC的訊息體中的資料對RPC Header來說就是一堆二進位制,如同TCP頭不需要關心TCP訊息體中攜帶了什麼訊息一樣;
- 業務 Header:業務本身的頭資訊。假如我們使用的是基於Protobuf協議的RPC,那麼,RPC訊息的訊息體就包含具體的業務頭資訊和基於protobuf協議的訊息體。
下面我來解釋一下RPC訊息的第0-10個位元組,這10個位元組存放的是RPC Header。從第11個位元組開始,就是RPC訊息體,包含了具體的業務頭資訊以及業務訊息體。
0-3位元組:存放固定字元hrpc
,作為RPC的標記
/**
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap(“hrpc”.getBytes
(Charsets.UTF_8));
第4位元組:版本資訊,RPC Server將版本資訊hard code在程式碼中: public static final byte CURRENT_VERSION = 9;
,任何RPC請求都會比較這個版本資訊與CURRENT_VERSION
是否一致,如果不一致,則返回版本不一致的響應資訊
第5位元組:整數,作為這個連線的serviceClass,但是我在hadoop程式碼中沒有找到對serviceClass的使用,應該是出於版本迭代等原因,現在已經沒有任何作用。
第6位元組:authType,授權型別,略過
第7-10位元組:資料長度欄位,讀取到該欄位的值以後,會建立該長度的ByteBuffer以接收RPC訊息體
第11欄位以後:RPC訊息體
瞭解了RPC訊息頭的基本結構,我們一起來看程式碼中是如何對RPC訊息頭進行提取、解析、校驗的。基本步驟如下:
提取訊息流的前四個位元組,
count = channelRead(channel, dataLengthBuffer)
是第一次讀取,看this.dataLengthBuffer = ByteBuffer.allocate(4)
知道它是一個4位元組陣列,這4個位元組是RPC標記字元hrpc
;讀取3個位元組的HEADER資訊,分別記錄了版本資訊、本次連線的serviceClass和授權型別資訊;
判斷協議版本合法性以及頭四個位元組的合法性,包括前四個位元組是否是規定的
hrpc
以及版本號是否與服務端一致;繼續獲取4個位元組的資訊,這四個位元組的資訊是一個整數,代表了本次訊息的訊息體的長度。從程式碼中可以看到,讀取訊息長度資訊以後,會對訊息長度資訊進行校驗,如果校驗成功,則建立一個長度為
dataLength
的ByteArray data
,用來存放訊息體。- 讀取RPC訊息體,放入
ByteArray data
中 - 通過
processOneRpc(data.array());
,對RPC訊息體進行解析,如果是基於protobuf協議的RPC,那麼這個RPC訊息體就包括protobuf的訊息頭和protobuf的訊息體。
private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
//對protobuf的資料進行解碼操作,protobuf客戶端在傳送前的encode與接收端接收後的decod是一正一反的過程
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();//獲取callId,其實是本次互動的序列號資訊,對本次請求的response中會攜帶序列號,以便客戶端分辨對響應進行識別
retry = header.getRetryCount();//獲取重試次數字段,傳送響應的時候,如果發生錯誤,會根據該欄位進行有限次重試
//檢查業務頭資訊
checkRpcHeaders(header);
//callId<0意味著連線、認證尚未正確完成,因此需要進行連線有關的操作
if (callId < 0) { // callIds typically used during connection setup
processRpcOutOfBandRequest(header, dis);
} else if (!connectionContextRead) {
throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established");
} else {
processRpcRequest(header, dis);//校驗正常,開始處理RPC請求
}
} catch (WrappedRpcServerException wrse) { // inform client of error
//發生異常,立刻響應error ,程式碼略
}
}
processOneRpc()
的引數byte[] buf
是RPC訊息體,如果是基於目前最流行的protbuf協議的RPC,那麼這個訊息體就是經過protobuf協議序列化(encode)的訊息。因此,processOneRpc()
會對這個訊息通過decodeProtobufFromStream()
進行decode操作,解析出protobuf頭資訊,放入RpcRequestHeaderProto header
中。
decode完畢以後,會通過checkRpcHeaders()
對protobuf訊息頭中的頭資訊進行校驗,主要是校驗RPC_OPERATION
和RPC_KIND
是否合法。RPC_OPERATION
目前只支援RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET
,否則認為非法。
當基於protobuf協議的RPC訊息體被成功地decode,同時,decode出來的訊息中的頭資訊經過了校驗,則開始呼叫processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis)
對訊息進行處理,它的核心任務,是對資料進行解析,封裝成Call物件,放到callQueue
中。Handler執行緒將從callQueue
中取出請求,並進行處理和響應:
private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException,
InterruptedException {
//獲取RPC型別,目前主要有兩種RPC型別有WritableRPC 和ProtobufRPC
//老版本的Hadoop使用WritableRPC,新版本的Hadoop開始使用基於Protobuf協議的RPC,即ProtobufRPC
//以ProtobufRpcEngine為例,對應的WrapperClass是ProtobufRpcEngine.RpcRequestWrapper
//提取並例項化wrapper class,用來解析請求中的具體欄位
Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind());
if (rpcRequestClass == null) {
//無法從header中解析出對應的RPCRequestClass,丟擲異常
}
Writable rpcRequest;
try { //Read the rpc request
//可以將rpcRequestClass理解為當前基於具體某個序列化協議的直譯器,直譯器負責解釋
//和解析請求內容,封裝為rpcRequest物件
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) { // includes runtime exception from newInstance
//資料解析發生異常,則丟擲異常
}
//略
//根據請求中提取的callId、重試次數、當前的連線、RPC型別、發起請求的客戶端ID等,建立對應的Call物件
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceSpan);
//將Call物件放入callQueue中,Handler執行緒將負責從callQueue中逐一取出請求並處理
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}
這裡注意區分Call物件和Connection物件的關係:Connection是對一個SocketChannel的封裝,即代表了一個連線。一個Call是這個Connection之上的一次請求,可見,Connection和Call是一對多的關係,如下圖:
5.Handler
上文提到,在ipc.Server.start()
方法中,建立了一個Handler陣列並將這些Handler一一進行啟動。其實是呼叫Handler作為一個執行緒的Thread.start()方法,因此我們來看Handler執行緒的 run()
方法:
public void run() {
SERVER.set(Server.this);
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
try {
//從callQueue中取出Call物件,Call物件封裝了請求的所有資訊,包括連線物件、序列號等等資訊
final Call call = callQueue.take(); // pop the queue; maybe blocked here
//判斷這個請求對應是SocketChannel是否是open狀態,如果不是,可能客戶端已經斷開連線,沒有響應的必要
if (!call.connection.channel.isOpen()) {
LOG.info(Thread.currentThread().getName() + ": skipped " + call);
continue;
}
//略
CurCall.set(call);
try {
//call方法是一個抽象方法,實際執行的時候會呼叫具體實現類的call
value = call(call.rpcKind, call.connection.protocolName,
call.rpcRequest, call.timestamp);
} catch (Throwable e) {
//發生異常,根據異常的型別,設定異常的詳細資訊、返回碼等等
}
//服務端呼叫結束,即服務端已經完成了客戶端請求的相關操作,開始對響應進行設定,將響應傳送給客戶端
CurCall.set(null);
synchronized (call.connection.responseQueue) {
//將error資訊封裝在call物件中,responder執行緒將會處理這個Call物件,向客戶端返回響應
setupResponse(buf, call, returnStatus, detailedErr,
value, errorClass, error);
//將封裝了Error資訊或者成功呼叫的資訊的Call物件交付給Responder執行緒進行處理
responder.doRespond(call);
}
} catch (InterruptedException e) {
//異常資訊
} finally {
//略
}
LOG.debug(Thread.currentThread().getName() + ": exiting");
}
}
從程式碼中可以看到,Handler執行緒其實是一個事件分發器,一個用來連線Reader和Responder的快取器:Reader執行緒根據接收到的RPC請求封裝成Call物件,放入callQueue
中。Handler執行緒池中的Handler各自以競爭的方式,不斷從callQueue
中取出Call物件,呼叫call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);
進行處理。這裡的call()
方法是ipc.Server
這個抽象類中的唯一抽象方法。
這裡可以聊一下為什麼ipc.Server是一個抽象方法,以及為什麼只有call()方法一個抽象方法:ipc.Server
設計為抽象類,是因為Hadoop的設計者不希望任何人修改ipc.Server
關於Reactor設計模式的架構和設計,即Hadoop的設計者認為基於Reactor設計模式的架構已經沒有修改的必要了,因此,關於Reactor模式的設計,直接在ipc.Server
進行了實現。但是,程序間通訊的方式有很多種,RPC(Remote Process Call,遠端過程呼叫)只是ipc(Inter-Process Communication,程序間通訊)的一種實現方式而已,因此,call()
方法宣告為抽象方法,讓具體的某種ipc實現類具體實現對某個請求的處理。我們來看Hadoop中ipc.Server
的RPC實現類RPC.Server
對call()方法的實現:
```
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}
```
RPC.Server.call()
會根據rpc型別,提取出對應的RpcInvoker
,實際呼叫Invoker.call()
方法進行處理。在我的兩篇拙文《Hadoop 基於protobuf 的RPC的客戶端實現原理》和《Hadoop 基於protobuf 的RPC的伺服器端實現原理》中詳細介紹了不同的RPC Engine通過註冊的方式向ipc.Server
註冊自己,因此RPC.Server就有了rpcKind和RpcInvoker的對應關係,這個註冊過程不再詳述。我們同樣以ProtobufRpcEngine為例,ProtobufRpcEngine啟動的時候會向RPC.Server註冊自己的ProtobufRpcEngine.Invoker ,即聲稱自己能夠處理protobuf這種rpcKind的請求。因此RPC.Server收到了,就可以根據請求中攜帶的rpcKind,取出ProtobufRpcEngine.Invoker
進行處理,即呼叫ProtobufRpcEngine.Invoker.call()
方法。
Handler呼叫完call()方法,將返回結果value
經過處理放回到Call物件中,然後呼叫responder.doResponse(call)
進行響應操作。下面講解Responder執行緒的時候會詳細講到,Responder通過呼叫responder.doResponse(call)
試圖在這個Connection上只有當前一個response的情況下,直接將response返回給客戶端而不麻煩Responder,如果不止當前一個響應,或者自己一次性無法將當前的response全部發送給遠端客戶端,才會交給Responder繼續進行。
6.Responder
Responder執行緒負責返回處理Selector上處於writable狀態的SelectionKey,然後執行寫操作,這個我們跟蹤Responder.run()的程式碼可以很清楚地看到,與Reader類似,這裡不做詳述。我們詳細
拋開寫操作的具體細節,我們知道,要想一個Selector可以監控一個channel是否是writable,這個channel必須得預先將自己註冊到Selector,這是在Handler.run()
裡面通過呼叫Responder.processResponse()
進行的:
private boolean processResponse(LinkedList<Call> responseQueue,
boolean inHandler) throws IOException {
try {
synchronized (responseQueue) {
//先進先出,因此從respondeQueue中取出第一個Call物件進行處理
call = responseQueue.removeFirst();//
SocketChannel channel = call.connection.channel;
//將call.rpcResponse中的資料寫入到channel中
int numBytes = channelWrite(channel, call.rpcResponse);
if (!call.rpcResponse.hasRemaining()) {//資料已經寫入完畢
//資料已經寫完,進行一個buffer的清理工作
} else {
//如果資料沒有完成寫操作,則把Call物件重新放進responseQueue中的第一個,下次會進行傳送剩餘資料
call.connection.responseQueue.addFirst(call);
//如果是inHandler,說明這個方法是Handler直接呼叫的,這時候資料沒有傳送完畢,需要將channel註冊到writeSelector, 這樣Responder.doRunLoop()中就可以檢測到這個writeSelector上的writable的SocketChannel,然後把剩餘資料傳送給客戶端
if (inHandler) {
// Wakeup the thread blocked on select, only then can the call
// to channel.register() complete.
writeSelector.wakeup();
//將channel註冊到writeSelector,同時將這個Call物件attach到這個SelectionKey物件,這樣Responder執行緒就可以通過select方法檢測到channel上的寫事件,同時從Call中提取需要寫的資料以及SocketChannel,進而進行寫操作
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
}
}
error = false; // everything went off well
}
}
return done;
}
從上面的程式碼可以看到,processResponse()
負責對某一個Connection的多個響應中取出第一個(遵循先進先出規則),然後把這個響應通過這個SocketChannel返回給客戶端。同時,我們看到,processResponse()的第二個引數inHandler,這個引數標記著這個processResponse()的呼叫者是否是Handler,因為從Handler.run()方法中可以看到,Handler執行緒在封裝好了響應結果Call物件以後,會試圖直接通過呼叫doRespond()進行響應:
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);//將這個Call物件新增到對應的connection的responseQueue中
if (call.connection.responseQueue.size() == 1) {//如果目前與這個客戶端的連線的相應佇列中只有一條資料,則直接處理
//對這個connection的responseQueue進行處理,之所以設定第二個引數為true,是為了
//在Handler中呼叫doRespond方法的時候,由於是Handler,所以必定是一個新的請求過來,必須重新將channel註冊到在Responder.writerSelector上,以便下次響應
processResponse(call.connection.responseQueue, true);
}
}
}
Handler通過doRespond()
方法將Call物件新增到當前這個Connection的responseQueue
中,同時判斷responseQueue
是不是隻有當前一個response,如果是,則Handler會在自身執行緒中直接呼叫Responder.processResponse(call.connection.responseQueue, true);
直接響應,不必麻煩Responder執行緒。第二個引數inHandler=true
,用來標記這個processResponse()
方法是被Handler直接呼叫的,而不是在Responder執行緒裡的呼叫。這樣,如果響應資料只是一部分返回給了客戶端,那麼Handler會將這個socketChannel註冊到Responder.writeSelector並監聽SelectionKey.OP_WRITE
,這樣,Responder在對這個writeSelector進行輪詢的時候,會發現當前socketChannel是writable,並負責將Handler沒有傳送完成的剩餘資料響應給客戶端。而如果Handler直接把資料全部發完,就不用勞煩Responder了。
結束
以上就是Hadoop基於Reactor模式設計的ipc Server,無論是HDFS NameNode,還是Yarn ResourceManager,都是基於ipc.Server的實現類RPC.Server進行的實現。通過NIO的高效處理方式,NameNode和ResourceManager雖然是整個系統的核心,卻不會成為整個系統的瓶頸。一些耗時的IO操作,都交給具體的業務處理器進行處理,處理的過程中RPC.Server會繼續接收其它的RPC請求而不會block掉。當這些耗時的IO操作完成,只需要將結果交付給RPC.Server,RPC.Server將請求返回給用使用者。