1. 程式人生 > >Hadoop RPC Server基於Reactor模式和Java NIO 的架構和原理

Hadoop RPC Server基於Reactor模式和Java NIO 的架構和原理

Hadoop RPC遠端過程呼叫的高效能和高併發性是Hadoop高效能、高併發性的根本保證。尤其是作為Master/Slave結構的Hadoop設計,比如HDFS NameNode 或者 Yarn ResourceManager這種master型別的節點,它們以RPC Server的身份,需要併發處理大量的RPC Client請求,比如,Yarn的ResourceManager,需要處理來自NodeManager、ApplicationMaster的基於各種協議的RPC請求,這些請求併發、隨機且請求量巨大,ResourceManager必須做到高併發和穩定性。那麼,ResourceManager基於怎樣的設計,才達到了這樣的需求呢?
Hadoop的RPC服務端的核心實現是ipc.Server, 這是一個抽象類 ,但是已經實現了RPC Server的所有執行角色,唯一抽象方法是call(),用來進行最後的請求處理,顯然,實際的處理需要交付給具體的ipc.Server的實現類進行處理,各個請求處理方式不同。
ipc.Server基於Reactor設計模式,是RPC Server高效的根本原因。

1.Reactor設計模式概覽

先來看看標準Reactor設計模式的構成:

Reactor模式的基本組成:

  • Reactor:I/O事件的派發者
  • Acceptor:接收來自Client的連線,建立與Client對應的Handler,並向Reactor註冊Handler
  • Handler:與Client進行通訊的通訊實體,按照一定的過程實現業務處理。Handler內部往往會有更進一步的層次劃分,用來抽象reader、decode、compute、encode、send等過程。由於業務處理流程可能會被分散的I/O過程打破,所以Handler需要有適當的機制儲存上下文,並在下一次I/O 到來的時候恢復上下文。
  • Reader/Sender:為了加速資料處理,Reactor設計模式會構建一個存放資料處理執行緒的執行緒池。資料讀出以後,立即扔給執行緒池即可。因此, Handler中的讀和寫兩個事件被單獨分離出來, 由對應的Reader和Sender進行單獨處理。

這是Reactor模式的通用角色,在ipc.Server中的Reactor模式的具體實現與之非常相近:

這裡寫圖片描述

  • Listener執行緒:單執行緒,負責建立伺服器監聽,即負責處理SelectionKey.OP_ACCEPT事件,一旦對應事件發生,就呼叫doAccept()方法對事件進行處理,處理方法其實只是將對應的channel封裝成Connection,Reader.getReader()負責選出一個Reader執行緒,然後把這個新的請求交付給這個Reader物件(新增到這個物件的pendingConnections佇列)。getReader()選擇Reader執行緒的方式為簡單輪詢。
  • Reader執行緒:多執行緒,由Listener執行緒建立並管理,通過doRunLoop()方法,反覆從自己的pendingConnections 中取出連線通道,註冊到自己的readSelector,處理SelectionKey.OP_READ事件,一旦對應事件發生,則呼叫doRead()進行處理。doRead()的實際工作,是從請求頭中提取諸如callId、retry、rpcKind,生成對應的Call物件,放入callQueue中,callQueue 佇列將由Handler進行處理。
  • Call物件:封裝了RPC請求的資訊,包括callId、retryCount、rpcKink(RPC.rpcKind)、clientId、connection資訊。Reader執行緒建立了Call物件,封裝了請求資訊,交付給下面的Handler執行緒。此後,資訊在Reator的不同角色之間的傳遞都封裝在了Call物件中,包括請求、響應。
  • Handler執行緒:Handler的總體職責是取出Call物件中的使用者請求,對請求進行處理並拿到response,然後將response封裝在Call中,交付給Responder進行響應。
  • Responder執行緒:單執行緒,內部有一個Selector物件,負責監聽writeSelector上的SelectionKey.OP_WRITE,將response通過對應的連線返回給客戶端。後面我會詳細介紹到,並不是所有的寫都是Responder進行的,有一部分是Handler直接進行的:Handler在將響應交付給Responder之前,會檢查當前連線上的響應是否只有當前一個,如果是,就會嘗試在自己的當前執行緒中直接把響應傳送出去,如果發現響應很多,或者這個響應無法完全傳送給遠端客戶端,才會將剩餘任務交付給Responder進行。

為了更好了解各個不同角色的分工,我們從原始碼入手,來分析各個角色都幹了什麼。

2.RPC總服務啟動

我在多篇部落格中都提到了Hadoop的服務化設計思想,即把某些功能模組抽象為服務,進而抽象出init()、start()、stop()等方法,同時,某個服務還有多個子服務,某個服務啟動的標記,是所有子服務啟動完畢。ipc.Server也被抽象為服務,通過start()方法啟動服務,即啟動Responder子服務、Listener子服務和Handler子服務:

  /** Starts the service.  Must be called before any calls will be handled. */
  public synchronized void start() {
  //Responder、Listener和Handler都是執行緒,start就是呼叫Thread.start()啟動執行緒
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

3.Listener

Listener直接定義為ipc.Server的內部類,因為這個類只會被ipc.Server所使用到。

    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      //將channel繫結到固定到ip和埠號
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      //在當前這個server socket上的selector註冊accept事件
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }

從程式碼裡面可以看到,Hadoop RPC的網路通訊基於java NIO構建。NIO的顯著特性,就是用有限的或者很少的執行緒,實現大量的網路請求的同時處理,網路請求處理的效率很高。
Listener的構造方法主要負責RPC客戶端的建立連線請求 ,建立請求通道,讓selector在這個channel上 註冊SelectionKey.OP_ACCEPT事件,也就是建立連線請求都會被Listener執行緒處理。Listener是一個Thread , run()方法為:

    public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())  //一個新的socket連線請求是否被接受
                  doAccept(key);//執行ACCEPT對應的處理邏輯
              }
            } catch (IOException e) {
            //.....
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          //......
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info("Stopping " + Thread.currentThread().getName()); 
        //....
        //關閉連線操作
      }
    }

迴圈監聽這個通道上的OP_ACCEPT事件,如果是建立連線請求(SelectionKey.isAcceptable()),就交付給doAccept()`進行處理:

    /**
     * 執行接受新的socket的連線請求的邏輯
     */
     void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        //非關鍵程式碼 略
        Reader reader = getReader(); //採用輪詢方式在眾多的reader中取出一個reader進行處理
        Connection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          continue;
        }
        //將這個封裝了對應的SocketChannel的Connection物件attatch到當前這個SelectionKey物件上
        //這樣,如果這個SelectionKey物件對應的Channel有讀寫事件,就可以從這個SelectionKey上取出
        //Connection,獲取到這個Channel的相關資訊
        key.attach(c);  // so closeCurrentConnection can get the object
        //將當前的connection新增給reader的connection佇列,reader將會依次從佇列中取出連線進行處理
        reader.addConnection(c);
      }
    }

doAccept()方法,從自己管理的多個Reader中通過Round Robin方式獲取一個Reader來處理,通過reader.addConnection(c)將這個Connection物件新增到Reader物件所維護的一個連線佇列pendingConnections中,Listener此次任務即可結束。此後,這個channel上的讀與寫任務將一直固定由這個分派給自己的Reader直接負責,而不會被其它Reader執行緒處理。注意,Connection是對NIO SocketChannel的封裝,它們一一對應。

4.Reader

Reader是Listener的內部類,在Listener的建構函式中可以看到:

      readers = new Reader[readThreads];//readThreads個Reader進行處理
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

Listener會建立一個Reader 執行緒的陣列。上面已經說過,收到ACCEPT請求以後,其實是通過Round-Robin選出一個Reader進行處理。來看Reader 的處理方式:

private synchronized void doRunLoop() {
        while (running) {
          SelectionKey key = null;
          try {
            // consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i=size; i>0; i--) {
              Connection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);//向Selector註冊OP_READ
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            //....
          }
        }
      }

從pendingConnections中取出Listener交付給自己的連線請求,從請求中取出通道,將自己的readSelector註冊到通道上,並監聽SelectionKey.OP_READ。這樣,Reader就可以開始處理該通道上的SelectionKey.OP_READ事件,即客戶端已經可以通過這個RPC連線,向伺服器端傳送訊息。Reader.doRead()方法負責處理訊息:

void doRead(SelectionKey key) throws InterruptedException {
      int count = 0;
      Connection c = (Connection)key.attachment();
      //.....
      try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        //.....
        }
      //....
      }
/**
     * 處理當前的連線請求
     */
    public int readAndProcess()
        throws WrappedRpcServerException, IOException, InterruptedException {
      while (true) {
        int count = -1;
        if (dataLengthBuffer.remaining() > 0) {
          count = channelRead(channel, dataLengthBuffer);    
        /**
         * 正常情況下dataLengthBuffer.reamaining()應該剛好為0,也就是讀取到的剛好是四個位元組的head RpcConstant.HEADER()
         * 如果count < 0 || dataLengthBuffer.remaining() > 0,則已經出現異常,直接返回
         */
          if (count < 0 || dataLengthBuffer.remaining() > 0) 
            return count; 
        }

        if (!connectionHeaderRead) { //如果還沒有讀到連線的header資訊,第一次進入迴圈,肯定是false
          //Every connection is expected to send the header.
          if (connectionHeaderBuf == null) {
            connectionHeaderBuf = ByteBuffer.allocate(3);//分配空閒ByteBuffer
          }
          count = channelRead(channel, connectionHeaderBuf);//從channel中讀取Header資訊到connectionHeaderBuf
          if (count < 0 || connectionHeaderBuf.remaining() > 0) {
            return count;//如果ByteBuffer還有剩餘,說明讀取出現了異常情況,退出
          }
          int version = connectionHeaderBuf.get(0);//第一個位元組,版本資訊
          // TODO we should add handler for service class later
          this.setServiceClass(connectionHeaderBuf.get(1));//第二個位元組,serviceClass
          dataLengthBuffer.flip();//準備開始讀取dataLengthBuffer中的資訊

          //檢測使用者錯誤地往這個ipd地址上傳送了一個get請求
          if (HTTP_GET_BYTES.equals(dataLengthBuffer)) {
            setupHttpRequestOnIpcPortResponse();
            return -1;
          }

          //一個合法的RPC請求的請求頭應該是hrpc四個位元組,VERSION= 9
          if (!RpcConstants.HEADER.equals(dataLengthBuffer)
              || version != CURRENT_VERSION) {
            //請求不合法,返回異常,程式碼略
          }

          // this may switch us into SIMPLE
          //獲取授權型別,none或者SALS
          authProtocol = initializeAuthContext(connectionHeaderBuf.get(2));          

          dataLengthBuffer.clear(); //clear方法並不清除資料,而是將position 設定為0,capacity和limit都設定為capacity 
          connectionHeaderBuf = null;
          connectionHeaderRead = true;
          continue;//如果當前讀取到的是header,則繼續while迴圈,讀取到的應該是資料長度欄位
        }

        //開始讀取資料長度欄位
        if (data == null) {
          dataLengthBuffer.flip();
          dataLength = dataLengthBuffer.getInt();
          checkDataLength(dataLength);
         //根據資料長度初始化data,用來裝載資料本身
          data = ByteBuffer.allocate(dataLength);
        }
        //讀取資料到data中
        count = channelRead(channel, data);

        //由於data是按照訊息頭中的資料長度描述值建立的大小,因此當data.remaining() == 0,則已經讀取完了所有的資料,可以開始進行處理了
        if (data.remaining() == 0) {
          dataLengthBuffer.clear();
          data.flip();
          boolean isHeaderRead = connectionContextRead;
          processOneRpc(data.array());//開始解析RPC請求,將請求交付給具體的處理器類
          data = null;
          if (!isHeaderRead) {
            continue;
          }
        } 
        return count;
      }
    }

readAndProcess()方法負責對RPC請求頭進行提取、分析、校驗和處理,這裡,我們做一下詳細分析,有助於我們理解基於protobuf協議的RPC的一些執行機制。
RPC訊息頭欄位的含義如下:
這裡寫圖片描述
注意,這裡將header分為兩個:

  1. RPC Header:RPC協議本身的頭資訊,與具體業務無關。RPC的訊息體中的資料對RPC Header來說就是一堆二進位制,如同TCP頭不需要關心TCP訊息體中攜帶了什麼訊息一樣;
  2. 業務 Header:業務本身的頭資訊。假如我們使用的是基於Protobuf協議的RPC,那麼,RPC訊息的訊息體就包含具體的業務頭資訊和基於protobuf協議的訊息體。

下面我來解釋一下RPC訊息的第0-10個位元組,這10個位元組存放的是RPC Header。從第11個位元組開始,就是RPC訊息體,包含了具體的業務頭資訊以及業務訊息體。

0-3位元組:存放固定字元hrpc,作為RPC的標記

/**
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap(“hrpc”.getBytes
(Charsets.UTF_8));

第4位元組:版本資訊,RPC Server將版本資訊hard code在程式碼中: public static final byte CURRENT_VERSION = 9;,任何RPC請求都會比較這個版本資訊與CURRENT_VERSION是否一致,如果不一致,則返回版本不一致的響應資訊
第5位元組:整數,作為這個連線的serviceClass,但是我在hadoop程式碼中沒有找到對serviceClass的使用,應該是出於版本迭代等原因,現在已經沒有任何作用。
第6位元組:authType,授權型別,略過
第7-10位元組:資料長度欄位,讀取到該欄位的值以後,會建立該長度的ByteBuffer以接收RPC訊息體
第11欄位以後:RPC訊息體

瞭解了RPC訊息頭的基本結構,我們一起來看程式碼中是如何對RPC訊息頭進行提取、解析、校驗的。基本步驟如下:

  1. 提取訊息流的前四個位元組,
    count = channelRead(channel, dataLengthBuffer)是第一次讀取,看this.dataLengthBuffer = ByteBuffer.allocate(4)知道它是一個4位元組陣列,這4個位元組是RPC標記字元hrpc

  2. 讀取3個位元組的HEADER資訊,分別記錄了版本資訊、本次連線的serviceClass和授權型別資訊;

  3. 判斷協議版本合法性以及頭四個位元組的合法性,包括前四個位元組是否是規定的hrpc以及版本號是否與服務端一致;

  4. 繼續獲取4個位元組的資訊,這四個位元組的資訊是一個整數,代表了本次訊息的訊息體的長度。從程式碼中可以看到,讀取訊息長度資訊以後,會對訊息長度資訊進行校驗,如果校驗成功,則建立一個長度為dataLengthByteArray data,用來存放訊息體。

  5. 讀取RPC訊息體,放入ByteArray data
  6. 通過processOneRpc(data.array());,對RPC訊息體進行解析,如果是基於protobuf協議的RPC,那麼這個RPC訊息體就包括protobuf的訊息頭和protobuf的訊息體。
private void processOneRpc(byte[] buf)
        throws IOException, WrappedRpcServerException, InterruptedException {
      int callId = -1;
      int retry = RpcConstants.INVALID_RETRY_COUNT;
      try {
        final DataInputStream dis =
            new DataInputStream(new ByteArrayInputStream(buf));
        //對protobuf的資料進行解碼操作,protobuf客戶端在傳送前的encode與接收端接收後的decod是一正一反的過程
        final RpcRequestHeaderProto header =
            decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
        callId = header.getCallId();//獲取callId,其實是本次互動的序列號資訊,對本次請求的response中會攜帶序列號,以便客戶端分辨對響應進行識別
        retry = header.getRetryCount();//獲取重試次數字段,傳送響應的時候,如果發生錯誤,會根據該欄位進行有限次重試
        //檢查業務頭資訊
        checkRpcHeaders(header);
        //callId<0意味著連線、認證尚未正確完成,因此需要進行連線有關的操作
        if (callId < 0) { // callIds typically used during connection setup
          processRpcOutOfBandRequest(header, dis);
        } else if (!connectionContextRead) {
          throw new WrappedRpcServerException(
              RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
              "Connection context not established");
        } else {
          processRpcRequest(header, dis);//校驗正常,開始處理RPC請求
        }
      } catch (WrappedRpcServerException wrse) { // inform client of error
        //發生異常,立刻響應error ,程式碼略
      }
    }

processOneRpc()的引數byte[] buf是RPC訊息體,如果是基於目前最流行的protbuf協議的RPC,那麼這個訊息體就是經過protobuf協議序列化(encode)的訊息。因此,processOneRpc()會對這個訊息通過decodeProtobufFromStream()進行decode操作,解析出protobuf頭資訊,放入RpcRequestHeaderProto header中。
decode完畢以後,會通過checkRpcHeaders()對protobuf訊息頭中的頭資訊進行校驗,主要是校驗RPC_OPERATIONRPC_KIND是否合法。RPC_OPERATION目前只支援RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET,否則認為非法。
當基於protobuf協議的RPC訊息體被成功地decode,同時,decode出來的訊息中的頭資訊經過了校驗,則開始呼叫processRpcRequest(RpcRequestHeaderProto header,DataInputStream dis)對訊息進行處理,它的核心任務,是對資料進行解析,封裝成Call物件,放到callQueue中。Handler執行緒將從callQueue中取出請求,並進行處理和響應:

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
      //獲取RPC型別,目前主要有兩種RPC型別有WritableRPC 和ProtobufRPC
      //老版本的Hadoop使用WritableRPC,新版本的Hadoop開始使用基於Protobuf協議的RPC,即ProtobufRPC
      //以ProtobufRpcEngine為例,對應的WrapperClass是ProtobufRpcEngine.RpcRequestWrapper
     //提取並例項化wrapper class,用來解析請求中的具體欄位
        Class<? extends Writable> rpcRequestClass = 
          getRpcRequestWrapper(header.getRpcKind());
      if (rpcRequestClass == null) {
        //無法從header中解析出對應的RPCRequestClass,丟擲異常
      }

      Writable rpcRequest;
      try { //Read the rpc request
        //可以將rpcRequestClass理解為當前基於具體某個序列化協議的直譯器,直譯器負責解釋
        //和解析請求內容,封裝為rpcRequest物件
        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
        rpcRequest.readFields(dis);
      } catch (Throwable t) { // includes runtime exception from newInstance
        //資料解析發生異常,則丟擲異常
      }
      //略

      //根據請求中提取的callId、重試次數、當前的連線、RPC型別、發起請求的客戶端ID等,建立對應的Call物件
      Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
          header.getClientId().toByteArray(), traceSpan);

      //將Call物件放入callQueue中,Handler執行緒將負責從callQueue中逐一取出請求並處理
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

這裡注意區分Call物件和Connection物件的關係:Connection是對一個SocketChannel的封裝,即代表了一個連線。一個Call是這個Connection之上的一次請求,可見,Connection和Call是一對多的關係,如下圖:

Call物件和Connection物件之間的類關係圖

5.Handler

上文提到,在ipc.Server.start()方法中,建立了一個Handler陣列並將這些Handler一一進行啟動。其實是呼叫Handler作為一個執行緒的Thread.start()方法,因此我們來看Handler執行緒的 run()方法:

    public void run() {
      SERVER.set(Server.this);
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
        try {
          //從callQueue中取出Call物件,Call物件封裝了請求的所有資訊,包括連線物件、序列號等等資訊
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
          //判斷這個請求對應是SocketChannel是否是open狀態,如果不是,可能客戶端已經斷開連線,沒有響應的必要
          if (!call.connection.channel.isOpen()) {
            LOG.info(Thread.currentThread().getName() + ": skipped " + call);
            continue;
          }
          //略
          CurCall.set(call);
          try {
             //call方法是一個抽象方法,實際執行的時候會呼叫具體實現類的call
             value = call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);

          } catch (Throwable e) {
            //發生異常,根據異常的型別,設定異常的詳細資訊、返回碼等等
          }
          //服務端呼叫結束,即服務端已經完成了客戶端請求的相關操作,開始對響應進行設定,將響應傳送給客戶端
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            //將error資訊封裝在call物件中,responder執行緒將會處理這個Call物件,向客戶端返回響應
            setupResponse(buf, call, returnStatus, detailedErr, 
                value, errorClass, error);
            //將封裝了Error資訊或者成功呼叫的資訊的Call物件交付給Responder執行緒進行處理
            responder.doRespond(call);
          }
        } catch (InterruptedException e) {
          //異常資訊
        } finally {
          //略
      }
      LOG.debug(Thread.currentThread().getName() + ": exiting");
    }
  }

從程式碼中可以看到,Handler執行緒其實是一個事件分發器,一個用來連線Reader和Responder的快取器:Reader執行緒根據接收到的RPC請求封裝成Call物件,放入callQueue中。Handler執行緒池中的Handler各自以競爭的方式,不斷從callQueue中取出Call物件,呼叫call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp);進行處理。這裡的call()方法是ipc.Server這個抽象類中的唯一抽象方法。

這裡可以聊一下為什麼ipc.Server是一個抽象方法,以及為什麼只有call()方法一個抽象方法:ipc.Server設計為抽象類,是因為Hadoop的設計者不希望任何人修改ipc.Server關於Reactor設計模式的架構和設計,即Hadoop的設計者認為基於Reactor設計模式的架構已經沒有修改的必要了,因此,關於Reactor模式的設計,直接在ipc.Server進行了實現。但是,程序間通訊的方式有很多種,RPC(Remote Process Call,遠端過程呼叫)只是ipc(Inter-Process Communication,程序間通訊)的一種實現方式而已,因此,call()方法宣告為抽象方法,讓具體的某種ipc實現類具體實現對某個請求的處理。我們來看Hadoop中ipc.Server的RPC實現類RPC.Server對call()方法的實現:

```
public Writable call(RPC.RpcKind rpcKind, String protocol,
    Writable rpcRequest, long receiveTime) throws Exception {
  return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
      receiveTime);
}
```

RPC.Server.call()會根據rpc型別,提取出對應的RpcInvoker,實際呼叫Invoker.call()方法進行處理。在我的兩篇拙文《Hadoop 基於protobuf 的RPC的客戶端實現原理》《Hadoop 基於protobuf 的RPC的伺服器端實現原理》中詳細介紹了不同的RPC Engine通過註冊的方式向ipc.Server註冊自己,因此RPC.Server就有了rpcKind和RpcInvoker的對應關係,這個註冊過程不再詳述。我們同樣以ProtobufRpcEngine為例,ProtobufRpcEngine啟動的時候會向RPC.Server註冊自己的ProtobufRpcEngine.Invoker ,即聲稱自己能夠處理protobuf這種rpcKind的請求。因此RPC.Server收到了,就可以根據請求中攜帶的rpcKind,取出ProtobufRpcEngine.Invoker進行處理,即呼叫ProtobufRpcEngine.Invoker.call()方法。

Handler呼叫完call()方法,將返回結果value經過處理放回到Call物件中,然後呼叫responder.doResponse(call)進行響應操作。下面講解Responder執行緒的時候會詳細講到,Responder通過呼叫responder.doResponse(call)試圖在這個Connection上只有當前一個response的情況下,直接將response返回給客戶端而不麻煩Responder,如果不止當前一個響應,或者自己一次性無法將當前的response全部發送給遠端客戶端,才會交給Responder繼續進行。

6.Responder

Responder執行緒負責返回處理Selector上處於writable狀態的SelectionKey,然後執行寫操作,這個我們跟蹤Responder.run()的程式碼可以很清楚地看到,與Reader類似,這裡不做詳述。我們詳細

拋開寫操作的具體細節,我們知道,要想一個Selector可以監控一個channel是否是writable,這個channel必須得預先將自己註冊到Selector,這是在Handler.run()裡面通過呼叫Responder.processResponse()進行的:

 private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {

      try {
        synchronized (responseQueue) {
          //先進先出,因此從respondeQueue中取出第一個Call物件進行處理
          call = responseQueue.removeFirst();//
          SocketChannel channel = call.connection.channel;
          //將call.rpcResponse中的資料寫入到channel中
          int numBytes = channelWrite(channel, call.rpcResponse);
          if (!call.rpcResponse.hasRemaining()) {//資料已經寫入完畢
            //資料已經寫完,進行一個buffer的清理工作
          } else {
             //如果資料沒有完成寫操作,則把Call物件重新放進responseQueue中的第一個,下次會進行傳送剩餘資料
            call.connection.responseQueue.addFirst(call);
              //如果是inHandler,說明這個方法是Handler直接呼叫的,這時候資料沒有傳送完畢,需要將channel註冊到writeSelector, 這樣Responder.doRunLoop()中就可以檢測到這個writeSelector上的writable的SocketChannel,然後把剩餘資料傳送給客戶端
            if (inHandler) {
                // Wakeup the thread blocked on select, only then can the call 
                // to channel.register() complete.
                writeSelector.wakeup();
                //將channel註冊到writeSelector,同時將這個Call物件attach到這個SelectionKey物件,這樣Responder執行緒就可以通過select方法檢測到channel上的寫事件,同時從Call中提取需要寫的資料以及SocketChannel,進而進行寫操作
                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
            }
          }
          error = false;              // everything went off well
        }
      }  
      return done;
    }

從上面的程式碼可以看到,processResponse()負責對某一個Connection的多個響應中取出第一個(遵循先進先出規則),然後把這個響應通過這個SocketChannel返回給客戶端。同時,我們看到,processResponse()的第二個引數inHandler,這個引數標記著這個processResponse()的呼叫者是否是Handler,因為從Handler.run()方法中可以看到,Handler執行緒在封裝好了響應結果Call物件以後,會試圖直接通過呼叫doRespond()進行響應:

    void doRespond(Call call) throws IOException {
      synchronized (call.connection.responseQueue) {
        call.connection.responseQueue.addLast(call);//將這個Call物件新增到對應的connection的responseQueue中
        if (call.connection.responseQueue.size() == 1) {//如果目前與這個客戶端的連線的相應佇列中只有一條資料,則直接處理
            //對這個connection的responseQueue進行處理,之所以設定第二個引數為true,是為了
            //在Handler中呼叫doRespond方法的時候,由於是Handler,所以必定是一個新的請求過來,必須重新將channel註冊到在Responder.writerSelector上,以便下次響應
            processResponse(call.connection.responseQueue, true);
        }
      }
    }

Handler通過doRespond()方法將Call物件新增到當前這個Connection的responseQueue中,同時判斷responseQueue是不是隻有當前一個response,如果是,則Handler會在自身執行緒中直接呼叫Responder.processResponse(call.connection.responseQueue, true);直接響應,不必麻煩Responder執行緒。第二個引數inHandler=true,用來標記這個processResponse()方法是被Handler直接呼叫的,而不是在Responder執行緒裡的呼叫。這樣,如果響應資料只是一部分返回給了客戶端,那麼Handler會將這個socketChannel註冊到Responder.writeSelector並監聽SelectionKey.OP_WRITE,這樣,Responder在對這個writeSelector進行輪詢的時候,會發現當前socketChannel是writable,並負責將Handler沒有傳送完成的剩餘資料響應給客戶端。而如果Handler直接把資料全部發完,就不用勞煩Responder了。

結束

以上就是Hadoop基於Reactor模式設計的ipc Server,無論是HDFS NameNode,還是Yarn ResourceManager,都是基於ipc.Server的實現類RPC.Server進行的實現。通過NIO的高效處理方式,NameNode和ResourceManager雖然是整個系統的核心,卻不會成為整個系統的瓶頸。一些耗時的IO操作,都交給具體的業務處理器進行處理,處理的過程中RPC.Server會繼續接收其它的RPC請求而不會block掉。當這些耗時的IO操作完成,只需要將結果交付給RPC.Server,RPC.Server將請求返回給用使用者。