Flink執行時之基於Netty的網路通訊(下)
客戶端核心處理器
這一篇,我們分析一下客戶端協議棧中的核心的處理器PartitionRequestClientHandler,該處理器用於處理服務端的響應訊息。
我們以客戶端獲取到響應之後回撥該處理器的channelRead方法為入口來進行分析:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
//當沒有待解析的原始訊息時,直接解碼訊息,否則將訊息加入到stagedMessages佇列中,等待排隊處理
if (!bufferListener.hasStagedBufferOrEvent() && stagedMessages.isEmpty()) {
decodeMsg(msg);
}
else {
stagedMessages.add(msg);
}
}
catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
這裡涉及到兩個物件,首先是bufferListener,用於感知可用Buffer的事件偵聽器,它是內部實現的BufferListenerTask型別。其次是stagedMessages,用於接收原始未解碼訊息的佇列。
解碼方法decodeMsg的主要邏輯包含對兩種型別訊息的解析。一種是服務端的錯誤響應訊息ErrorResponse,另一種是正常的Buffer請求響應訊息BufferResponse。對於錯誤響應訊息會判斷是否是致命錯誤,如果是致命錯誤,則直接通知所有的InputChannel並關閉它們;如果不是,則讓該訊息對應的InputChannel按不同情況處理。我們重點關注對BufferResponse的處理:
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
//根據響應訊息裡的receiverId,從註冊map裡獲取到接收該訊息的RemoteInputChannel例項
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
//如果該響應沒有對應的接收者,則釋放該Buffer,同時通知服務端取消該請求
if (inputChannel == null) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return true;
}
//接下來才進入到真正的解析邏輯
return decodeBufferOrEvent(inputChannel, bufferOrEvent);
}
在decodeBufferOrEvent中,它會對該訊息具體是Buffer還是Event進行區分,如果是Buffer:
if (bufferOrEvent.isBuffer()) {
//空Buffer
if (bufferOrEvent.getSize() == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber);
return true;
}
//獲得Buffer提供者,如果為空,則通知服務端取消請求
BufferProvider bufferProvider = inputChannel.getBufferProvider();
if (bufferProvider == null) {
cancelRequestFor(bufferOrEvent.receiverId);
return false;
}
while (true) {
//從Buffer提供者請求Buffer,以放置響應結果資料
Buffer buffer = bufferProvider.requestBuffer();
//如果請求到Buffer,則讀取資料同時觸發InputChannel的onBuffer回撥
//該方法在前文分析輸入通道時我們早已提及過,它會將Buffer加入到佇列中
if (buffer != null) {
buffer.setSize(bufferOrEvent.getSize());
bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer());
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber);
return true;
}
//否則進入等待模式,當有Buffer可用時,會觸發bufferListener的onEvent方法
else if (bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
releaseNettyBuffer = false;
return false;
}
else if (bufferProvider.isDestroyed()) {
return false;
}
}
}
如果從Buffer提供者沒有獲取到Buffer,說明當前沒有可用的Buffer資源了,那麼將進入等待模式。這裡等待Buffer可用是基於事件偵聽機制,這個機制是如何實現的呢?在上面的waitForBuffer方法的實現中,通過將當前的BufferListenerTask的bufferListener例項反向註冊到Buffer提供者,當Buffer提供者中有Buffer可用時,將會觸發bufferListener的onEvent回撥方法。這裡需要注意的是,當Buffer提供者中的Buffer從無到有,說明有Buffer被回收了,所以onEvent方法是被回收Buffer的執行緒所呼叫,而非Netty的I/O執行緒。
到此,我們才獲取到可用的Buffer並讀取了響應訊息的原始資料,但資料還沒有被解碼。是不是解碼的過程也發生在onEvent方法中呢?其實不然,在onEvent方法裡,它將對原始訊息的處理權交還給了Netty的I/O執行緒:
if (buffer != null) {
if (availableBuffer.compareAndSet(null, buffer)) {
ctx.channel().eventLoop().execute(this);
success = true;
}
else {
throw new IllegalStateException("Received a buffer notification, " +
" but the previous one has not been handled yet.");
}
}
程式碼段中會通過上下文物件獲取到Channel所處的EventLoop,然後通過它的execute方法接收一個Runnable例項並在新執行緒執行。這裡接收的this就是當前的bufferListener例項(因為BufferListenerTask也實現了Runnable介面)。所以在BufferListenerTask的onEvent方法中其實存在著一個執行緒執行的橋接過程。
以上就是NettyClient接收到NettyServer的響應後的處理器邏輯。由於Buffer資源受限,這裡並沒有直接將原始訊息直接交與Netty的I/O執行緒並寫到Buffer中,而是採取了佇列快取原始訊息外加Buffer可用事件通知的機制來進行處理。
服務端核心處理器
服務端有兩個核心處理器,分別是PartitionRequestServerHandler和PartitionRequestQueue。其中,PartitionRequestServerHandler會依賴PartitionRequestQueue的例項。
我們先來看PartitionRequestServerHandler,它是一種通道流入處理器(ChannelInboundHandler),主要用於初始化資料傳輸同時分發事件。
首先,PartitionRequestServerHandler會在Channel啟動時建立一個容量至少為1的BufferPool。當然最關鍵的方法還是訊息的處理方法channelRead0。
Netty提供了一個簡化版的ChannelInboundHandler的實現,名為SimpleChannelInboundHandler。通過繼承這個類,你可以非常方便得專注於實現自己的業務邏輯。因此,SimpleChannelInboundHandler類已經對ChannelInboundHandler的channelRead介面方法提供了基礎實現,然後提供了名為channelRead0的抽象方法供派生類擴充套件。
從channelRead0方法的實現來看,客戶端的請求訊息被劃分為三類:
- 常規的結果分割槽請求;
- 任務事件請求;
- 其他請求;
我們分別來看針對這三類請求訊息的處理邏輯,首先是常規的結果分割槽請求:
if (msgClazz == PartitionRequest.class) {
PartitionRequest request = (PartitionRequest) msg;
try {
//構建結果子分割槽檢視物件,並將其“加入佇列”
ResultSubpartitionView subpartition =
partitionProvider.createSubpartitionView(
request.partitionId,
request.queueIndex,
bufferPool);
outboundQueue.enqueue(subpartition, request.receiverId);
}
catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}
程式碼段中的outboundQueue是PartitionRequestQueue的例項,這裡注意不要被其類名誤導,它本身並不是一個佇列資料結構的實現,但它內部的處理機制確實藉助了佇列結構來排隊請求。outboundQueue同時也是在協議棧中緊隨著PartitionRequestServerHandler的流入處理器PartitionRequestQueue的例項,這一點下文還會提到。
接著是任務事件請求:
else if (msgClazz == TaskEventRequest.class) {
TaskEventRequest request = (TaskEventRequest) msg;
//針對事件請求,將會通過任務事件分發器進行分發,如果分發失敗,將會以錯誤訊息予以響應
if (!taskEventDispatcher.publish(request.partitionId, request.event)) {
respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."),
request.receiverId);
}
}
什麼情況下會導致事件分發失敗呢?當事件分發時根據其partitionId如果找不到對應的偵聽者時,就會認為事件分發失敗。
除了上面兩種請求之外的其他請求:
//如果是取消請求,則呼叫佇列的取消方法
else if (msgClazz == CancelPartitionRequest.class) {
CancelPartitionRequest request = (CancelPartitionRequest) msg;
outboundQueue.cancel(request.receiverId);
}
//如果是關閉請求,則關閉佇列
else if (msgClazz == CloseRequest.class) {
outboundQueue.close();
}
else {
LOG.warn("Received unexpected client request: {}", msg);
}
從上面的程式碼段可見,PartitionRequestServerHandler主要起到訊息分發的作用。因此我們會重點分析訊息的處理者PartitionRequestQueue。
我們首先分析一下PartitionRequestServerHandler在處理訊息時呼叫的PartitionRequestQueue的例項方法enqueue和cancel起到了什麼作用。enqueue方法的實現如下:
public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception {
ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId));
}
可以看到它把原先的ResultSubpartitionView包裝為SequenceNumberingSubpartitionView。然後呼叫fireUserEventTriggered來觸發管道中的下一個ChannelInboundHandler的userEventTriggered方法。
SequenceNumberingSubpartitionView是什麼?它是PartitionRequestQueue內部實現的一個ResultSubpartitionView的包裝器。該包裝器對原始的ResultSubpartitionView做了兩件事:對每個即將返回的Buffer累加序列號同時儲存相應的接收者(InputChannel)編號。
Buffer的序列號主要用於跟客戶端校驗消費Buffer的過程是否跟服務端的處理過程保持一致,這主要用於防止Buffer丟失。
那麼下一個ChannelInboundHandler是誰呢?我們先回顧一下,在PartitionRequestProtocol協議中所組建的管道中的處理器的順序:
public ChannelHandler[] getServerChannelHandlers() {
PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
partitionProvider, taskEventDispatcher, queueOfPartitionQueues, networkbufferPool);
return new ChannelHandler[] {
messageEncoder,
createFrameLengthDecoder(),
messageDecoder,
serverHandler,
queueOfPartitionQueues
};
}
從上面的程式碼可見,queueOfPartitionQueues這一例項既作為引數傳入PartitionRequestServerHandler的構造器又在ChannelHandler陣列中充當處理器。而此處的queueOfPartitionQueues跟PartitionRequestServerHandler中的outboundQueue指向同一個物件。而因為enqueue方法的呼叫者是PartitionRequestServerHandler的例項方法,所以,下一個ChannelInboundHandler的例項其實就是這裡的outboundQueue本身。
所以,fireUserEventTriggered方法的呼叫,將會觸發同一個PartitionRequestQueue例項的userEventTriggered方法。在userEventTriggered方法的實現中,也是按照不同的訊息型別來區分處理的。首先當然是SequenceNumberingSubpartitionView型別:
if (msg.getClass() == SequenceNumberingSubpartitionView.class) {
boolean triggerWrite = queue.isEmpty();
//將訊息強制轉型並加入佇列
queue.add((SequenceNumberingSubpartitionView) msg);
//如果佇列在訊息加入前是空的,則說明可以響應訊息給客戶端了
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
看完了enqueue方法,下面我們來看cancel如何實現:
public void cancel(InputChannelID receiverId) {
ctx.pipeline().fireUserEventTriggered(receiverId);
}
該呼叫對應了userEventTriggered中的另一段處理邏輯:
else if (msg.getClass() == InputChannelID.class) {
InputChannelID toCancel = (InputChannelID) msg;
//如果當前InputChannelID已包含在釋放過的集合中,那麼直接返回
if (released.contains(toCancel)) {
return;
}
//如果當前的結果子分割槽檢視不為空且其接收者編號跟當前待取消的編號相等,則釋放相關資源,並將該編號加入已釋放集合
if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
currentPartitionQueue.releaseAllResources();
markAsReleased(currentPartitionQueue.receiverId);
currentPartitionQueue = null;
}
else {
int size = queue.size();
//遍歷佇列,將接收者編號跟當前準備取消的InputChannelID進行比較,
//如果相等則對檢視的相關資源進行釋放同時將編號加入已釋放集合
for (int i = 0; i < size; i++) {
SequenceNumberingSubpartitionView curr = queue.poll();
if (curr.getReceiverId().equals(toCancel)) {
curr.releaseAllResources();
markAsReleased(curr.receiverId);
}
else {
queue.add(curr);
}
}
}
}
接下來,我們來分析一下處理器輸出響應訊息的writeAndFlushNextMessageIfPossible方法。在分析該方法的實現之前,我們先看一下,該方法何時會觸發?當前在PartitionRequestQueue中該方法共有三個呼叫點。
第一個呼叫點位於ChannelInboundHandler的channelWritabilityChanged事件回撥方法中。
channelWritabilityChanged方法是ChannelInboundHandler的介面方法,當Channel的可寫狀態發生改變時會被呼叫。Channel的isWritable()方法可以用來檢測其可寫性。可寫性的閾值範圍可以通過Channel.config().setWriteHighWaterMark()以及Channel.config().setWriteLowWaterMark()進行設定。
第二個呼叫點位於userEventTriggered回撥方法中,這在我們上文分析該方法時已經提及過。
第三個呼叫點處於PartitionRequestQueue內部對ChannelFutureListener介面的實現類WriteAndFlushNextMessageIfPossibleListener中。
ChannelFutureListener用於註冊到ChannelFuture中,當I/O操作完成之後,會觸發對其方法operationComplete的呼叫。
而WriteAndFlushNextMessageIfPossibleListener的實現,就是在其operationComplete方法中觸發了對writeAndFlushNextMessageIfPossible方法的呼叫。那麼WriteAndFlushNextMessageIfPossibleListener何時會被註冊到ChannelFuture呢,畢竟不註冊是不會觸發operationComplete的。而註冊點正好位於writeAndFlushNextMessageIfPossible的實現中。
現在,我們就來分析該方法的實現,其核心程式碼段如下:
//如果channel的狀態為可寫才會繼續執行如下邏輯
if (channel.isWritable()) {
while (true) {
//如果當前結果子分割槽檢視為空,同時佇列裡也沒有待處理的記錄了,則退出迴圈
if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
return;
}
//從結果子分割槽檢視獲得待響應的原始資料
buffer = currentPartitionQueue.getNextBuffer();
//如果為null,則不做響應,繼續迴圈處理佇列中的記錄
if (buffer == null) {
if (currentPartitionQueue.registerListener(null)) {
currentPartitionQueue = null;
}
else if (currentPartitionQueue.isReleased()) {
markAsReleased(currentPartitionQueue.getReceiverId());
Throwable cause = currentPartitionQueue.getFailureCause();
if (cause != null) {
ctx.writeAndFlush(new NettyMessage.ErrorResponse(
new ProducerFailedException(cause),
currentPartitionQueue.receiverId));
}
currentPartitionQueue = null;
}
}
//buffer不為null,給予客戶端響應
else {
//構建出最終的響應物件,這裡就能看出,為什麼要實現SequenceNumberingSubpartitionView這一包裝器了
//因為這裡用到了sequenceNumber以及receiverId
BufferResponse resp = new BufferResponse(buffer, currentPartitionQueue.getSequenceNumber(),
currentPartitionQueue.getReceiverId());
//如果該Buffer並不是資料,而是表示子分割槽消費結束的事件,則會進行特殊的處理
if (!buffer.isBuffer() &&
EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() ==
EndOfPartitionEvent.class) {
//通知子分割槽消費完成,並釋放相關資源
currentPartitionQueue.notifySubpartitionConsumed();
currentPartitionQueue.releaseAllResources();
markAsReleased(currentPartitionQueue.getReceiverId());
currentPartitionQueue = null;
}
//將響應物件寫入網路準備傳送給請求客戶端,這裡就是第三個呼叫點中註冊ChannelFutureListener的位置了
//等到Netty的I/O執行緒處理完成後,將會觸發writeAndFlushNextMessageIfPossible被再次呼叫
//從而形成了處理資料與註冊回撥之間的迴圈
channel.writeAndFlush(resp).addListener(writeListener);
return;
}
}
}
以上就是PartitionRequestQueue的核心邏輯,它自身不是佇列結構的實現,但是它內部採用佇列來對用於響應資料的ResultSubpartitionView進行緩衝,從而保證了服務端的響應速度處於合適的範圍。
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)