1. 程式人生 > >RPC-Thrift(四)

RPC-Thrift(四)

正在 throws 周期 isset xtend block exce llb logger

Client

  Thrift客戶端有兩種:同步客戶端和異步客戶端。

  同步客戶端

    同步客戶端比較簡單,以RPC-Thrift(一)中的的例子為基礎進行研究源碼,先看一下類圖。

    技術分享圖片

    TServiceClient:用於以同步方式與TService進行通信;

    Iface接口和Client類都是通過Thrift文件自動生成的代碼。

    TServiceClient

      TServiceClient定義了基礎的向Server發送請求和從Server接收響應的方法。

public abstract class TServiceClient {
  public
TServiceClient(TProtocol prot) { this(prot, prot); } public TServiceClient(TProtocol iprot, TProtocol oprot) { iprot_ = iprot; oprot_ = oprot; } protected TProtocol iprot_;//輸入TProtocol protected TProtocol oprot_;//輸出TProtocol protected int seqid_;//序列號 public TProtocol getInputProtocol() {
return this.iprot_; } public TProtocol getOutputProtocol() { return this.oprot_; } //向Server發送請求 protected void sendBase(String methodName, TBase args) throws TException { //寫消息頭,seqid_只是簡單的++,非線程安全,接收響應時要進行seqid_的校驗 oprot_.writeMessageBegin(new TMessage(methodName, TMessageType.CALL, ++seqid_)); args.write(oprot_);
//寫參數 oprot_.writeMessageEnd(); oprot_.getTransport().flush();//發送 } //從Server接收響應 protected void receiveBase(TBase result, String methodName) throws TException { TMessage msg = iprot_.readMessageBegin();//讀消息頭,若沒有數據一直等待,詳見TTransport的實現 if (msg.type == TMessageType.EXCEPTION) { //異常消息通過TApplicationException讀取 TApplicationException x = TApplicationException.read(iprot_); iprot_.readMessageEnd(); throw x; } if (msg.seqid != seqid_) { //序列號不一致報異常 throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response"); } result.read(iprot_);//讀數據,由其result子類實現 iprot_.readMessageEnd(); } }

    Iface

  public interface Iface {
    //thrift中定義的方法
    public ResultCommon sayHello(String paramJson) throws org.apache.thrift.TException;
  }

    Client

  public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    //Client工廠類
    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
      public Factory() {}
      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
        return new Client(prot);
      }
      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
        return new Client(iprot, oprot);
      }
    }
    public Client(org.apache.thrift.protocol.TProtocol prot)
    {
      super(prot, prot);
    }
    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
      super(iprot, oprot);
    }
    //sayHello方法調用入口
    public ResultCommon sayHello(String paramJson) throws org.apache.thrift.TException
    {
      send_sayHello(paramJson);//發送請求
      return recv_sayHello();//接收響應
    }
    //發送請求
    public void send_sayHello(String paramJson) throws org.apache.thrift.TException
    {
      sayHello_args args = new sayHello_args();//組裝參數
      args.setParamJson(paramJson);
      sendBase("sayHello", args);//調用父類的sendBase方法發送請求
    }
    //接收響應
    public ResultCommon recv_sayHello() throws org.apache.thrift.TException
    {
      sayHello_result result = new sayHello_result();
      receiveBase(result, "sayHello");//調用父類的receiveBase方法發送請求
      if (result.isSetSuccess()) {
        return result.success;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHello failed: unknown result");
    }
  }

  異步客戶端

    異步客戶端實現比較復雜,通過回調實現,先看一個異步客戶端的例子。異步客戶端需要使用TNonblockingSocket,通過AsyncMethodCallback接收服務端的回調。

 1 String paramJson = "{\"wewe\":\"111\"}";
 2 TNonblockingSocket tNonblockingSocket = new TNonblockingSocket("127.0.0.1", 8090);//使用非阻塞TNonblockingSocket
 3 TAsyncClientManager tAsyncClientManager = new TAsyncClientManager();
 4 HelloService.AsyncClient asyncClient = new HelloService.AsyncClient.Factory(tAsyncClientManager, new TBinaryProtocol.Factory()).getAsyncClient(tNonblockingSocket);
 5 asyncClient.sayHello(paramJson, new AsyncMethodCallback<HelloService.AsyncClient.sayHello_call>() {
 6     @Override
 7     public void onError(Exception exception) {
 8         //...
 9     }
10     @Override
11     public void onComplete(sayHello_call response) {
12         ResultCommon resultCommon = response.getResult();
13         System.out.println(resultCommon.getDesc());
14     }
15 });

    涉及到的類結構圖如下:

    技術分享圖片

    TAsyncClient:異步客戶端抽象類,通過Thrift文件生成的AsyncClient需繼承該類;

    TAsyncClientManager:異步客戶端管理類,包含一個selector線程,用於轉換方法調用對象;

    TAsyncMethodCall:封裝了異步方法調用,Thrift文件定義的所有方法都會在AsyncClient中生成對應的繼承於TAsyncMethodCall的內部類(如sayHello_call);

    AsyncMethodCallback:接收服務端回調的接口,用戶需要定義實現該接口的類。

    TAsyncClient

      TAsyncClient為異步客戶端提供了公共的屬性和方法。

public abstract class TAsyncClient {
  protected final TProtocolFactory ___protocolFactory;
  protected final TNonblockingTransport ___transport;
  protected final TAsyncClientManager ___manager;//異步客戶端管理類
  protected TAsyncMethodCall ___currentMethod;//異步方法調用
  private Exception ___error;
  private long ___timeout;
  public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
    this(protocolFactory, manager, transport, 0);
  }
  public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) {
    this.___protocolFactory = protocolFactory;
    this.___manager = manager;
    this.___transport = transport;
    this.___timeout = timeout;
  }
  public TProtocolFactory getProtocolFactory() {
    return ___protocolFactory;
  }
  public long getTimeout() {
    return ___timeout;
  }
  public boolean hasTimeout() {
    return ___timeout > 0;
  }
  public void setTimeout(long timeout) {
    this.___timeout = timeout;
  }
  //客戶端是否處於異常狀態
  public boolean hasError() {
    return ___error != null;
  }
  public Exception getError() {
    return ___error;
  }
  //檢查是否準備就緒,如果當前Cilent正在執行一個方法或處於error狀態則報異常
  protected void checkReady() {
    if (___currentMethod != null) {
      throw new IllegalStateException("Client is currently executing another method: " + ___currentMethod.getClass().getName());
    }
    if (___error != null) {
      throw new IllegalStateException("Client has an error!", ___error);
    }
  }
  //執行完成時delegate方法會調用該方法,將___currentMethod置為null
  protected void onComplete() {
    ___currentMethod = null;
  }
  //執行出現異常時delegate方法會調用該方法,
  protected void onError(Exception exception) {
    ___transport.close();//關閉連接
    ___currentMethod = null;//將___currentMethod置為null
    ___error = exception;//異常信息
  }
}

    AsyncClient

      AsyncClient類是通過Thrift文件自動生成的,在該類中含有每個方法的調用入口,並且為每個方法生成了一個方法調用類方法名_call,如sayHello_call。sayHello_call實現了父類TAsyncMethodCall的連個抽象方法:write_args和getResult,因為每個方法的參數和返回值不同,所以這兩個方法需要具體子類實現。

  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
    //AsyncClient工廠類
    public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
      private org.apache.thrift.async.TAsyncClientManager clientManager;
      private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
      public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
        this.clientManager = clientManager;
        this.protocolFactory = protocolFactory;
      }
      public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
        return new AsyncClient(protocolFactory, clientManager, transport);
      }
    }
    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
      super(protocolFactory, clientManager, transport);
    }
    //sayHello方法調用入口
    public void sayHello(String paramJson, org.apache.thrift.async.AsyncMethodCallback<sayHello_call> resultHandler) throws org.apache.thrift.TException {
      checkReady();//檢查當前Client是否可用
      //創建方法調用實例
      sayHello_call method_call = new sayHello_call(paramJson, resultHandler, this, ___protocolFactory, ___transport);
      this.___currentMethod = method_call;
      //調用TAsyncClientManager的call方法
      ___manager.call(method_call);
    }
    public static class sayHello_call extends org.apache.thrift.async.TAsyncMethodCall {
      private String paramJson;
      public sayHello_call(String paramJson, org.apache.thrift.async.AsyncMethodCallback<sayHello_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
        super(client, protocolFactory, transport, resultHandler, false);
        this.paramJson = paramJson;
      }
      //發送請求
      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("sayHello", org.apache.thrift.protocol.TMessageType.CALL, 0));
        sayHello_args args = new sayHello_args();
        args.setParamJson(paramJson);
        args.write(prot);
        prot.writeMessageEnd();
      }
      //獲取返回結果
      public ResultCommon getResult() throws org.apache.thrift.TException {
        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
          throw new IllegalStateException("Method call not finished!");
        }
        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
        return (new Client(prot)).recv_sayHello();
      }
    }
  }  

    TAsyncClientManager

      TAsyncClientManager是異步客戶端管理類,它為維護了一個待處理的方法調用隊列pendingCalls,並通過SelectThread線程監聽selector事件,當有就緒事件時進行方法調用的處理。

public class TAsyncClientManager {
  private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
  private final SelectThread selectThread;
  //TAsyncMethodCall待處理隊列
  private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>();
  //初始化TAsyncClientManager,新建selectThread線程並啟動
  public TAsyncClientManager() throws IOException {
    this.selectThread = new SelectThread();
    selectThread.start();
  }
  //方法調用
  public void call(TAsyncMethodCall method) throws TException {
    if (!isRunning()) {
      throw new TException("SelectThread is not running");
    }
    method.prepareMethodCall();//做方法調用前的準備
    pendingCalls.add(method);//加入待處理隊列
    selectThread.getSelector().wakeup();//喚醒selector,很重要,因為首次執行方法調用時select Thread還阻塞在selector.select()上
  }
  public void stop() {
    selectThread.finish();
  }
  public boolean isRunning() {
    return selectThread.isAlive();
  }
  //SelectThread線程類,處理方法調用的核心
  private class SelectThread extends Thread {
    private final Selector selector;
    private volatile boolean running;
    private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());

    public SelectThread() throws IOException {
      this.selector = SelectorProvider.provider().openSelector();
      this.running = true;
      this.setName("TAsyncClientManager#SelectorThread " + this.getId());
      setDaemon(true);//非守護線程
    }
    public Selector getSelector() {
      return selector;
    }
    public void finish() {
      running = false;
      selector.wakeup();
    }
    public void run() {
      while (running) {
        try {
          try {
            
            if (timeoutWatchSet.size() == 0) {
              //如果超時TAsyncMethodCall監控集合為空,直接無限期阻塞監聽select()事件。TAsyncClientManager剛初始化時是空的
              selector.select();
            } else {
              //如果超時TAsyncMethodCall監控集合不為空,則計算Set中第一個元素的超時時間戳是否到期
              long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
              long selectTime = nextTimeout - System.currentTimeMillis();
              if (selectTime > 0) {
                //還沒有到期,超時監聽select()事件,超過selectTime自動喚醒selector
                selector.select(selectTime);
              } else {
                //已經到期,立刻監聽select()事件,不會阻塞selector
                selector.selectNow();
              }
            }
          } catch (IOException e) {
            LOGGER.error("Caught IOException in TAsyncClientManager!", e);
          }
          //監聽到就緒事件或者selector被喚醒會執行到此處
          transitionMethods();//處理就緒keys
          timeoutMethods();//超時方法調用處理
          startPendingMethods();//處理pending的方法調用
        } catch (Exception exception) {
          LOGGER.error("Ignoring uncaught exception in SelectThread", exception);
        }
      }
    }
    //監聽到就緒事件或者selector被喚醒,如果有就緒的SelectionKey就調用methodCall.transition(key);
    private void transitionMethods() {
      try {
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
          SelectionKey key = keys.next();
          keys.remove();
          if (!key.isValid()) {
            //跳過無效key,方法調用出現異常或key被取消等會導致無效key
            continue;
          }
          TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
          //調用methodCall的transition方法,執行相關的動作並將methodCall的狀態轉換為下一個狀態
          methodCall.transition(key);
          //如果完成或發生錯誤,從timeoutWatchSet刪除該methodCall
          if (methodCall.isFinished() || methodCall.getClient().hasError()) {
            timeoutWatchSet.remove(methodCall);
          }
        }
      } catch (ClosedSelectorException e) {
        LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
      }
    }
    //超時方法調用處理
    private void timeoutMethods() {
      Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
      long currentTime = System.currentTimeMillis();
      while (iterator.hasNext()) {
        TAsyncMethodCall methodCall = iterator.next();
        if (currentTime >= methodCall.getTimeoutTimestamp()) {
          //如果超時,從timeoutWatchSet中刪除並調用onError()方法
          iterator.remove();
          methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms."));
        } else {
          //如果沒有超時,說明之後的TAsyncMethodCall也不會超時,跳出循環,因為越早進入timeoutWatchSet的TAsyncMethodCall越先超時。
          break;
        }
      }
    }
    //開始等待的方法調用,循環處理pendingCalls中的methodCall
    private void startPendingMethods() {
      TAsyncMethodCall methodCall;
      while ((methodCall = pendingCalls.poll()) != null) {
        // Catch registration errors. method will catch transition errors and cleanup.
        try {
          //向selector註冊並設置初次狀態
          methodCall.start(selector);
          //如果客戶端指定了超時時間且transition成功,將methodCall加入到timeoutWatchSet
          TAsyncClient client = methodCall.getClient();
          if (client.hasTimeout() && !client.hasError()) {
            timeoutWatchSet.add(methodCall);
          }
        } catch (Exception exception) {
          //異常處理
          LOGGER.warn("Caught exception in TAsyncClientManager!", exception);
          methodCall.onError(exception);
        }
      }
    }
  }
  //TreeSet用的比較器,判斷是否是同一個TAsyncMethodCall實例
  private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall> {
    public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
      if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
        return (int)(left.getSequenceId() - right.getSequenceId());
      } else {
        return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
      }
    }
  }
}

    TAsyncMethodCall

      TAsyncMethodCall實現了對方法調用的封裝。一次方法調用過程就是一個TAsyncMethodCall實例的生命周期。TAsyncMethodCall實例在整個生命周期內有以下狀態,正常情況下的狀態狀態過程為:CONNECTING -> WRITING_REQUEST_SIZE -> WRITING_REQUEST_BODY -> READING_RESPONSE_SIZE -> READING_RESPONSE_BODY -> RESPONSE_READ,如果任何一個過程中發生了異常則直接轉換為ERROR狀態。

  public static enum State {
    CONNECTING,//連接狀態
    WRITING_REQUEST_SIZE,//寫請求size
    WRITING_REQUEST_BODY,//寫請求體
    READING_RESPONSE_SIZE,//讀響應size
    READING_RESPONSE_BODY,//讀響應體
    RESPONSE_READ,//讀響應完成
    ERROR;//異常狀態
  }

      TAsyncMethodCall的源碼分析如下:

public abstract class TAsyncMethodCall<T> {
  private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
  private static AtomicLong sequenceIdCounter = new AtomicLong(0);//序列號計數器private State state = null;//狀態在start()方法中初始化
  protected final TNonblockingTransport transport;
  private final TProtocolFactory protocolFactory;
  protected final TAsyncClient client;
  private final AsyncMethodCallback<T> callback;//回調實例
  private final boolean isOneway;
  private long sequenceId;//序列號
  
  private ByteBuffer sizeBuffer;//Java NIO概念,frameSize buffer
  private final byte[] sizeBufferArray = new byte[4];//4字節的消息Size字節數組
  private ByteBuffer frameBuffer;//Java NIO概念,frame buffer

  private long startTime = System.currentTimeMillis();

  protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
    this.transport = transport;
    this.callback = callback;
    this.protocolFactory = protocolFactory;
    this.client = client;
    this.isOneway = isOneway;
    this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
  }
  protected State getState() {
    return state;
  }
  protected boolean isFinished() {
    return state == State.RESPONSE_READ;
  }
  protected long getStartTime() {
    return startTime;
  }
  protected long getSequenceId() {
    return sequenceId;
  }
  public TAsyncClient getClient() {
    return client;
  }
  public boolean hasTimeout() {
    return client.hasTimeout();
  }
  public long getTimeoutTimestamp() {
    return client.getTimeout() + startTime;
  }
  //將請求寫入protocol,由子類實現
  protected abstract void write_args(TProtocol protocol) throws TException;
  //方法調用前的準備處理,初始化frameBuffer和sizeBuffer
  protected void prepareMethodCall() throws TException {
    //TMemoryBuffer內存緩存傳輸類,繼承了TTransport
    TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
    TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);
    write_args(protocol);//將請求寫入protocol

    int length = memoryBuffer.length();
    frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length);

    TFramedTransport.encodeFrameSize(length, sizeBufferArray);
    sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
  }
  //向selector註冊並設置開始狀態,可能是連接狀態或寫狀態
  void start(Selector sel) throws IOException {
    SelectionKey key;
    if (transport.isOpen()) {
      state = State.WRITING_REQUEST_SIZE;
      key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
    } else {
      state = State.CONNECTING;
      key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);
      //如果是非阻塞連接初始化會立即成功,轉換為寫狀態並修改感興趣事件
      if (transport.startConnect()) {
        registerForFirstWrite(key);
      }
    }
    key.attach(this);//將本methodCall附加在key上
  }
  void registerForFirstWrite(SelectionKey key) throws IOException {
    state = State.WRITING_REQUEST_SIZE;
    key.interestOps(SelectionKey.OP_WRITE);
  }
  protected ByteBuffer getFrameBuffer() {
    return frameBuffer;
  }
  //轉換為下一個狀態,根據不同的狀態做不同的處理。該方法只會在selector thread中被調用,不用擔心並發
  protected void transition(SelectionKey key) {
    // 確保key是有效的
    if (!key.isValid()) {
      key.cancel();
      Exception e = new TTransportException("Selection key not valid!");
      onError(e);
      return;
    }
    try {
      switch (state) {
        case CONNECTING:
          doConnecting(key);//建連接
          break;
        case WRITING_REQUEST_SIZE:
          doWritingRequestSize();//寫請求size
          break;
        case WRITING_REQUEST_BODY:
          doWritingRequestBody(key);//寫請求體
          break;
        case READING_RESPONSE_SIZE:
          doReadingResponseSize();//讀響應size
          break;
        case READING_RESPONSE_BODY:
          doReadingResponseBody(key);//讀響應體
          break;
        default: // RESPONSE_READ, ERROR, or bug
          throw new IllegalStateException("Method call in state " + state
              + " but selector called transition method. Seems like a bug...");
      }
    } catch (Exception e) {
      key.cancel();
      key.attach(null);
      onError(e);
    }
  }
  //出現異常時的處理
  protected void onError(Exception e) {
    client.onError(e);//置Client異常信息
    callback.onError(e);//回調異常方法
    state = State.ERROR;//置當前對象為ERROR狀態
  }
  //讀響應消息體
  private void doReadingResponseBody(SelectionKey key) throws IOException {
    if (transport.read(frameBuffer) < 0) {
      throw new IOException("Read call frame failed");
    }
    if (frameBuffer.remaining() == 0) {
      cleanUpAndFireCallback(key);
    }
  }
  //方法調用完成的處理
  private void cleanUpAndFireCallback(SelectionKey key) {
    state = State.RESPONSE_READ;//狀態轉換為讀取response完成
    key.interestOps(0);//清空感興趣事件
    key.attach(null);//清理key的附加信息
    client.onComplete();//將client的___currentMethod置為null
    callback.onComplete((T)this);//回調onComplete方法
  }
  //讀響應size,同樣可能需要多多次直到把sizeBuffer讀滿
  private void doReadingResponseSize() throws IOException {
    if (transport.read(sizeBuffer) < 0) {
      throw new IOException("Read call frame size failed");
    }
    if (sizeBuffer.remaining() == 0) {
      state = State.READING_RESPONSE_BODY;
      //讀取FrameSize完成,為frameBuffer分配FrameSize大小的空間用於讀取響應體
      frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray));
    }
  }
  //寫請求體
  private void doWritingRequestBody(SelectionKey key) throws IOException {
    if (transport.write(frameBuffer) < 0) {
      throw new IOException("Write call frame failed");
    }
    if (frameBuffer.remaining() == 0) {
      if (isOneway) {
        //如果是單向RPC,此時方法調用已經結束,清理key並進行回調
        cleanUpAndFireCallback(key);
      } else {
        //非單向RPC,狀態轉換為READING_RESPONSE_SIZE
        state = State.READING_RESPONSE_SIZE;
        //重置sizeBuffer,準備讀取frame size
        sizeBuffer.rewind();
        key.interestOps(SelectionKey.OP_READ);//修改感興趣事件
      }
    }
  }
  //寫請求size到transport,可能會寫多次直到sizeBuffer.remaining() == 0才轉換狀態
  private void doWritingRequestSize() throws IOException {
    if (transport.write(sizeBuffer) < 0) {
      throw new IOException("Write call frame size failed");
    }
    if (sizeBuffer.remaining() == 0) {
      state = State.WRITING_REQUEST_BODY;
    }
  }
  //建立連接
  private void doConnecting(SelectionKey key) throws IOException {
    if (!key.isConnectable() || !transport.finishConnect()) {
      throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
    }
    registerForFirstWrite(key);
  }
}

  總結

    最後總結一下異步客戶端的處理流程,如下圖所示。

    需要註意的是,一個AsyncClient實例只能同時處理一個方法調用,必須等待前一個方法調用完成後才能使用該AsyncClient實例調用其他方法,疑問:和同步客戶端相比有什麽優勢?不用等返回結果,可以幹其他的活?又能幹什麽活呢?如果客戶端使用了連接池(也是AsyncClient實例池,一個AsyncClient實例對應一個連接),該線程不用等待前一個連接進行方法調用的返回結果,就可以去線程池獲取一個可用的連接,使用新的連接進行方法調用,而原來的連接在收到返回結果後,狀態變為可用,返回給連接池。這樣相對於同步客戶端單個線程串行發送請求的情況,異步客戶端單個線程進行發送請求的效率會大大提高,需要的線程數變小,但是可能需要的連接數會增大,單個請求的響應時間會變長。在線程數是性能瓶頸,或對請求的響應時間要求不高的情況下,使用異步客戶端比較合適。

    技術分享圖片

RPC-Thrift(四)