Kafka原始碼剖析 —— 網路I/O篇 —— 淺析KafkaChannel、NetworkReceive、Send
一、SocketChannel和KafkaChannel有什麼區別?
上篇文章說道KafkaSelector在建立一個連線的時候和普通的nioSelector並沒有什麼不同,它是基於nioSelector的封裝。我們知道建立連線的一系列操作都是由Channel去完成,而KafkaChannel實際上就是對它的進一步封裝:
KafkaChannel不僅封裝了SocketChannel,還封裝了Kafka自己的認證器Authenticator,和讀寫相關的NetworkReceive、Send。NetworkReceive和Send的底層都是通過ByteBuffer來實現的。
二、KafkaChannel的建立
實際上基本等同於KafkaSelector的建立:
按照普通的方式建立完通道後,將其註冊到NioSelector上,並關注OP_CONNECT,再以節點Id,SelectionKey來建立KafkaChannel,這裡先不詳細說明KafkaChannel,它是對通道的進一步封裝。在建立完KafkaChannel後,將KafkaChannel與SelectionKey、節點ID做進一步繫結。
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);// 將當前這個socketChannel註冊到nioSelector上,並關注OP_CONNECT事件
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);// 建立KafkaChannel
key.attach(channel);// 將channel繫結到key上
this.channels.put(id, channel);// 將 nodeId 和 Channel繫結
這樣有一個好處,首先KafkaChannel中包含了節點ID與SelectionKey,而我們也可以根據節點ID來拿到KafkaChannel,同樣可以根據SelectionKey來拿到KafkaChannel,這就意味著,我們只要拿到了KafkaChannel、SelectionKey、節點ID中的任意一個,都可以通過這些引用關係拿到彼此
三、預傳送
實際上就是將要傳送的ByteBuffer扔進KafkaChannel,此時並未進行IO操作,這裡的Send物件,實際上就是對ByteBuffer的進一步封裝,它主要包含了將要發往的節點ID、ByteBuffer大小、是否傳送完畢等資訊。我們這裡根據節點ID,從我們剛才的channels中,取出KafkaChannel。
public void send(Send send) {
// 看看send要發的這個nodeId在不在
KafkaChannel channel = channelOrFail(send.destination());
try {
// 把資料扔進KafkaChannel中(只能放一個,放多個會報錯),並關注write事件
channel.setSend(send);
} catch (CancelledKeyException e) {
// 失敗了加一條node_id的失敗記錄
this.failedSends.add(send.destination());
close(channel);
}
}
這個KafkaChannel的setSend方法實際上非常簡單,就是將要傳送的send物件的引用交給KafkaChannel中的send。並且使這個channel的SelectionKey去關注OP_WRITE事件。
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
四、nio中的io操作
在上篇文章裡,我們知道KafkaSelector也是通過輪詢器去進行IO操作,看一下原始的nioSelector是如何進行io操作的:
public class NioEchoServer {
private static final int BUF_SIZE = 256;
private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception {
// 開啟服務端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 開啟 Selector
Selector selector = Selector.open();
// 服務端 Socket 監聽8080埠, 並配置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 將 channel 註冊到 selector 中.
// 通常我們都是先註冊一個 OP_ACCEPT 事件, 然後在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ
// 註冊到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通過呼叫 select 方法, 阻塞地等待 channel I/O 可操作
if (selector.select(TIMEOUT) == 0) {
System.out.print(".");
continue;
}
// 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經就緒.
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 當獲取一個 SelectionKey 後, 就要將它刪除, 表示我們已經對這個 IO 事件進行了處理.
keyIterator.remove();
if (key.isAcceptable()) {
// 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
// 代表客戶端的連線
// 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
// 注意, 這裡我們如果沒有設定 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那麼 select 方法會一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clientChannel.read(buf);
if (bytesRead == -1) {
clientChannel.close();
} else if (bytesRead > 0) {
key.interestOps(OP_READ | SelectionKey.OP_WRITE);
System.out.println("Get data length: " + bytesRead);
}
}
if (key.isValid() && key.isWritable()) {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
key.interestOps(OP_READ);
}
buf.compact();
}
}
}
}
}
五、kafkaChannel 如何進行io操作?
1、讀操作
首先,進行是否可以開始讀操作的判斷。1、channel.ready(),這裡做了兩個判斷,一個是Kafka的認證器是否認證通過,另一個則是是否握手成功。2、key.isReadable(),selectionKey是否關注了OP_READ。3、!hasStagedReceive(channel),判斷該channel是否在hasStagedReceive這個map裡面,如果該channel正在讀,那麼它會在這個map裡面,直到讀取完成。
// channel是否已經準備好從連線中讀取任何可讀資料
/* if channel is ready read from any connections that have readable data */
if (channel.ready() // 連線的三次握手完成,並且 todo 許可權驗證通過
&& key.isReadable() // key是否關注了read事件
&& !hasStagedReceive(channel)) {// todo 這個通道不能是正在讀資料的,因為在讀的時候,會把這個channel扔進stagedReceives裡面
NetworkReceive networkReceive;
/**
* 實際上這裡就是分多次去一個channel取資料,直到取完,並將其儲存在key:channel value:new ArrayDeque<NetworkReceive> 中
*/
while ((networkReceive = channel.read()) != null) {
// 將多次接收的資料放進stagedReceives下channel的Deque裡面
addToStagedReceives(channel, networkReceive);
}
}
剩下的channel.read()就比較簡單了,KafkaChannel裡面封裝了一個NetworkReceives,而NetworkReceives主要就是對ByteBuffer的封裝。
我們將該NioChannel傳入,呼叫channel.read(size)方法,這個size,其實就是一個ByteBuffer,它是kafka協議中用來判斷包體有多長的包頭。
第一步,先判斷byteBuffer(size)中是否還有剩餘空間
第二步,從nioChannel中將資料讀到byteBuffer中
第三步,判斷byteBuffer是不是裝滿了
第四步,如果裝滿了,證明size這個bytebuffer已經拿到了包體的長度,呼叫readInt獲取其capacity,再用這個capacity去申請一個用於接收包體的byteBuffer(buffer)。
第五步,正式地將channel中的資料中讀取到byteBuffer(buffer)
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
int read = 0;
if (size.hasRemaining()) {
int bytesRead = channel.read(size);
if (bytesRead < 0) {
throw new EOFException();
}
read += bytesRead;
if (!size.hasRemaining()) {
size.rewind();
int receiveSize = size.getInt();
if (receiveSize < 0) {
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
}
if (maxSize != UNLIMITED && receiveSize > maxSize) {
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
}
this.buffer = ByteBuffer.allocate(receiveSize);
}
}
if (buffer != null) {
int bytesRead = channel.read(buffer);
if (bytesRead < 0) {
throw new EOFException();
}
read += bytesRead;
}
return read;
}
讀取完成之後,再做一下校驗:就會返回了,也就是上面while ((networkReceive = channel.read()) != null)拿到的這個networkReceives,裡面裝著包頭和包體。這裡Kafka有一個小操作,就是將kafkaChannel內的networkReceive的引用賦值給外面的這個networkReceive後,會將kafkaChannel內的networkReceive的引用置為空。
/**
* 接收資料,將資料儲存在 NetworkReceive
*/
public NetworkReceive read() throws IOException {
NetworkReceive result = null;
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id);
}
receive(receive);// 這個方法就是上面說了一大堆第一步第二步第三步的那個方法。
if (receive.complete()) {
receive.payload()
.rewind();
result = receive;
receive = null;
}
return result;
}
2、寫操作
寫操作要比讀操作更加簡單,上面有一個預傳送操作,就是將要send的物件Send
/**
* 傳送時其實也有一次沒傳送完的情況,每傳送完的話,就不會出現在completedSends裡面
*/
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
// 如果channel已經ready 並且 我們有資料來準備好寫sockets
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
// 這裡會將KafkaChannel的send欄位傳送出去,
// 如果未完成傳送,或者沒發完,則返回null
// 傳送成功則返回send物件
if (send != null) {
this.completedSends.add(send);// 新增到completedSends集合
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
主要的傳送方法就是channel.write();
public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
而write方法中最核心的方法則是send(send),這個send物件也是一個byteBuffer物件。底層中的底層還是呼叫了channel.write(byteBuffer方法)
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0) {
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
}
remaining -= written;
// This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel.
// Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than
// GatheringByteChannel or ScatteringByteChannel.
// 這是一個臨時工作區,當傳送時,接收資料的介面一直被BlockingChannel使用著。
// 一旦BlockingChannel 被移除,我們就可以開始我們的傳送操作,接收通過 transportLayer 來工作而不是 GatheringByteChannel 或 ScatteringByteChannel
if (channel instanceof TransportLayer) {
pending = ((TransportLayer) channel).hasPendingWrites();
}
return written;
}
參考:
Java NIO 的前生今世 之四 NIO Selector 詳解
《Apache Kafka 原始碼剖析》 - 徐郡明著
Apache Kafka 原始碼 0.10.0.1