1.1 Basic concepts of RPC

RPC stands for Remote Procedure Call.






1.2 Notable features of RPC


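  (2) High performance: the RPC server can process requests from multiple clients concurrently;

  (3) Controllability: the JDK already ships an RPC framework, RMI, but it is heavyweight and offers few points of control, so Hadoop implements its own RPC framework.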







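1.4 The RPC mechanism (IPC) in Hadoop

Like other RPC frameworks, Hadoop RPC is divided into four parts:


  (2) Function call layer: Hadoop RPC implements function calls through dynamic proxies and Java reflection;

  (3) Network transport layer: Hadoop RPC is based on …


  (4) Server-side framework layer: the RPC server uses Java NIO and an event-driven I/O model to increase its concurrent processing capacity;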
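
The function call layer is easier to follow with a small example. The following is a minimal sketch of the dynamic proxy idea only, not Hadoop's code: EchoProtocol, EchoServer, Invoker and getProxy are hypothetical names made up for illustration, and the "remote" object lives in the same JVM so that no network is involved.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyRpcSketch {
    /** Hypothetical protocol interface shared by client and server. */
    public interface EchoProtocol {
        String echo(String message);
    }

    /** Stand-in for the implementation that would live on the remote machine. */
    public static class EchoServer implements EchoProtocol {
        @Override
        public String echo(String message) {
            return "echo: " + message;
        }
    }

    /**
     * Every call made on the proxy is routed to invoke(). A real RPC client
     * would serialize the method name and arguments and send them over the
     * network; this sketch skips the network and dispatches by reflection.
     */
    public static class Invoker implements InvocationHandler {
        private final Object target;

        public Invoker(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Reflection: call the same method on the target object.
            return method.invoke(target, args);
        }
    }

    /** Builds a proxy that implements EchoProtocol and forwards to the target. */
    public static EchoProtocol getProxy(EchoProtocol target) {
        return (EchoProtocol) Proxy.newProxyInstance(
                EchoProtocol.class.getClassLoader(),
                new Class<?>[] {EchoProtocol.class},
                new Invoker(target));
    }

    public static void main(String[] args) {
        EchoProtocol client = getProxy(new EchoServer());
        // The caller only sees an ordinary method call on an interface.
        System.out.println(client.echo("hello"));
    }
}
```

The caller holds nothing but an EchoProtocol reference, which is why the call looks local even though invoke() is free to do anything, including sending the request to another machine.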

1.5 Hadoop RPC design techniques






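RPC is something every distributed system has to deal with. When code on one machine needs to call a function on another machine, RPC makes that call look like a local function call. The caller does not need to care how the lower layers are implemented, just as code written on top of TCP does not need to care about what happens below it.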

The overall client flow is shown in the code below. It assumes basic familiarity with (1) dynamic proxies and (2) Java NIO.
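
For readers who have not used Java NIO yet, here is a minimal sketch of event-driven reading with a Selector. It is an illustration under stated assumptions rather than anything taken from Hadoop: SelectorSketch and sendAndReceive are hypothetical names, and an in-process Pipe stands in for a network socket so that the example runs on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class SelectorSketch {
    /** Writes a small payload into a pipe and reads it back when the selector reports it readable. */
    public static String sendAndReceive(String payload) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                // Only non-blocking channels can be registered with a selector.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Small payloads fit in the pipe buffer, so this blocking write returns at once.
                ByteBuffer outgoing = ByteBuffer.wrap(bytes);
                while (outgoing.hasRemaining()) {
                    sink.write(outgoing);
                }

                ByteBuffer incoming = ByteBuffer.allocate(bytes.length);
                while (incoming.hasRemaining()) {
                    // Wait for a readiness event instead of blocking in read().
                    if (selector.select(1000) == 0) {
                        throw new IOException("no readable channel within 1s");
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove(); // handled keys must be removed by hand
                        if (key.isReadable()) {
                            ((ReadableByteChannel) key.channel()).read(incoming);
                        }
                    }
                }
                return new String(incoming.array(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive("ping"));
    }
}
```

One thread with one selector can watch many channels this way, which is the property an event-driven server relies on.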


  @proxy  the protocol being proxied
  @method the method to invoke over IPC
  @args   the arguments
// ProtobufRpcEngine.invoke()
public Object invoke(Object proxy, final Method method, Object[] args)throws ServiceException {
      long startTime = 0;

      if (args.length != 2) { // RpcController + Message
        throw new ServiceException("Too many parameters for request. Method: ["
            + method.getName() + "]" + ", Expected: 2, Actual: "
            + args.length);
      }
      if (args[1] == null) {
        throw new ServiceException("null param while calling Method: ["
            + method.getName() + "]");
      }

      // if Tracing is on then start a new span for this rpc.
      // guard it in the if statement to make sure there isn't
      // any extra string manipulation.
      Tracer tracer = Tracer.curThreadTracer();
      TraceScope traceScope = null;
      if (tracer != null) {
        traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
      }
      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
      Message theRequest = (Message) args[1];
      final RpcResponseWrapper val;
      try {
        /*
         * Send the rpc request and wait for the result.
         */
        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,

      } finally {
        if (traceScope != null) traceScope.close();
      }
      if (Client.isAsynchronousMode()) {
        final AsyncGet<RpcResponseWrapper, IOException> arr
            = Client.getAsyncRpcResponse();
        final AsyncGet<Message, Exception> asyncGet
            = new AsyncGet<Message, Exception>() {
          public Message get(long timeout, TimeUnit unit) throws Exception {
            return getReturnMessage(method, arr.get(timeout, unit));
          }

          public boolean isDone() {
            return arr.isDone();
          }
        };
        return null;
      } else {
        return getReturnMessage(method, val);
      }
}

  /**
   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
   * <code>remoteId</code>, returning the rpc response.
   * @param rpcKind
   * @param rpcRequest -  contains serialized method and method parameters
   * @param remoteId - the target rpc server
   * @param fallbackToSimpleAuth - set to true or false during this method to
   *   indicate if a secure client falls back to simple auth
   * @return the rpc response
   * Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   */
// Client.call()
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
      throws IOException {
    return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,

  /**
   * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
   * <code>remoteId</code>, returning the rpc response.
   * @param rpcKind
   * @param rpcRequest -  contains serialized method and method parameters
   * @param remoteId - the target rpc server
   * @param serviceClass - service class for RPC
   * @param fallbackToSimpleAuth - set to true or false during this method to
   *   indicate if a secure client falls back to simple auth
   * @return the rpc response
   * Throws exceptions if there are network problems or if the remote code
   * threw an exception.
   */
// Client.call()
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,ConnectionId remoteId, int serviceClass,AtomicBoolean fallbackToSimpleAuth) throws IOException {
    final Call call = createCall(rpcKind, rpcRequest);
    final Connection connection = getConnection(remoteId, call, serviceClass,

    try {
      try {
        connection.sendRpcRequest(call);                 // send the rpc request
    if (isAsynchronousMode()) {
      final AsyncGet<Writable, IOException> asyncGet
          = new AsyncGet<Writable, IOException>() {
        public Writable get(long timeout, TimeUnit unit)
            throws IOException, TimeoutException{
          boolean done = true;
          try {
            final Writable w = getRpcResponse(call, connection, timeout, unit);
            if (w == null) {
              done = false;
              throw new TimeoutException(call + " timed out "
                  + timeout + " " + unit);
            }
            return w;
          } finally {
            if (done) {

        public boolean isDone() {
          synchronized (call) {
            return call.done;
          }
        }
      };
      return null;
    } else {
      // return the rpc response
      return getRpcResponse(call, connection, -1, null);
    }
}

/** Get a connection from the pool, or create a new one and add it to the
   * pool.  Connections to a given ConnectionId are reused. */
// Client.getConnection()
private Connection getConnection(ConnectionId remoteId,Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)throws IOException {
    Connection connection;
    /* we could avoid this allocation for each RPC by having a
     * connectionsId object and with set() method. We need to manage the
     * refs for keys in HashMap properly. For now its ok.
     */
    while (true) {
      // These lines below can be shorten with computeIfAbsent in Java8
      connection = connections.get(remoteId);
      if (connection == null) {
        // initialize the connection information
        connection = new Connection(remoteId, serviceClass);

        /*
         * putIfAbsent:
         * If the specified key is not already associated
         * with a value, associate it with the given value.
         * This is equivalent to
         * <pre>
         *   if (!map.containsKey(key))
         *       return map.put(key, value);
         *   else
         *       return map.get(key);</pre>
         * except that the action is performed atomically.
         * .....
         */
        Connection existing = connections.putIfAbsent(remoteId, connection);
        if (existing != null) {
          connection = existing;
        }
      }
      // add the call to this connection; addCall is synchronized, so this is thread-safe
      if (connection.addCall(call)) {
        break;
      } else {
        // This connection is closed, should be removed. But other thread could
        // have already known this closedConnection, and replace it with a new
        // connection. So we should call conditional remove to make sure we only
        // remove this closedConnection.
        connections.remove(remoteId, connection);
      }
    }

    // If the server happens to be slow, the method below will take longer to
    // establish a connection.
    return connection;
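
The putIfAbsent plus conditional remove pattern used by getConnection() can be tried out in isolation. The following is a minimal sketch, not Hadoop's code: ConnectionPoolSketch, Conn, pendingCalls and close are hypothetical names, and a plain String stands in for ConnectionId.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConnectionPoolSketch {
    /** Hypothetical stand-in for a connection; it only counts the calls added to it. */
    public static class Conn {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private int calls = 0;

        /** Returns false once the connection has been closed, like addCall() in the text. */
        public synchronized boolean addCall() {
            if (closed.get()) {
                return false;
            }
            calls++;
            return true;
        }

        public synchronized int pendingCalls() {
            return calls;
        }

        public void close() {
            closed.set(true);
        }
    }

    private final ConcurrentMap<String, Conn> connections = new ConcurrentHashMap<>();

    /** Reuses the pooled connection for remoteId, or creates one if none is usable. */
    public Conn getConnection(String remoteId) {
        while (true) {
            Conn conn = connections.get(remoteId);
            if (conn == null) {
                conn = new Conn();
                // Atomic: if another thread registered a connection first, use theirs.
                Conn existing = connections.putIfAbsent(remoteId, conn);
                if (existing != null) {
                    conn = existing;
                }
            }
            if (conn.addCall()) {
                return conn;
            }
            // Closed connection: remove it only if the map still holds this exact
            // instance, so a replacement added by another thread is left alone.
            connections.remove(remoteId, conn);
        }
    }
}
```

The two-argument remove(key, value) is what makes the retry loop safe: a thread that lost the race never evicts the fresh connection that a faster thread has already put into the pool.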

    /**
     * Add a call to this connection's call queue and notify
     * a listener; synchronized.
     * Returns false if called during shutdown.
     * @param call to add
     * @return true if the call was added.
     *
     * In other words: add a call to this connection and wake up the
     * Connection run() thread if it is waiting.
     */
private synchronized boolean addCall(Call call) {
      if (shouldCloseConnection.get())
        return false;
      calls.put(call.id, call);
      notify();
      return true;
}

/** Connect to the server and set up the I/O streams. It then sends
     * a header to the server and starts
     * the connection thread that waits for responses.
     * In short: set up the I/O streams for this socket.
     */

private synchronized void setupIOstreams(AtomicBoolean fallbackToSimpleAuth) {

      try {
        Span span = Tracer.getCurrentSpan();
        if (span != null) {
          span.addTimelineAnnotation("IPC client connecting to " + server);
        }
        short numRetries = 0;
        Random rand = null;
        while (true) {
          // establish the socket connection
          InputStream inStream = NetUtils.getInputStream(socket);
          OutputStream outStream = NetUtils.getOutputStream(socket);
          // write the rpc connection header
          /*
           * Write the connection header - this is sent when connection is established
           * +----------------------------------+
           * |  "hrpc" 4 bytes                  |
           * +----------------------------------+
           * |  Version (1 byte)                |
           * +----------------------------------+
           * |  Service Class (1 byte)          |
           * +----------------------------------+
           * |  AuthProtocol (1 byte)           |
           * +----------------------------------+
           *
           *   private void writeConnectionHeader(OutputStream outStream)
           *       throws IOException {
           *     DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
           *     // Write out the header, version and authentication method
           *     // "hrpc"
           */
          // with doPing enabled, reads go through a PingInputStream
          if (doPing) {
            inStream = new PingInputStream(inStream);
          }
          this.in = new DataInputStream(new BufferedInputStream(inStream));

          // SASL may have already buffered the stream
          if (!(outStream instanceof BufferedOutputStream)) {
            outStream = new BufferedOutputStream(outStream);
          }
          this.out = new DataOutputStream(outStream);
          // second write: the connection context
          writeConnectionContext(remoteId, authMethod);

          // update last activity time

          span = Tracer.getCurrentSpan();
          if (span != null) {
            span.addTimelineAnnotation("IPC client connected to " + server);
          }

          // start the receiver thread after the socket connection has been set
          // up

          // start the connection thread; while the calls table holds calls, it receives their responses
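
The connection header layout drawn in the comment above (4 bytes of "hrpc", then one byte each for version, service class and auth protocol) can be reproduced in a few lines. This is a minimal sketch rather than Hadoop's writeConnectionHeader: ConnectionHeaderSketch and buildHeader are hypothetical names, and the numeric values passed in main are placeholders chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ConnectionHeaderSketch {
    /** Builds the 7-byte header: "hrpc" + version + service class + auth protocol. */
    public static byte[] buildHeader(int version, int serviceClass, int authProtocol) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.write("hrpc".getBytes(StandardCharsets.US_ASCII)); // 4-byte magic
            out.write(version);      // write(int) emits only the low 8 bits: 1 byte
            out.write(serviceClass); // 1 byte
            out.write(authProtocol); // 1 byte
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        // Placeholder values; the real ones depend on the Hadoop version and configuration.
        byte[] header = buildHeader(9, 0, 0);
        System.out.println(header.length); // prints 7
    }
}
```

Because every field has a fixed width, the server can read exactly 7 bytes and decide whether to accept the connection before it parses anything else.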

  Send the rpc request (the third write)
// Client.Connection.sendRpcRequest() 
public void sendRpcRequest(final Call call)
        throws InterruptedException, IOException {
      if (shouldCloseConnection.get()) {

      // Serialize the call to be sent. This is done from the actual
      // caller thread, rather than the sendParamsExecutor thread,
      // so that if the serialization throws an error, it is reported
      // properly. This also parallelizes the serialization.
      // Format of a call on the wire:
      // 0) Length of rest below (1 + 2)
      // 1) RpcRequestHeader  - is serialized Delimited hence contains length
      // 2) RpcRequest
      // Items '1' and '2' are prepared here. 
      final DataOutputBuffer d = new DataOutputBuffer();
      /*
       * call.rpcKind: the rpc engine
       */
      RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
          call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      // synchronize on sendRpcRequestLock
      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          // send the call itself to the server
          public void run() {
            try {
              synchronized (Connection.this.out) {
                if (shouldCloseConnection.get()) {
                if (LOG.isDebugEnabled())
                  LOG.debug(getName() + " sending #" + call.id);
                // get the serialized data and its length
                byte[] data = d.getData();
                int totalLength = d.getLength();
                out.writeInt(totalLength); // Total Length
                out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite to
              // close early
        try {

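The wire format described in sendRpcRequest() (a total length, then the header, then the request) is a classic length-prefixed frame. Below is a minimal sketch of that framing, with loud assumptions: CallFramingSketch, frame and unframe are hypothetical names, the header and request are opaque byte arrays, and the header is delimited with a fixed 4-byte length here, whereas the comments above say Hadoop serializes its header in protobuf's delimited form.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CallFramingSketch {
    /** Frames one call as: [total length][header length][header][request]. */
    public static byte[] frame(byte[] header, byte[] request) {
        try {
            // Step 1: serialize header and request into an in-memory buffer.
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream d = new DataOutputStream(body);
            d.writeInt(header.length); // assumption: 4-byte delimiter, not a varint
            d.write(header);
            d.write(request);
            d.flush();
            byte[] data = body.toByteArray();

            // Step 2: write the total length, then the buffered bytes.
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            out.writeInt(data.length);
            out.write(data, 0, data.length);
            out.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reverses frame(): returns {header, request}. */
    public static byte[][] unframe(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            int totalLen = in.readInt();
            int headerLen = in.readInt();
            byte[] header = new byte[headerLen];
            in.readFully(header);
            // Whatever remains of the frame after the header is the request.
            byte[] request = new byte[totalLen - 4 - headerLen];
            in.readFully(request);
            return new byte[][] {header, request};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Serializing into a buffer first means the total length is known before the first byte goes out, and a serialization error never leaves half a call on the wire.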
public void run() {

      try {
        // wait until there is a response to receive
        while (waitForWork()) {//wait here for work - read or close connection


private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
      try {
        int totalLen = in.readInt();
        /*
         * Protobuf type {@code hadoop.common.RpcResponseHeaderProto}
         * <pre>
         * Rpc Response Header
         * +------------------------------------------------------------------+
         * | Rpc total response length in bytes (4 bytes int)                 |
         * |  (sum of next two parts)                                         |
         * +------------------------------------------------------------------+
         * | RpcResponseHeaderProto - serialized delimited ie has len         |
         * +------------------------------------------------------------------+
         * | if request is successful:                                        |
         * |   - RpcResponse -  The actual rpc response  bytes follow         |
         * |     the response header                                          |
         * |     This response is serialized based on RpcKindProto            |
         * | if request fails :                                               |
         * |   The rpc response header contains the necessary info            |
         * +------------------------------------------------------------------+
         * Note that rpc response header is also used when connection setup fails.
         * Ie the response looks like a rpc response with a fake callId.
         * </pre>
         */
        RpcResponseHeaderProto header =

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);
        // status of the rpc response
        RpcStatusProto status = header.getStatus();
        // branch on the status of the returned rpc response
        if (status == RpcStatusProto.SUCCESS) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          // remove this callId from the table of pending calls
          final Call call = calls.remove(callId);
          // set the return value
          // verify that length was correct
          // only for ProtobufEngine where len can be verified easily
          if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
            ProtobufRpcEngine.RpcWrapper resWrapper =
                (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
            if (totalLen != headerLen + resWrapper.getLength()) {
              throw new RpcClientException(
                  "RPC response length mismatch on rpc success");
            }
          }
        } else { // Rpc Request failed
          // Verify that length was correct
          if (totalLen != headerLen) {
            throw new RpcClientException(
                "RPC response length mismatch on rpc error");
          }
          final String exceptionClassName = header.hasExceptionClassName() ?
                header.getExceptionClassName() : 
          final String errorMsg = header.hasErrorMsg() ? 
                header.getErrorMsg() : "ServerDidNotSetErrorMsg" ;
          final RpcErrorCodeProto erCode = 
                    (header.hasErrorDetail() ? header.getErrorDetail() : null);
          if (erCode == null) {
             LOG.warn("Detailed error code not set by server on rpc error");
          }
          RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
          if (status == RpcStatusProto.ERROR) {
            final Call call = calls.remove(callId);
          } else if (status == RpcStatusProto.FATAL) {
            // Close the connection
      } catch (IOException e) {

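The core of receiveRpcResponse() is a table of pending calls keyed by call id: the receiver thread reads a callId from the response header, removes the matching call, and completes it with a value or an error. The following is a minimal sketch of that idea only. PendingCallsSketch, createCall, onResponse and pending are hypothetical names, and a CompletableFuture stands in for the wait/notify mechanics of a real call object.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PendingCallsSketch {
    /** Hypothetical call object: an id plus a future for the eventual response. */
    public static class Call {
        public final int id;
        public final CompletableFuture<String> result = new CompletableFuture<>();

        Call(int id) {
            this.id = id;
        }
    }

    private final ConcurrentHashMap<Integer, Call> calls = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Sender side: register a call before its request goes out. */
    public Call createCall() {
        Call call = new Call(nextId.getAndIncrement());
        calls.put(call.id, call);
        return call;
    }

    /** Receiver side: match a response to its call by id and complete it. */
    public void onResponse(int callId, boolean success, String payload) {
        Call call = calls.remove(callId);
        if (call == null) {
            return; // unknown id or already answered: nothing to complete
        }
        if (success) {
            call.result.complete(payload);
        } else {
            call.result.completeExceptionally(new IOException(payload));
        }
    }

    /** Number of calls still waiting for a response. */
    public int pending() {
        return calls.size();
    }
}
```

Matching by id is what lets many callers share one connection: responses may arrive in any order, and each one still reaches the caller that is waiting for it.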

