1. 程式人生 > >Thrift原始碼系列----2.TTransport層原始碼分析

Thrift原始碼系列----2.TTransport層原始碼分析

前言

        前一章為大家介紹了Thrift類體系,給大家一個整體上的認識,本章開始對每一層的實現細節進行研究,這裡我們從與業務分離、處於最底層的TTransport層原始碼開始。

阻塞與非阻塞TTransport

        看原始碼前,有必要先向大家說明一下,阻塞、非阻塞服務的服務端與客戶端是如何搭配使用TTransport,這裡還是隻研究我們常用的類,下圖是TTransport類間的關係圖:

這裡寫圖片描述

  1. 服務端為阻塞式服務時,使用TServerSocket,接收到客戶端的請求建立一個TSocket通訊,客戶端使用TSocket配合即可。
  2. 服務端為非阻塞式服務時,使用TNonblockingServerSocket,接收到客戶端的請求建立一個TNonblockingSocket通訊,在讀完客戶端的請求資料後,儲存為本地一個TTransport物件,然後封裝為TFramedTransport物件進行處理(詳見AbstractNonblockingServer.Args類原始碼),客戶端使用TSocket,並且使用TFramedTransport封裝。

        備註:當客戶端是非同步時的使用情況較複雜,這裡暫不討論。

TTransport類體系

TTransport的原始碼

        作為所有TTransport層的父類,還是先看看它的原始碼。

public abstract class TTransport implements Closeable {
  //當前連線是否已開啟
  public abstract boolean isOpen();
  //是否還有資料需要讀,當連線關閉時認為無資料可讀
  public boolean peek() {
    return isOpen();
  }
  //開啟當前連線,可用於IO讀寫
public abstract void open() throws TTransportException; //關閉當前連線 public abstract void close(); //向buf位元組陣列中寫入資料,從off開始,最多讀len長度的位元組,最後返回實際向buf寫入的位元組數 public abstract int read(byte[] buf, int off, int len) throws TTransportException; //確保向buf中從off開始寫入,len長度的位元組,這裡通過迴圈呼叫上面的方法實現最後返回向buf寫入的位元組數
public int readAll(byte[] buf, int off, int len) throws TTransportException { int got = 0; int ret = 0; while (got < len) {//沒有讀完繼續下一次讀取,直接讀到的資料大於等於需要的len長度 ret = read(buf, off+got, len-got); if (ret <= 0) { throw new TTransportException( "Cannot read. Remote side has closed. Tried to read " + len + " bytes, but only got " + got + " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"); } got += ret; } return got; } //將buf中的資料全部發送出去 public void write(byte[] buf) throws TTransportException { write(buf, 0, buf.length); } //將buf的資料,從off開始,傳送len長度的資料出去 public abstract void write(byte[] buf, int off, int len) throws TTransportException; //清空transport中的資料快取 public void flush() throws TTransportException {} //下面四個方法,與ByteBuffer的原理類似 //獲取到本地快取的資料,沒有快取直接返回空 public byte[] getBuffer() { return null; } //返回本地快取下一個讀取位置,沒有快取返回0即可 public int getBufferPosition() { return 0; } //獲取本地快取中的位元組數,沒有快取返回-1 public int getBytesRemainingInBuffer() { return -1; } //從本地快取中消費n個位元組 public void consumeBuffer(int len) {} }

        TTransport主要作用是定義了IO讀寫操作以及本地快取的操作,下面來看TIOStreamTransport是如何實現的。

TIOStreamTransport

        顧名思義,這是一個面向流的Transport類,下面是原始碼。

public class TIOStreamTransport extends TTransport {

  private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());

  protected InputStream inputStream_ = null;//輸入流
  protected OutputStream outputStream_ = null;//輸出流

  protected TIOStreamTransport() {}

  public TIOStreamTransport(InputStream is) {
    inputStream_ = is;
  }

  public TIOStreamTransport(OutputStream os) {
    outputStream_ = os;
  }

  public TIOStreamTransport(InputStream is, OutputStream os) {
    inputStream_ = is;
    outputStream_ = os;
  }

  //所有的流,必須在構造時就被開啟,所以這裡一直返回true
  public boolean isOpen() {
    return true;
  }

  //因為流必須已經被開啟,所以這裡什麼也不做
  public void open() throws TTransportException {}

  //同時將輸出、輸入流關閉
  public void close() {
    if (inputStream_ != null) {
      try {
        inputStream_.close();
      } catch (IOException iox) {
        LOGGER.warn("Error closing input stream.", iox);
      }
      inputStream_ = null;
    }
    if (outputStream_ != null) {
      try {
        outputStream_.close();
      } catch (IOException iox) {
        LOGGER.warn("Error closing output stream.", iox);
      }
      outputStream_ = null;
    }
  }

  //使用輸入流讀取
  public int read(byte[] buf, int off, int len) throws TTransportException {
    if (inputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
    }
    int bytesRead;
    try {
      bytesRead = inputStream_.read(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
    if (bytesRead < 0) {
      throw new TTransportException(TTransportException.END_OF_FILE);
    }
    return bytesRead;
  }

  //使用輸出流寫出
  public void write(byte[] buf, int off, int len) throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
    }
    try {
      outputStream_.write(buf, off, len);
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }

 //清空輸出流
  public void flush() throws TTransportException {
    if (outputStream_ == null) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
    }
    try {
      outputStream_.flush();
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
}

        該類中的原始碼很簡單,有IO程式設計經驗的同學都可以輕鬆看懂。

TSocket

        TSocket是直接繼承TIOStreamTransport,也是我們在阻塞型服務中直接使用到的TTransport類,下面是原始碼:

//繼承自TIOStreamTransport,父類已經將IO流的相關操作封裝好
public class TSocket extends TIOStreamTransport {

  private static final Logger LOGGER = LoggerFactory.getLogger(TSocket.class.getName());

  private Socket socket_;//java中的Socket
  private String host_;//服務端host
  private int port_;//服務端服務埠
  private int socketTimeout_;//Socket讀超時時間
  private int connectTimeout_;//連線超時時間

  public TSocket(Socket socket) throws TTransportException {
    socket_ = socket;
    try {
      socket_.setSoLinger(false, 0);
      socket_.setTcpNoDelay(true);
      socket_.setKeepAlive(true);
    } catch (SocketException sx) {
      LOGGER.warn("Could not configure socket.", sx);
    }
    //當前Socket已開啟時,初始化本地輸入、輸出流變數
    if (isOpen()) {
      try {
        inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
        outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
      } catch (IOException iox) {
        close();
        throw new TTransportException(TTransportException.NOT_OPEN, iox);
      }
    }
  }

  public TSocket(String host, int port) {
    this(host, port, 0);
  }

  public TSocket(String host, int port, int timeout) {
    this(host, port, timeout, timeout);
  }

  public TSocket(String host, int port, int socketTimeout, int connectTimeout) {
    host_ = host;
    port_ = port;
    socketTimeout_ = socketTimeout;
    connectTimeout_ = connectTimeout;
    initSocket();//根據上面的引數建立Socket
  }
  //初始化Socket
  private void initSocket() {
    socket_ = new Socket();
    try {
      socket_.setSoLinger(false, 0);
      socket_.setTcpNoDelay(true);
      socket_.setKeepAlive(true);
      socket_.setSoTimeout(socketTimeout_);
    } catch (SocketException sx) {
      LOGGER.error("Could not configure socket.", sx);
    }
  }

  public void setTimeout(int timeout) {
    this.setConnectTimeout(timeout);
    this.setSocketTimeout(timeout);
  }

  public void setConnectTimeout(int timeout) {
    connectTimeout_ = timeout;
  }

  public void setSocketTimeout(int timeout) {
    socketTimeout_ = timeout;
    try {
      socket_.setSoTimeout(timeout);
    } catch (SocketException sx) {
      LOGGER.warn("Could not set socket timeout.", sx);
    }
  }
  //Socket為空時初始化
  public Socket getSocket() {
    if (socket_ == null) {
      initSocket();
    }
    return socket_;
  }
  //Socket是否已經開啟
  public boolean isOpen() {
    if (socket_ == null) {
      return false;
    }
    return socket_.isConnected();
  }

  //開啟連線,主要作用是初始化Socket、輸入輸出流
  public void open() throws TTransportException {
    if (isOpen()) {
      throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
    }

    if (host_ == null || host_.length() == 0) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
    }
    if (port_ <= 0 || port_ > 65535) {
      throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + port_);
    }

    if (socket_ == null) {
      initSocket();
    }

    try {
      socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
      inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
      outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
    } catch (IOException iox) {
      close();
      throw new TTransportException(TTransportException.NOT_OPEN, iox);
    }
  }

 //關閉當前連線
  public void close() {
    super.close();//先關閉輸入輸出流
    //再關閉Socket
    if (socket_ != null) {
      try {
        socket_.close();
      } catch (IOException iox) {
        LOGGER.warn("Could not close socket.", iox);
      }
      socket_ = null;
    }
  }
}

        至此,TSocket的實現介紹完畢,主要的read、write操作都是使用父類的方法,而該類只需要根據Socket初始化父類的輸入、輸出流變數即可。

TFramedTransport

        TFramedTransport該類主要是起到封裝和緩衝作用,真正IO讀寫還是依賴內部的TTransport成員變數,在服務端為非阻塞的情況下使用,這是由於非阻塞的服務端都是使用了Java Nio,而Java Nio是無法知道什麼時候讀完了一次請求的所有資料,所以使用TFramedTransport來設定前四個位元組表示此次資料請求的size,這樣,服務端讀取資料時設定size+4的ByteBuffer即可,讀滿ByteBuffer代表此次請求的資料全部讀完,由於TFramedTransport中使用到了TMemoryInputTransport,所以先看下TMemoryInputTransport的原始碼。

public final class TMemoryInputTransport extends TTransport {

  private byte[] buf_; //儲存位元組的陣列
  private int pos_;//可讀的位置
  private int endPos_;//最後可讀的位置

  public TMemoryInputTransport() {
  }

  public TMemoryInputTransport(byte[] buf) {
    reset(buf);
  }

  public TMemoryInputTransport(byte[] buf, int offset, int length) {
    reset(buf, offset, length);
  }
  //重置陣列物件
  public void reset(byte[] buf) {
    reset(buf, 0, buf.length);
  }

  public void reset(byte[] buf, int offset, int length) {
    buf_ = buf;
    pos_ = offset;
    endPos_ = offset + length;
  }

  public void clear() {
    buf_ = null;
  }

  @Override
  public void close() {}

  @Override
  public boolean isOpen() {
    return true;
  }

  @Override
  public void open() throws TTransportException {}

  //獲取到buf_位元組陣列物件
  @Override
  public byte[] getBuffer() {
    return buf_;
  }
  //拿到可以開始讀的位置
  public int getBufferPosition() {
    return pos_;
  }
  //位元組陣列中還有多少位元組可讀
  public int getBytesRemainingInBuffer() {
    return endPos_ - pos_;
  }
  //pos_向後移len個位置
  public void consumeBuffer(int len) {
    pos_ += len;
  }

  //先看陣列中還有多少可讀位元組,取請求讀長度和實際可讀位元組中較小者後,如果該值大於0,則將buf_中從pos_可讀位置開始拷貝amtToRead個位元組到buf中,同時將可讀位置向後調整amtToRead長度,返回讀了amtToRead個位元組
  @Override
  public int read(byte[] buf, int off, int len) throws TTransportException {
    int bytesRemaining = getBytesRemainingInBuffer();
    int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
    if (amtToRead > 0) {
      System.arraycopy(buf_, pos_, buf, off, amtToRead);
      consumeBuffer(amtToRead);
    }
    return amtToRead;
  }
  //不支援寫
  @Override
  public void write(byte[] buf, int off, int len) throws TTransportException {
    throw new UnsupportedOperationException("No writing allowed!");
  }

}

        該類的原始碼還是比較簡單的,原理和ByteBuffer類似,不過僅能用作讀。需要理解read方法的作用,這裡會有一個下標pos_控制可讀取資料位置,一旦讀完到endPos_,再調read則一直返回0,下面貼TFramedTransport的原始碼。

public class TFramedTransport extends TTransport {
  protected static final int DEFAULT_MAX_LENGTH = 16384000;//預設的本地快取最大位元組數
  private int maxLength_;//當前物件的本地快取最大位元組數
  private TTransport transport_ = null;//當前物件通過該物件實現資料的讀取與寫入

  private final TByteArrayOutputStream writeBuffer_ =
    new TByteArrayOutputStream(1024);//本地快取,用於輸出,繼承自ByteArrayOutputStream,知道可以將位元組陣列輸出即可
  private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);//用於資料讀取的本地快取
  //工廠類,將一個TTransport物件封裝為一個TFramedTransport物件
  public static class Factory extends TTransportFactory {
    private int maxLength_;
    public Factory() {
      maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
    }
    public Factory(int maxLength) {
      maxLength_ = maxLength;
    }
    @Override
    public TTransport getTransport(TTransport base) {
      return new TFramedTransport(base, maxLength_);
    }
  }
  public TFramedTransport(TTransport transport, int maxLength) {
    transport_ = transport;
    maxLength_ = maxLength;
  }
  public TFramedTransport(TTransport transport) {
    transport_ = transport;
    maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
  }
  //下面三個方法都呼叫transport相應方法實現,作用相同
  public void open() throws TTransportException {
    transport_.open();
  }
  public boolean isOpen() {
    return transport_.isOpen();
  }
  public void close() {
    transport_.close();
  }
  //在一次客戶端的請求過程或服務端處理請求的過程中,肯定要多次呼叫該方法讀資料,但只有第一次呼叫時由於readBuffer_位元組為0或儲存著上次快取的內容且已經讀完,所以從readBuffer_肯定讀出的got<=0,從而進入readFrame()方法,將資料從transport_讀到本地快取readBuffer_,後續資料的讀取got肯定大於0,直到本次的請求處理完畢
  public int read(byte[] buf, int off, int len) throws TTransportException {
    //readBuffer_已初始化
    if (readBuffer_ != null) {
      int got = readBuffer_.read(buf, off, len);//原始碼在上面,清楚兩點即可,readBuffer_已讀完或位元組數為0時 肯定返回got<0。也就是說,在一次客戶端的請求中第一次呼叫該方法時,肯定返回got<0,就可以進入 readFrame()方法。
      if (got > 0) {
        return got;
      }
    }
    readFrame();//從transport_讀到本地快取readBuffer_
    return readBuffer_.read(buf, off, len);//向buf讀入資料
  }

  private final byte[] i32buf = new byte[4];//儲存本次請求資料的長度
  //將資料從transport_讀到本地快取readBuffer_中
  private void readFrame() throws TTransportException {
    transport_.readAll(i32buf, 0, 4);//先讀前4個位元組,代表資料的長度
    int size = decodeFrameSize(i32buf);//由於客戶端服務端會對 資料size進行序列化,所以這裡需要反序列化

    if (size < 0) {
      close();
      throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
    }

    if (size > maxLength_) {
      close();
      throw new TTransportException(TTransportException.CORRUPTED_DATA,
          "Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
    }

    byte[] buff = new byte[size];
    transport_.readAll(buff, 0, size);//將完整的資料讀到buff
    readBuffer_.reset(buff);//重置本地快取
  }
  //下面四個方法都是呼叫readBuffer_實現
  @Override
  public byte[] getBuffer() {
    return readBuffer_.getBuffer();
  }
  @Override
  public int getBufferPosition() {
    return readBuffer_.getBufferPosition();
  }
  @Override
  public int getBytesRemainingInBuffer() {
    return readBuffer_.getBytesRemainingInBuffer();
  }
  @Override
  public void consumeBuffer(int len) {
    readBuffer_.consumeBuffer(len);
  }

  //下面兩個方法需要結合使用,write是向本地快取寫入資料,寫完後,所有的呼叫方都要對輸出流呼叫flush進行清空,所以下面一定會進入到flush方法,再通過transport_將本地快取的資料寫出去
  public void write(byte[] buf, int off, int len) throws TTransportException {
    writeBuffer_.write(buf, off, len);
  }
  @Override
  public void flush() throws TTransportException {
    byte[] buf = writeBuffer_.get();
    int len = writeBuffer_.len();
    writeBuffer_.reset();//清空,為下次作準備

    encodeFrameSize(len, i32buf);//序列化操作,與上面readFrame的反序列化對應
    transport_.write(i32buf, 0, 4);//先返回資料大小,與readFrame的先讀4位元組對應
    transport_.write(buf, 0, len);//再返回真實資料
    transport_.flush();//清空
  }

  //序列化反序列化資料大小size,序列化是將每個位元組高位都位移到低位組成byte陣列,反序列化反之
  public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
    buf[0] = (byte)(0xff & (frameSize >> 24));
    buf[1] = (byte)(0xff & (frameSize >> 16));
    buf[2] = (byte)(0xff & (frameSize >> 8));
    buf[3] = (byte)(0xff & (frameSize));
  }
  public static final int decodeFrameSize(final byte[] buf) {
    return
      ((buf[0] & 0xff) << 24) |
      ((buf[1] & 0xff) << 16) |
      ((buf[2] & 0xff) <<  8) |
      ((buf[3] & 0xff));
  }
}

        原始碼還是較易懂,TFramedTransport通過將前四位元組表示資料長度,對本地快取進行讀寫,從而對上層提供的讀寫方法可保證讀入、寫出的資料的完整性,下面再上一張圖,介紹客戶端一次請求的完整流程,幫助理解TFramedTransport在非阻塞服務中的功能,可能會牽扯到之前講過的一些原始碼,忘記的同學可以回顧下,TProtocol、TProcessor等會簡單略過。

這裡寫圖片描述

TNonblockingSocket

        接下來看下該類原始碼,顧名思義,該類用於非阻塞型Socket通訊,在服務端、客戶端均能使用(客戶端使用時為非同步客戶端,過於複雜,這裡不討論,同步客戶端使用TFrameTransport即可),還是需要看看其父類TNonblockingTransport的原始碼:

public abstract class TNonblockingTransport extends TTransport {
  //@see java.nio.channels.SocketChannel#connect(SocketAddress remote) 這是原始碼的註釋,即可理解為該方法是需要呼叫SocketChannel.connect
  public abstract boolean startConnect() throws IOException;

  //完成連線,同樣要參考SocketChannel.finishConnect()
  public abstract boolean finishConnect() throws IOException;

  //很熟悉的方法,對,就是將當前物件註冊到selector上
  public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
  //向buffer中讀資料
  public abstract int read(ByteBuffer buffer) throws IOException;
  //向buffer中寫資料
  public abstract int write(ByteBuffer buffer) throws IOException;
}

        這裡不解釋太多,使用到Java Nio中的概念,不懂得同學還請自行百度,下面看TNonblockingSocket原始碼:

public class TNonblockingSocket extends TNonblockingTransport {

  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());

  private final SocketAddress socketAddress_;//host port資訊,在連線懶載入時使用

  private final SocketChannel socketChannel_;//Java Nio 非阻塞讀寫
  //下面兩個構造方法都只是初始化成員變數,並不open connecttion
  public TNonblockingSocket(String host, int port) throws IOException {
    this(host, port, 0);
  }
  public TNonblockingSocket(String host, int port, int timeout) throws IOException {
    this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
  }
  //這個構造方法要求socketChannel已經開啟connection,非阻塞服務使用該方法建立連線
  public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
    this(socketChannel, 0, null);
    if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
  }
  //socketChannel相關引數設定
  private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
      throws IOException {
    socketChannel_ = socketChannel;
    socketAddress_ = socketAddress;

    socketChannel.configureBlocking(false);

    Socket socket = socketChannel.socket();
    socket.setSoLinger(false, 0);
    socket.setTcpNoDelay(true);
    socket.setKeepAlive(true);
    setTimeout(timeout);
  }

  //將變數socketChannel_註冊到selector,返回SelectionKey
  public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
    return socketChannel_.register(selector, interests);
  }

  //設定超時時間
  public void setTimeout(int timeout) {
    try {
      socketChannel_.socket().setSoTimeout(timeout);
    } catch (SocketException sx) {
      LOGGER.warn("Could not set socket timeout.", sx);
    }
  }

  public SocketChannel getSocketChannel() {
    return socketChannel_;
  }

  //連線是否已經處於連線狀態,isConnected方法在已經呼叫了close後會返回false,但isOpen方法會返回true
  public boolean isOpen() {
    return socketChannel_.isOpen() && socketChannel_.isConnected();
  }

  //不要呼叫該方法,提供了 懶載入startConnect()方法來開啟連線
  public void open() throws TTransportException {
    throw new RuntimeException("open() is not implemented for TNonblockingSocket");
  }
  //往buffer讀資料,非阻塞服務端呼叫該方法
  public int read(ByteBuffer buffer) throws IOException {
    return socketChannel_.read(buffer);
  }

  //方法作用見父類,這裡用socketChannel_實現
  public int read(byte[] buf, int off, int len) throws TTransportException {
    if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
      throw new TTransportException(TTransportException.NOT_OPEN,
        "Cannot read from write-only socket channel");
    }
    try {
      return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }
  //向buffer寫資料,非阻塞服務端呼叫該方法
  public int write(ByteBuffer buffer) throws IOException {
    return socketChannel_.write(buffer);
  }
  //方法作用見父類,這裡用socketChannel_實現
  public void write(byte[] buf, int off, int len) throws TTransportException {
    if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
      throw new TTransportException(TTransportException.NOT_OPEN,
        "Cannot write to write-only socket channel");
    }
    try {
      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
    } catch (IOException iox) {
      throw new TTransportException(TTransportException.UNKNOWN, iox);
    }
  }

  //SocketChannel不支援,為空即可
  public void flush() throws TTransportException {
  }
  //關閉socketChannel_
  public void close() {
    try {
      socketChannel_.close();
    } catch (IOException iox) {
      LOGGER.warn("Could not close socket.", iox);
    }
  }
  //開始連線
  public boolean startConnect() throws IOException {
    return socketChannel_.connect(socketAddress_);
  }
  //連線是否完成
  public boolean finishConnect() throws IOException {
    return socketChannel_.finishConnect();
  }

}

        對TNonblockingSocket,這裡我們清楚其底層是依賴SocketChannel實現即可,後並無難點,TTransport體系常用類原始碼就介紹到此,下一節看看TServerTransport類體系。

TServerTransport

TServerTransport

        這一塊的類都是用於服務端監聽埠,慣例,還是先看看頂層父類的原始碼:

public abstract class TServerTransport implements Closeable {
  //抽象引數類
  public static abstract class AbstractServerTransportArgs<T extends AbstractServerTransportArgs<T>> {
    int backlog = 0; //接收請求的佇列大小
    int clientTimeout = 0;//客戶端超時時間
    InetSocketAddress bindAddr;//監聽地址

    public T backlog(int backlog) {
      this.backlog = backlog;
      return (T) this;
    }
    public T clientTimeout(int clientTimeout) {
      this.clientTimeout = clientTimeout;
      return (T) this;
    }
    public T port(int port) {
      this.bindAddr = new InetSocketAddress(port);
      return (T) this;
    }

    public T bindAddr(InetSocketAddress bindAddr) {
      this.bindAddr = bindAddr;
      return (T) this;
    }
  }
  //開始監聽客戶端請求
  public abstract void listen() throws TTransportException;
  //監聽到請求後,建立一個TTransport物件
  public final TTransport accept() throws TTransportException {
    TTransport transport = acceptImpl();
    if (transport == null) {
      throw new TTransportException("accept() may not return NULL");
    }
    return transport;
  }
  //建立TTransport物件的真正實現
  protected abstract TTransport acceptImpl() throws TTransportException;
  //停止監聽
  public abstract void close();
  //可選,作用是將當前物件從accept、listen方法的阻塞狀態中打斷
  public void interrupt() {}
}

TServerSocket

        阻塞服務監聽客戶端請求時使用的TServerSocket相對簡單,所以先看其原始碼:

public class TServerSocket extends TServerTransport {

  private static final Logger LOGGER = LoggerFactory.getLogger(TServerSocket.class.getName());

  private ServerSocket serverSocket_ = null;  //很熟悉吧,真正監聽客戶端請求的物件
  private int clientTimeout_ = 0;//接收到客戶端請求後,建立連線時的超時時間
  //引數類,多了ServerSocket物件
  public static class ServerSocketTransportArgs extends AbstractServerTransportArgs<ServerSocketTransportArgs> {
    ServerSocket serverSocket;

    public ServerSocketTransportArgs serverSocket(ServerSocket serverSocket) {
      this.serverSocket = serverSocket;
      return this;
    }
  }
  //下面多個構造方法這裡就不詳述了
  public TServerSocket(ServerSocket serverSocket) throws TTransportException {
    this(serverSocket, 0);
  }
  public TServerSocket(ServerSocket serverSocket, int clientTimeout) throws TTransportException {
    this(new ServerSocketTransportArgs().serverSocket(serverSocket).clientTimeout(clientTimeout));
  }
  public TServerSocket(int port) throws TTransportException {
    this(port, 0);
  }
  public TServerSocket(int port, int clientTimeout) throws TTransportException {
    this(new InetSocketAddress(port), clientTimeout);
  }

  public TServerSocket(InetSocketAddress bindAddr) throws TTransportException {
    this(bindAddr, 0);
  }
  public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
    this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
  }

  //上面所有的構造方法都會調到這裡,不多講,ServerSocket相關設定
  public TServerSocket(ServerSocketTransportArgs args) throws TTransportException {
    clientTimeout_ = args.clientTimeout;
    if (args.serverSocket != null) {
      this.serverSocket_ = args.serverSocket;
      return;
    }
    try {
      serverSocket_ = new ServerSocket();
      serverSocket_.setReuseAddress(true);
      serverSocket_.bind(args.bindAddr, args.backlog);
    } catch (IOException ioe) {
      close();
      throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".", ioe);
    }
  }
  //確定在accept的時候不要阻塞
  public void listen() throws TTransportException {
    if (serverSocket_ !=