ZooKeeper-客戶端連線ServerCnxn之NIOServerCnxn
背景
ServerCnxn
代表了一個客戶端與一個server的連線,其有兩種實現,分別是NIOServerCnxn
和NettyServerCnxn
,類圖如下:
本文介紹ZooKeeper是如何通過NIOServerCnxn
實現網路IO的.
處理read事件
發生時機
當SocketChannel
上有資料可讀時,worker thread呼叫NIOServerCnxn.doIO()
進行讀操作
粘包拆包問題
處理讀事件比較麻煩的問題就是通過TCP傳送的報文會出現粘包拆包問題,Zookeeper為了解決此問題,在設計通訊協議時將報文分為3個部分:
- 請求頭和請求體的長度(4個位元組)
- 請求頭
- 請求體
注:(1)請求頭和請求體也細分為更小的部分,但在此不做深入研究,只需知道請求的前4個位元組是請求頭和請求體的長度即可.(2)將請求頭和請求體稱之為payload
在報文頭增加了4個位元組的長度欄位,表示整個報文除長度欄位之外的長度.服務端可根據該長度將粘包拆包的報文分離或組合為完整的報文.NIOServerCnxn
讀取資料流程如下:
- NIOServerCnxn中有兩個屬性,一個是lenBuffer,容量為4個位元組,用於讀取長度資訊.一個是incomingBuffer,其初始化時即為lenBuffer,但是讀取長度資訊後,就為incomingBuffer分配對應的空間用於讀取payload
- 根據請求報文的長度分配incomingBuffer的大小
- 將讀到的位元組存放在incomingBuffer中,直至讀滿(由於第2步中為incomingBuffer分配的長度剛好是報文的長度,此時incomingBuffer中剛好時一個報文)
- 處理報文
程式碼如下:
void doIO(SelectionKey k) throws InterruptedException {
try {
...
/*
處理讀操作的流程
1.最開始incomingBuffer就是lenBuffer,容量為4.第一次讀取4個位元組,即此次請求報文的長度
2.根據請求報文的長度分配incomingBuffer的大小
3.將讀到的位元組存放在incomingBuffer中,直至讀滿
(由於第2步中為incomingBuffer分配的長度剛好是報文的長度,此時incomingBuffer中剛好時一個報文)
4.處理報文
*/
if (k.isReadable()) {
//若是客戶端請求,此時觸發讀事件
//初始化時incomingBuffer即時lengthBuffer,只分配了4個位元組,供使用者讀取一個int(此int值就是此次請求報文的總長度)
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
/*
只有incomingBuffer.remaining() == 0,才會進行下一步的處理,否則一直讀取資料直到incomingBuffer讀滿,此時有兩種可能:
1.incomingBuffer就是lenBuffer,此時incomingBuffer的內容是此次請求報文的長度.
根據lenBuffer為incomingBuffer分配空間後呼叫readPayload().
在readPayload()中會立馬進行一次資料讀取,(1)若可以將incomingBuffer讀滿,則incomingBuffer中就是一個完整的請求,處理該請求;
(2)若不能將incomingBuffer讀滿,說明出現了拆包問題,此時不能構造一個完整的請求,只能等待客戶端繼續傳送資料,等到下次socketChannel可讀時,繼續將資料讀取到incomingBuffer中
2.incomingBuffer不是lenBuffer,說明上次讀取時出現了拆包問題,incomingBuffer中只有一個請求的部分資料.
而這次讀取的資料加上上次讀取的資料湊成了一個完整的請求,呼叫readPayload()
*/
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) {
// start of next request
//解析上文中讀取的報文總長度,同時為"incomingBuffer"分配len的空間供讀取全部報文
incomingBuffer.flip();
//為incomeingBuffer分配空間時還包括了判斷是否是"4字命令"的邏輯
isPayload = readLength(k);
incomingBuffer.clear();
} else {
//2.incomingBuffer不是lenBuffer,此時incomingBuffer的內容是payload
// continuation
isPayload = true;
}
if (isPayload) {
// not the case for 4letterword
//處理報文
readPayload();
} else {
// four letter words take care
// need not do anything else
return;
}
}
}
...
} catch (CancelledKeyException e) {
...
}
}
/**
* 有兩種情況會呼叫此方法:
* 1.根據lengthBuffer的值為incomingBuffer分配空間後,此時尚未將資料從socketChannel讀取至incomingBuffer中
* 2.已經將資料從socketChannel中讀取至incomingBuffer,且讀取完畢
* <p>
* Read the request payload (everything following the length prefix)
*/
private void readPayload() throws IOException, InterruptedException {
// have we read length bytes?
if (incomingBuffer.remaining() != 0) {
// sock is non-blocking, so ok
//對應情況1,此時剛為incomingBuffer分配空間,incomingBuffer為空,進行一次資料讀取
//(1)若將incomingBuffer讀滿,則直接進行處理;
//(2)若未將incomingBuffer讀滿,則說明此次傳送的資料不能構成一個完整的請求,則等待下一次資料到達後呼叫doIo()時再次將資料
//從socketChannel讀取至incomingBuffer
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
// have we read length bytes?
if (incomingBuffer.remaining() == 0) {
//不管是情況1還是情況2,此時incomingBuffer已讀滿,其中內容必是一個request,處理該request
//更新統計值
packetReceived();
incomingBuffer.flip();
if (!initialized) {
//處理連線請求
readConnectRequest();
} else {
//處理普通請求
readRequest();
}
//請求處理結束,重置lenBuffer和incomingBuffer
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
解決粘包拆包的思路如上所述,程式碼中增加了很多註釋.
思考
個人認為,上述資料讀取過程一次至多讀取一個請求,即使在此次可讀取的資料中包含多個請求也是如此.而TCP報文的MSS一般為1460,客戶端的請求為50~100位元組,在客戶端請求非常頻繁時,一個TCP報文完全可以包含多個請求.
為了解決該問題,可以增加一個屬性outgoingIncomingBuffer
,其資料型別為List<ByteBuffer>
用於存放此次讀取的完整的請求,這樣就可將此次可讀取的資料全部讀取完畢,無需等到下一次selector.select()
,減輕了selector.select()
的負擔.
處理write事件
發生時機
當SocketChannel
可寫時,worker thread呼叫NIOServerCnxn.doIO()
進行寫操作
DirectByteBuffer
由於Zookeeper中使用了DirectByteBuffer
進行IO操作,在此簡單介紹下DirectByteBuffer
和HeapByteBuffer
的區別.
HeapByteBuffer
是在堆上分配的記憶體,而DirectByteBuffer
是在堆外分配的記憶體,又稱直接記憶體.使用HeapByteBuffer
進行IO時,比如呼叫FileChannel.write(HeapByteBuffer)
將資料寫到File中時,有兩個步驟:
- 將
HeapByteBuffer
的資料拷貝到DirectByteBuffer
- 再從堆外記憶體將資料寫入到檔案中.
問題1:為什麼要將HeapByteBuffer
的資料拷貝到DirectByteBuffer
呢?不能將資料直接從HeapByteBuffer
拷貝到檔案中嗎?
並不是說作業系統無法直接訪問jvm中分配的記憶體區域,顯然作業系統是可以訪問所有的本機記憶體區域的,但是為什麼對io的操作都需要將jvm記憶體區的資料拷貝到堆外記憶體呢?是因為jvm需要進行GC,如果io裝置直接和jvm堆上的資料進行互動,這個時候jvm進行了GC,那麼有可能會導致沒有被回收的資料進行了壓縮,位置被移動到了連續的儲存區域,這樣會導致正在進行的io操作相關的資料全部亂套,顯然是不合理的,所以對io的操作會將jvm的資料拷貝至堆外記憶體,然後再進行處理,將不會被jvm上GC的操作影響。
問題2:DirectByteBuffer
是相當於固定的核心buffer還是JVM程序內的堆外記憶體?
不管是Java堆還是直接記憶體,都是JVM程序通過malloc
申請的記憶體,其都是使用者空間的記憶體,只不過是JVM程序將這兩塊使用者空間的記憶體用作不同的用處罷了.Java記憶體模型如下:
問題3:將HeapByteBuffer
的資料拷貝到DirectByteBuffer
這一過程是作業系統執行還是JVM執行?
在問題2中已經回答,DirectByteBuffer
是JVM程序申請的使用者空間記憶體,其使用和分配都是由JVM程序管理,因此這一過程是JVM執行的.也正是因為JVM知道堆記憶體會經常GC,資料地址經常移動,而底層通過write,read,pwrite,pread等函式進行系統呼叫時,需要傳入buffer的起始地址和buffer count作為引數,因此JVM在執行讀寫時會做判斷,若是HeapByteBuffer
,就將其拷貝到直接記憶體後再呼叫系統呼叫執行步驟2.
程式碼在sun.nio.ch.IOUtil.write()
和sun.nio.ch.IOUtil.read()
中,我們看下write()
的程式碼:
知乎不能複製,程式碼地址如下:Java NIO direct buffer的優勢在哪兒?,第一個答案中有程式碼.
問題4:在將資料寫到檔案的過程中需要將資料拷貝到核心空間嗎?
需要.在步驟3中,是不能直接將資料從直接記憶體拷貝到檔案中的,需要將資料從直接記憶體->核心空間->檔案,因此使用DirectByteBuffer
代替HeapByteBuffer
也只是減少了資料拷貝的一個步驟,但對效能已經有提升了.
問題5:還有其他減少資料拷貝的方法嗎?
有,我目前知道的有兩種,分別是sendFile
系統呼叫和記憶體對映.
比如想要將資料從磁碟檔案傳送到socket,使用read/write
系統呼叫需要將資料從磁碟檔案->read buffer(核心空間中)->使用者空間->socket buffer(也在核心空間中)->NIC buffer(網絡卡),而使用sendFile
(即FileChannel.transferTo()
)系統呼叫就可減少複製到使用者空間的過程,變為資料從磁碟檔案->read buffer(核心空間中)->socket buffer(也在核心空間中)->NIC buffer(網絡卡),當然,還會有其他的優化手段,詳見什麼是Zero-Copy?
記憶體對映我也不是很清楚,詳見JAVA NIO之淺談記憶體對映檔案原理與DirectMemory
問題6:netty中使用了哪幾種方式實現高效IO?
netty中使用了3種方式實現其zero-copy
機制,如下:
- 使用
DirectByteBuffer
- 使用
FileChannel.transferTo()
- ntty提供了組合Buffer物件,可以聚合多個ByteBuffer物件,使用者可以像操作一個Buffer那樣方便的對組合Buffer進行操作,避免了傳統通過記憶體拷貝的方式將幾個小Buffer合併成一個大的Buffer
NIOServerCnxnFactory中的直接記憶體
/**
* 使用其執行高效的socket I/O,由於I/O由worker thread執行,因此將直接記憶體設定為ThreadLocal的.
* 各連線可以在共享直接記憶體的同時無需擔心併發問題.
* <p>
* We use this buffer to do efficient socket I/O. Because I/O is handled
* by the worker threads (or the selector threads directly, if no worker
* thread pool is created), we can create a fixed set of these to be
* shared by connections.
*/
private static final ThreadLocal<ByteBuffer> directBuffer =
new ThreadLocal<ByteBuffer>() {
@Override
protected ByteBuffer initialValue() {
return ByteBuffer.allocateDirect(directBufferBytes);
}
};
在NIOServerCnxnFactory
中,設定了ThreadLocal
型別的DirectByteBuffer
,其容量由系統屬性zookeeper.nio.directBufferBytes
控制,預設為64K.
原始碼
/**
* 當{@link #sock}可寫時呼叫該方法
*
* @param k {@link #sock}關聯的SelectionKey
*/
void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
if (outgoingBuffers.isEmpty()) {
return;
}
/*
* 嘗試獲取直接記憶體
*/
ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
if (directBuffer == null) {
//不使用直接記憶體
ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
sock.write(outgoingBuffers.toArray(bufferList));
// Remove the buffers that we have sent
ByteBuffer bb;
while ((bb = outgoingBuffers.peek()) != null) {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
if (bb.remaining() > 0) {
break;
}
packetSent();
outgoingBuffers.remove();
}
} else {
//使用直接記憶體
directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
/*
* 若directBuffer的剩餘可寫空間不足以容納b的所有資料,則修改b的limit為directBuffer的剩餘可寫空間.
* 這樣下面的複製程式碼剛好將directBuffer的可寫空間寫滿
*/
b = (ByteBuffer) b.slice().limit(directBuffer.remaining());
}
/*
* put()會修改b和directBuffer的position值,但是我們不能修改b的position值,
* 因為下文需要position的值將已傳送的資料移出outgoingBuffers,因此在複製結束後重置position值.
*
*/
int p = b.position();
//將b中的資料複製到directBuffer中
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
/*
* Do the flip: limit becomes position, position gets set to
* 0. This sets us up for the write.
*/
directBuffer.flip();
//返回傳送的位元組數,下文據此移除已傳送的資料
int sent = sock.write(directBuffer);
ByteBuffer bb;
// 將已傳送的buffers從outgoingBuffers中移除
while ((bb = outgoingBuffers.peek()) != null) {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
if (sent < bb.remaining()) {
/*
* 只發送了此Buffer的部分資料,因此修改position的值並退出迴圈
*/
bb.position(bb.position() + sent);
break;
}
packetSent();
//該buffer的資料已經全部發送,將buffer從outgoingBuffers中移除
sent -= bb.remaining();
outgoingBuffers.remove();
}
}
}
從程式碼中可以看出,若分配了直接記憶體,則優先使用直接記憶體傳送資料.此外,從outgoingBuffers
中獲取待發送的資料,outgoingBuffers
作用是將構造響應和傳送響應解耦(即處理請求獲取響應和將響應傳送給客戶端兩個操作非同步執行).響應構造成功後就新增至outgoingBuffers
中,當可以傳送資料時,就從outgoingBuffers
中獲取資料傳送.
通過sendBuffer()
將待發送的資料新增至outgoingBuffers
中,很多方法都會呼叫sendBuffer()
,如NIOServerCnxn.sendResponse()
,NIOServerCnxn.sendCloseSession()
,ZookeeperServer.finishSessionInit()
等,其中FinalRequestProcessor
處理完請求後呼叫NIOServerCnxn.sendResponse()
.
/**
* sendBuffer pushes a byte buffer onto the outgoing buffer queue for
* asynchronous writes.
*/
@Override
public void sendBuffer(ByteBuffer bb) {
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+ " is valid: " + sk.isValid());
}
outgoingBuffers.add(bb);
requestInterestOpsUpdate();
}
總結
- 粘包拆包問題的解決
- 直接記憶體的使用
- 非同步的思想:在請求處理鏈執行緒中構造響應,在worker thread中傳送響應,執行緒間通過
outgoingBuffers
通訊,將構造響應和傳送響應非同步化