Thrift原始碼系列----2.TTransport層原始碼分析
前言
前一章為大家介紹了Thrift類體系,給大家一個整體上的認識,本章開始對每一層的實現細節進行研究,這裡我們從與業務分離、處於最底層的TTransport層原始碼開始。
阻塞與非阻塞TTransport
看原始碼前,有必要先向大家說明一下,阻塞、非阻塞服務的服務端與客戶端是如何搭配使用TTransport,這裡還是隻研究我們常用的類,下圖是TTransport類間的關係圖:
- 服務端為阻塞式服務時,使用TServerSocket,接收到客戶端的請求建立一個TSocket通訊,客戶端使用TSocket配合即可。
- 服務端為非阻塞式服務時,使用TNonblockingServerSocket,接收到客戶端的請求建立一個TNonblockingSocket通訊,在讀完客戶端的請求資料後,儲存為本地一個TTransport物件,然後封裝為TFramedTransport物件進行處理(詳見AbstractNonblockingServer.Args類原始碼),客戶端使用TSocket,並且使用TFramedTransport封裝。
備註:當客戶端是非同步時的使用情況較複雜,這裡暫不討論。
TTransport類體系
TTransport的原始碼
作為所有TTransport層的父類,還是先看看它的原始碼。
public abstract class TTransport implements Closeable {
//當前連線是否已開啟
public abstract boolean isOpen();
//是否還有資料需要讀,當連線關閉時認為無資料可讀
public boolean peek() {
return isOpen();
}
//開啟當前連線,可用於IO讀寫
public abstract void open()
throws TTransportException;
//關閉當前連線
public abstract void close();
//向buf位元組陣列中寫入資料,從off開始,最多讀len長度的位元組,最後返回實際向buf寫入的位元組數
public abstract int read(byte[] buf, int off, int len)
throws TTransportException;
//確保向buf中從off開始寫入,len長度的位元組,這裡通過迴圈呼叫上面的方法實現最後返回向buf寫入的位元組數
public int readAll(byte[] buf, int off, int len)
throws TTransportException {
int got = 0;
int ret = 0;
while (got < len) {//沒有讀完繼續下一次讀取,直接讀到的資料大於等於需要的len長度
ret = read(buf, off+got, len-got);
if (ret <= 0) {
throw new TTransportException(
"Cannot read. Remote side has closed. Tried to read "
+ len
+ " bytes, but only got "
+ got
+ " bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)");
}
got += ret;
}
return got;
}
//將buf中的資料全部發送出去
public void write(byte[] buf) throws TTransportException {
write(buf, 0, buf.length);
}
//將buf的資料,從off開始,傳送len長度的資料出去
public abstract void write(byte[] buf, int off, int len)
throws TTransportException;
//清空transport中的資料快取
public void flush()
throws TTransportException {}
//下面四個方法,與ByteBuffer的原理類似
//獲取到本地快取的資料,沒有快取直接返回空
public byte[] getBuffer() {
return null;
}
//返回本地快取下一個讀取位置,沒有快取返回0即可
public int getBufferPosition() {
return 0;
}
//獲取本地快取中的位元組數,沒有快取返回-1
public int getBytesRemainingInBuffer() {
return -1;
}
//從本地快取中消費n個位元組
public void consumeBuffer(int len) {}
}
TTransport主要作用是定義了IO讀寫操作以及本地快取的操作,下面來看TIOStreamTransport是如何實現的。
TIOStreamTransport
顧名思義,這是一個面向流的Transport類,下面是原始碼。
public class TIOStreamTransport extends TTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());
protected InputStream inputStream_ = null;//輸入流
protected OutputStream outputStream_ = null;//輸出流
protected TIOStreamTransport() {}
public TIOStreamTransport(InputStream is) {
inputStream_ = is;
}
public TIOStreamTransport(OutputStream os) {
outputStream_ = os;
}
public TIOStreamTransport(InputStream is, OutputStream os) {
inputStream_ = is;
outputStream_ = os;
}
//所有的流,必須在構造時就被開啟,所以這裡一直返回true
public boolean isOpen() {
return true;
}
//因為流必須已經被開啟,所以這裡什麼也不做
public void open() throws TTransportException {}
//同時將輸出、輸入流關閉
public void close() {
if (inputStream_ != null) {
try {
inputStream_.close();
} catch (IOException iox) {
LOGGER.warn("Error closing input stream.", iox);
}
inputStream_ = null;
}
if (outputStream_ != null) {
try {
outputStream_.close();
} catch (IOException iox) {
LOGGER.warn("Error closing output stream.", iox);
}
outputStream_ = null;
}
}
//使用輸入流讀取
public int read(byte[] buf, int off, int len) throws TTransportException {
if (inputStream_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
}
int bytesRead;
try {
bytesRead = inputStream_.read(buf, off, len);
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
if (bytesRead < 0) {
throw new TTransportException(TTransportException.END_OF_FILE);
}
return bytesRead;
}
//使用輸出流寫出
public void write(byte[] buf, int off, int len) throws TTransportException {
if (outputStream_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
}
try {
outputStream_.write(buf, off, len);
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
//清空輸出流
public void flush() throws TTransportException {
if (outputStream_ == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
}
try {
outputStream_.flush();
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
}
該類中的原始碼很簡單,有IO程式設計經驗的同學都可以輕鬆看懂。
TSocket
TSocket是直接繼承TIOStreamTransport,也是我們在阻塞型服務中直接使用到的TTransport類,下面是原始碼:
//繼承自TIOStreamTransport,父類已經將IO流的相關操作封裝好
public class TSocket extends TIOStreamTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TSocket.class.getName());
private Socket socket_;//java中的Socket
private String host_;//服務端host
private int port_;//服務端服務埠
private int socketTimeout_;//Socket讀超時時間
private int connectTimeout_;//連線超時時間
public TSocket(Socket socket) throws TTransportException {
socket_ = socket;
try {
socket_.setSoLinger(false, 0);
socket_.setTcpNoDelay(true);
socket_.setKeepAlive(true);
} catch (SocketException sx) {
LOGGER.warn("Could not configure socket.", sx);
}
//當前Socket已開啟時,初始化本地輸入、輸出流變數
if (isOpen()) {
try {
inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
}
public TSocket(String host, int port) {
this(host, port, 0);
}
public TSocket(String host, int port, int timeout) {
this(host, port, timeout, timeout);
}
public TSocket(String host, int port, int socketTimeout, int connectTimeout) {
host_ = host;
port_ = port;
socketTimeout_ = socketTimeout;
connectTimeout_ = connectTimeout;
initSocket();//根據上面的引數建立Socket
}
//初始化Socket
private void initSocket() {
socket_ = new Socket();
try {
socket_.setSoLinger(false, 0);
socket_.setTcpNoDelay(true);
socket_.setKeepAlive(true);
socket_.setSoTimeout(socketTimeout_);
} catch (SocketException sx) {
LOGGER.error("Could not configure socket.", sx);
}
}
public void setTimeout(int timeout) {
this.setConnectTimeout(timeout);
this.setSocketTimeout(timeout);
}
public void setConnectTimeout(int timeout) {
connectTimeout_ = timeout;
}
public void setSocketTimeout(int timeout) {
socketTimeout_ = timeout;
try {
socket_.setSoTimeout(timeout);
} catch (SocketException sx) {
LOGGER.warn("Could not set socket timeout.", sx);
}
}
//Socket為空時初始化
public Socket getSocket() {
if (socket_ == null) {
initSocket();
}
return socket_;
}
//Socket是否已經開啟
public boolean isOpen() {
if (socket_ == null) {
return false;
}
return socket_.isConnected();
}
//開啟連線,主要作用是初始化Socket、輸入輸出流
public void open() throws TTransportException {
if (isOpen()) {
throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
}
if (host_ == null || host_.length() == 0) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
}
if (port_ <= 0 || port_ > 65535) {
throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + port_);
}
if (socket_ == null) {
initSocket();
}
try {
socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
//關閉當前連線
public void close() {
super.close();//先關閉輸入輸出流
//再關閉Socket
if (socket_ != null) {
try {
socket_.close();
} catch (IOException iox) {
LOGGER.warn("Could not close socket.", iox);
}
socket_ = null;
}
}
}
至此,TSocket的實現介紹完畢,主要的read、write操作都是使用父類的方法,而該類只需要根據Socket初始化父類的輸入、輸出流變數即可。
TFramedTransport
TFramedTransport該類主要是起到封裝和緩衝作用,真正IO讀寫還是依賴內部的TTransport成員變數,在服務端為非阻塞的情況下使用,這是由於非阻塞的服務端都是使用了Java Nio,而Java Nio是無法知道什麼時候讀完了一次請求的所有資料,所以使用TFramedTransport來設定前四個位元組表示此次資料請求的size,這樣,服務端讀取資料時設定size+4的ByteBuffer即可,讀滿ByteBuffer代表此次請求的資料全部讀完,由於TFramedTransport中使用到了TMemoryInputTransport,所以先看下TMemoryInputTransport的原始碼。
public final class TMemoryInputTransport extends TTransport {
private byte[] buf_; //儲存位元組的陣列
private int pos_;//可讀的位置
private int endPos_;//最後可讀的位置
public TMemoryInputTransport() {
}
public TMemoryInputTransport(byte[] buf) {
reset(buf);
}
public TMemoryInputTransport(byte[] buf, int offset, int length) {
reset(buf, offset, length);
}
//重置陣列物件
public void reset(byte[] buf) {
reset(buf, 0, buf.length);
}
public void reset(byte[] buf, int offset, int length) {
buf_ = buf;
pos_ = offset;
endPos_ = offset + length;
}
public void clear() {
buf_ = null;
}
@Override
public void close() {}
@Override
public boolean isOpen() {
return true;
}
@Override
public void open() throws TTransportException {}
//獲取到buf_位元組陣列物件
@Override
public byte[] getBuffer() {
return buf_;
}
//拿到可以開始讀的位置
public int getBufferPosition() {
return pos_;
}
//位元組陣列中還有多少位元組可讀
public int getBytesRemainingInBuffer() {
return endPos_ - pos_;
}
//pos_向後移len個位置
public void consumeBuffer(int len) {
pos_ += len;
}
//先看陣列中還有多少可讀位元組,取請求讀長度和實際可讀位元組中較小者後,如果該值大於0,則將buf_中從pos_可讀位置開始拷貝amtToRead個位元組到buf中,同時將可讀位置向後調整amtToRead長度,返回讀了amtToRead個位元組
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
int bytesRemaining = getBytesRemainingInBuffer();
int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
if (amtToRead > 0) {
System.arraycopy(buf_, pos_, buf, off, amtToRead);
consumeBuffer(amtToRead);
}
return amtToRead;
}
//不支援寫
@Override
public void write(byte[] buf, int off, int len) throws TTransportException {
throw new UnsupportedOperationException("No writing allowed!");
}
}
該類的原始碼還是比較簡單的,原理和ByteBuffer類似,不過僅能用作讀。需要理解read方法的作用,這裡會有一個下標pos_控制可讀取資料位置,一旦讀完到endPos_,再調read則一直返回0,下面貼TFramedTransport的原始碼。
public class TFramedTransport extends TTransport {
protected static final int DEFAULT_MAX_LENGTH = 16384000;//預設的本地快取最大位元組數
private int maxLength_;//當前物件的本地快取最大位元組數
private TTransport transport_ = null;//當前物件通過該物件實現資料的讀取與寫入
private final TByteArrayOutputStream writeBuffer_ =
new TByteArrayOutputStream(1024);//本地快取,用於輸出,繼承自ByteArrayOutputStream,知道可以將位元組陣列輸出即可
private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);//用於資料讀取的本地快取
//工廠類,將一個TTransport物件封裝為一個TFramedTransport物件
public static class Factory extends TTransportFactory {
private int maxLength_;
public Factory() {
maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
}
public Factory(int maxLength) {
maxLength_ = maxLength;
}
@Override
public TTransport getTransport(TTransport base) {
return new TFramedTransport(base, maxLength_);
}
}
public TFramedTransport(TTransport transport, int maxLength) {
transport_ = transport;
maxLength_ = maxLength;
}
public TFramedTransport(TTransport transport) {
transport_ = transport;
maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
}
//下面三個方法都呼叫transport相應方法實現,作用相同
public void open() throws TTransportException {
transport_.open();
}
public boolean isOpen() {
return transport_.isOpen();
}
public void close() {
transport_.close();
}
//在一次客戶端的請求過程或服務端處理請求的過程中,肯定要多次呼叫該方法讀資料,但只有第一次呼叫時由於readBuffer_位元組為0或儲存著上次快取的內容且已經讀完,所以從readBuffer_肯定讀出的got<=0,從而進入readFrame()方法,將資料從transport_讀到本地快取readBuffer_,後續資料的讀取got肯定大於0,直到本次的請求處理完畢
public int read(byte[] buf, int off, int len) throws TTransportException {
//readBuffer_已初始化
if (readBuffer_ != null) {
int got = readBuffer_.read(buf, off, len);//原始碼在上面,清楚兩點即可,readBuffer_已讀完或位元組數為0時 肯定返回got<0。也就是說,在一次客戶端的請求中第一次呼叫該方法時,肯定返回got<0,就可以進入 readFrame()方法。
if (got > 0) {
return got;
}
}
readFrame();//從transport_讀到本地快取readBuffer_
return readBuffer_.read(buf, off, len);//向buf讀入資料
}
private final byte[] i32buf = new byte[4];//儲存本次請求資料的長度
//將資料從transport_讀到本地快取readBuffer_中
private void readFrame() throws TTransportException {
transport_.readAll(i32buf, 0, 4);//先讀前4個位元組,代表資料的長度
int size = decodeFrameSize(i32buf);//由於客戶端服務端會對 資料size進行序列化,所以這裡需要反序列化
if (size < 0) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
}
if (size > maxLength_) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
"Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
}
byte[] buff = new byte[size];
transport_.readAll(buff, 0, size);//將完整的資料讀到buff
readBuffer_.reset(buff);//重置本地快取
}
//下面四個方法都是呼叫readBuffer_實現
@Override
public byte[] getBuffer() {
return readBuffer_.getBuffer();
}
@Override
public int getBufferPosition() {
return readBuffer_.getBufferPosition();
}
@Override
public int getBytesRemainingInBuffer() {
return readBuffer_.getBytesRemainingInBuffer();
}
@Override
public void consumeBuffer(int len) {
readBuffer_.consumeBuffer(len);
}
//下面兩個方法需要結合使用,write是向本地快取寫入資料,寫完後,所有的呼叫方都要對輸出流呼叫flush進行清空,所以下面一定會進入到flush方法,再通過transport_將本地快取的資料寫出去
public void write(byte[] buf, int off, int len) throws TTransportException {
writeBuffer_.write(buf, off, len);
}
@Override
public void flush() throws TTransportException {
byte[] buf = writeBuffer_.get();
int len = writeBuffer_.len();
writeBuffer_.reset();//清空,為下次作準備
encodeFrameSize(len, i32buf);//序列化操作,與上面readFrame的反序列化對應
transport_.write(i32buf, 0, 4);//先返回資料大小,與readFrame的先讀4位元組對應
transport_.write(buf, 0, len);//再返回真實資料
transport_.flush();//清空
}
//序列化反序列化資料大小size,序列化是將每個位元組高位都位移到低位組成byte陣列,反序列化反之
public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
buf[0] = (byte)(0xff & (frameSize >> 24));
buf[1] = (byte)(0xff & (frameSize >> 16));
buf[2] = (byte)(0xff & (frameSize >> 8));
buf[3] = (byte)(0xff & (frameSize));
}
public static final int decodeFrameSize(final byte[] buf) {
return
((buf[0] & 0xff) << 24) |
((buf[1] & 0xff) << 16) |
((buf[2] & 0xff) << 8) |
((buf[3] & 0xff));
}
}
原始碼還是較易懂,TFramedTransport通過將前四位元組表示資料長度,對本地快取進行讀寫,從而對上層提供的讀寫方法可保證讀入、寫出的資料的完整性,下面再上一張圖,介紹客戶端一次請求的完整流程,幫助理解TFramedTransport在非阻塞服務中的功能,可能會牽扯到之前講過的一些原始碼,忘記的同學可以回顧下,TProtocol、TProcessor等會簡單略過。
TNonblockingSocket
接下來看下該類原始碼,顧名思義,該類用於非阻塞型Socket通訊,在服務端、客戶端均能使用(客戶端使用時為非同步客戶端,過於複雜,這裡不討論,同步客戶端使用TFrameTransport即可),還是需要看看其父類TNonblockingTransport的原始碼:
public abstract class TNonblockingTransport extends TTransport {
//@see java.nio.channels.SocketChannel#connect(SocketAddress remote) 這是原始碼的註釋,即可理解為該方法是需要呼叫SocketChannel.connect
public abstract boolean startConnect() throws IOException;
//完成連線,同樣要參考SocketChannel.finishConnect()
public abstract boolean finishConnect() throws IOException;
//很熟悉的方法,對,就是將當前物件註冊到selector上
public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
//向buffer中讀資料
public abstract int read(ByteBuffer buffer) throws IOException;
//向buffer中寫資料
public abstract int write(ByteBuffer buffer) throws IOException;
}
這裡不解釋太多,使用到Java Nio中的概念,不懂得同學還請自行百度,下面看TNonblockingSocket原始碼:
public class TNonblockingSocket extends TNonblockingTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());
private final SocketAddress socketAddress_;//host port資訊,在連線懶載入時使用
private final SocketChannel socketChannel_;//Java Nio 非阻塞讀寫
//下面兩個構造方法都只是初始化成員變數,並不open connecttion
public TNonblockingSocket(String host, int port) throws IOException {
this(host, port, 0);
}
public TNonblockingSocket(String host, int port, int timeout) throws IOException {
this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
}
//這個構造方法要求socketChannel已經開啟connection,非阻塞服務使用該方法建立連線
public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
this(socketChannel, 0, null);
if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
}
//socketChannel相關引數設定
private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
throws IOException {
socketChannel_ = socketChannel;
socketAddress_ = socketAddress;
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setSoLinger(false, 0);
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
setTimeout(timeout);
}
//將變數socketChannel_註冊到selector,返回SelectionKey
public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
return socketChannel_.register(selector, interests);
}
//設定超時時間
public void setTimeout(int timeout) {
try {
socketChannel_.socket().setSoTimeout(timeout);
} catch (SocketException sx) {
LOGGER.warn("Could not set socket timeout.", sx);
}
}
public SocketChannel getSocketChannel() {
return socketChannel_;
}
//連線是否已經處於連線狀態,isConnected方法在已經呼叫了close後會返回false,但isOpen方法會返回true
public boolean isOpen() {
return socketChannel_.isOpen() && socketChannel_.isConnected();
}
//不要呼叫該方法,提供了 懶載入startConnect()方法來開啟連線
public void open() throws TTransportException {
throw new RuntimeException("open() is not implemented for TNonblockingSocket");
}
//往buffer讀資料,非阻塞服務端呼叫該方法
public int read(ByteBuffer buffer) throws IOException {
return socketChannel_.read(buffer);
}
//方法作用見父類,這裡用socketChannel_實現
public int read(byte[] buf, int off, int len) throws TTransportException {
if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot read from write-only socket channel");
}
try {
return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
//向buffer寫資料,非阻塞服務端呼叫該方法
public int write(ByteBuffer buffer) throws IOException {
return socketChannel_.write(buffer);
}
//方法作用見父類,這裡用socketChannel_實現
public void write(byte[] buf, int off, int len) throws TTransportException {
if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot write to write-only socket channel");
}
try {
socketChannel_.write(ByteBuffer.wrap(buf, off, len));
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
//SocketChannel不支援,為空即可
public void flush() throws TTransportException {
}
//關閉socketChannel_
public void close() {
try {
socketChannel_.close();
} catch (IOException iox) {
LOGGER.warn("Could not close socket.", iox);
}
}
//開始連線
public boolean startConnect() throws IOException {
return socketChannel_.connect(socketAddress_);
}
//連線是否完成
public boolean finishConnect() throws IOException {
return socketChannel_.finishConnect();
}
}
對TNonblockingSocket,這裡我們清楚其底層是依賴SocketChannel實現即可,後並無難點,TTransport體系常用類原始碼就介紹到此,下一節看看TServerTransport類體系。
TServerTransport
TServerTransport
這一塊的類都是用於服務端監聽埠,慣例,還是先看看頂層父類的原始碼:
public abstract class TServerTransport implements Closeable {
//抽象引數類
public static abstract class AbstractServerTransportArgs<T extends AbstractServerTransportArgs<T>> {
int backlog = 0; //接收請求的佇列大小
int clientTimeout = 0;//客戶端超時時間
InetSocketAddress bindAddr;//監聽地址
public T backlog(int backlog) {
this.backlog = backlog;
return (T) this;
}
public T clientTimeout(int clientTimeout) {
this.clientTimeout = clientTimeout;
return (T) this;
}
public T port(int port) {
this.bindAddr = new InetSocketAddress(port);
return (T) this;
}
public T bindAddr(InetSocketAddress bindAddr) {
this.bindAddr = bindAddr;
return (T) this;
}
}
//開始監聽客戶端請求
public abstract void listen() throws TTransportException;
//監聽到請求後,建立一個TTransport物件
public final TTransport accept() throws TTransportException {
TTransport transport = acceptImpl();
if (transport == null) {
throw new TTransportException("accept() may not return NULL");
}
return transport;
}
//建立TTransport物件的真正實現
protected abstract TTransport acceptImpl() throws TTransportException;
//停止監聽
public abstract void close();
//可選,作用是將當前物件從accept、listen方法的阻塞狀態中打斷
public void interrupt() {}
}
TServerSocket
阻塞服務監聽客戶端請求時使用的TServerSocket相對簡單,所以先看其原始碼:
public class TServerSocket extends TServerTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TServerSocket.class.getName());
private ServerSocket serverSocket_ = null; //很熟悉吧,真正監聽客戶端請求的物件
private int clientTimeout_ = 0;//接收到客戶端請求後,建立連線時的超時時間
//引數類,多了ServerSocket物件
public static class ServerSocketTransportArgs extends AbstractServerTransportArgs<ServerSocketTransportArgs> {
ServerSocket serverSocket;
public ServerSocketTransportArgs serverSocket(ServerSocket serverSocket) {
this.serverSocket = serverSocket;
return this;
}
}
//下面多個構造方法這裡就不詳述了
public TServerSocket(ServerSocket serverSocket) throws TTransportException {
this(serverSocket, 0);
}
public TServerSocket(ServerSocket serverSocket, int clientTimeout) throws TTransportException {
this(new ServerSocketTransportArgs().serverSocket(serverSocket).clientTimeout(clientTimeout));
}
public TServerSocket(int port) throws TTransportException {
this(port, 0);
}
public TServerSocket(int port, int clientTimeout) throws TTransportException {
this(new InetSocketAddress(port), clientTimeout);
}
public TServerSocket(InetSocketAddress bindAddr) throws TTransportException {
this(bindAddr, 0);
}
public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
}
//上面所有的構造方法都會調到這裡,不多講,ServerSocket相關設定
public TServerSocket(ServerSocketTransportArgs args) throws TTransportException {
clientTimeout_ = args.clientTimeout;
if (args.serverSocket != null) {
this.serverSocket_ = args.serverSocket;
return;
}
try {
serverSocket_ = new ServerSocket();
serverSocket_.setReuseAddress(true);
serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
close();
throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".", ioe);
}
}
//確定在accept的時候不要阻塞
public void listen() throws TTransportException {
if (serverSocket_ !=