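Netty in Depth: write
The previous chapter analyzed how Netty handles read events. This section analyzes how Netty writes data back to the client.
Returning data to the client takes three steps:
- 1. Allocate a buffer (buf) and write the data into it.
- 2. Store the buf in the ChannelOutboundBuffer.
- 3. Write the bufs held in the ChannelOutboundBuffer out to the socketChannel.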
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ReferenceCountUtil.release(msg);
    ByteBuf buf1 = ctx.alloc().buffer(4);
    buf1.writeInt(1);
    ByteBuf buf2 = ctx.alloc().buffer(4);
    buf2.writeInt(2);
    ByteBuf buf3 = ctx.alloc
    // (the remainder of this listing is truncated in the source)
```
Why does the buf need to be stored in the ChannelOutboundBuffer first?
Implementation of ctx.write():
```java
// AbstractChannelHandlerContext.java
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, promise);
        } else {
            task = WriteTask.newInstance(next, msg, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}
```
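The thread check in write() above can be illustrated without Netty. The following is only a rough sketch using plain JDK classes: `MiniEventLoop` and all of its members are hypothetical names invented for this example, and a single-threaded executor stands in for the channel's event loop. It shows the same branch: run inline when already on the loop thread, otherwise hand the work over as a task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event loop: one thread owns all channel state.
class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread = t; // remember which thread is "the loop"
        return t;
    });
    private final List<String> log = new ArrayList<>(); // only modified on the loop thread

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Same shape as the branch in write(): inline on the loop thread, queued otherwise.
    void write(String msg) {
        if (inEventLoop()) {
            log.add("inline:" + msg);
        } else {
            executor.execute(() -> log.add("task:" + msg));
        }
    }

    void execute(Runnable task) {
        executor.execute(task);
    }

    // Stops the loop, waits for queued tasks to finish, then returns the log.
    List<String> shutdownAndGetLog() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

Calling `loop.write("a")` from the main thread and then `loop.execute(() -> loop.write("b"))` leaves the log as `[task:a, inline:b]`: the first call is queued, the second one is already on the loop thread and runs inline. Because every mutation ends up on one thread, the buffer behind it needs no locking.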
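By default, when no other outbound handler sits between the current context and the head, findContextOutbound() finds the pipeline's head node and triggers its write method.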
```java
// HeadContext.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

// AbstractUnsafe
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
        ReferenceCountUtil.release(msg);
        return;
    }
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);
}
```
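outboundBuffer is instantiated together with the Unsafe. In the end, msg is handed to outboundBuffer, which wraps it.
Internally, ChannelOutboundBuffer maintains a linked list of Entry objects and wraps each msg in an Entry.
1. unflushedEntry: points to the first entry that has not been flushed yet (the head of the list while nothing has been flushed)
2. tailEntry: points to the tail of the list
3. totalPendingSize: the total number of bytes of the pending msgs
4. unwritable: the "not writable" flag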
```java
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
        tailEntry = entry;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(size, false);
}
```
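Entry.newInstance returns an Entry instance. Netty pools Entry objects: once an Entry has been used, it must be cleared and recycled. Is that because instantiating an Entry is expensive? More likely the point is to cut allocation and GC pressure, since one Entry is needed for every message written.
A new entry is always appended to the tail of the list, and tailEntry is updated to point to it.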
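To make the Entry caching more concrete, here is a minimal pooling sketch in plain Java. It is not Netty's implementation (Netty recycles Entry through its own recycling utility): `PooledEntry`, its fields, and the `ArrayDeque`-based pool are assumptions made for illustration, and the pool is deliberately not thread-safe because it assumes a single event-loop thread.

```java
import java.util.ArrayDeque;

// Hypothetical pooled entry; assumes it is only used from one thread.
final class PooledEntry {
    private static final ArrayDeque<PooledEntry> POOL = new ArrayDeque<>();

    Object msg;
    int pendingSize;

    private PooledEntry() { }

    // Reuse a recycled instance when one is available, otherwise allocate.
    static PooledEntry newInstance(Object msg, int size) {
        PooledEntry entry = POOL.pollFirst();
        if (entry == null) {
            entry = new PooledEntry();
        }
        entry.msg = msg;
        entry.pendingSize = size;
        return entry;
    }

    // Clear every field before returning the object to the pool,
    // so that a stale msg reference cannot leak into the next use.
    void recycle() {
        msg = null;
        pendingSize = 0;
        POOL.addFirst(this);
    }
}
```

After `recycle()`, the next `newInstance` call hands back the same object with fresh field values, which is why the clearing step matters.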
```java
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}
```
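The incrementPendingOutboundBytes method atomically updates the totalPendingSize field and then checks whether totalPendingSize has reached the writeBufferHighWaterMark threshold, which defaults to 65536 bytes (64 KB). If totalPendingSize >= 65536, it sets unwritable to 1 with a CAS and fires a ChannelWritabilityChanged event.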
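The high-water-mark check can be reproduced with JDK atomics alone. This is a simplified sketch, not Netty's code: `PendingBytesTracker` and the `writabilityChangedEvents` counter are hypothetical, the counter merely stands in for firing ChannelWritabilityChanged, and the low-water-mark path that makes the channel writable again is left out.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical model of the pending-bytes accounting.
class PendingBytesTracker {
    private static final AtomicLongFieldUpdater<PendingBytesTracker> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(PendingBytesTracker.class, "totalPendingSize");
    private static final AtomicIntegerFieldUpdater<PendingBytesTracker> UNWRITABLE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(PendingBytesTracker.class, "unwritable");

    private volatile long totalPendingSize;
    private volatile int unwritable;
    private final long highWaterMark;

    int writabilityChangedEvents; // stands in for firing ChannelWritabilityChanged

    PendingBytesTracker(long highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    void incrementPendingOutboundBytes(long size) {
        if (size == 0) {
            return;
        }
        long newSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newSize >= highWaterMark) {
            setUnwritable();
        }
    }

    private void setUnwritable() {
        // CAS 0 -> 1: only the call that wins the transition reports the event.
        if (UNWRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
            writabilityChangedEvents++;
        }
    }

    boolean isWritable() {
        return unwritable == 0;
    }
}
```

With a mark of 65536, adding 65535 bytes leaves the tracker writable; one more byte flips it to unwritable and records exactly one event, and further increments do not record another one. In a handler, this is the state behind checking writability before writing more data.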
At this point, all of the buf data has been stored in outboundBuffer.
Implementation of ctx.flush():
```java
public ChannelHandlerContext flush() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeFlush();
    } else {
        Runnable task = next.invokeFlushTask;
        if (task == null) {
            next.invokeFlushTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeFlush();
                }
            };
        }
        safeExecute(executor, task, channel().voidPromise(), null);
    }
    return this;
}
```
By default, findContextOutbound() again finds the pipeline's head node and triggers its flush method.
```java
// HeadContext.java
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

// AbstractUnsafe
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}
```
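The addFlush method marks the msgs added during the write phase as flushed. The purpose of this marking was not obvious to me at first. As far as I can tell, it draws a boundary in the list: only entries that were added before this flush() call become eligible to be written to the socket, while anything written afterwards stays pending until the next flush.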
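One way to read the flush marking is as a moving boundary inside the entry list. The sketch below models that with plain Java; `FlushBoundaryDemo` and its `flushed()` helper are hypothetical, and the model omits what the real class also tracks (promises, byte counts, a flushed counter).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down model of the entry list and its flush boundary.
class FlushBoundaryDemo {
    private static final class Entry {
        final String msg;
        Entry next;

        Entry(String msg) {
            this.msg = msg;
        }
    }

    private Entry flushedEntry;   // first entry already marked for flushing
    private Entry unflushedEntry; // first entry not yet marked for flushing
    private Entry tailEntry;      // last entry in the list

    void addMessage(String msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // Moves the boundary: everything added so far becomes eligible for the socket.
    void addFlush() {
        if (unflushedEntry != null) {
            if (flushedEntry == null) {
                flushedEntry = unflushedEntry;
            }
            unflushedEntry = null;
        }
    }

    // Messages that a socket write would be allowed to send right now.
    List<String> flushed() {
        List<String> out = new ArrayList<>();
        for (Entry e = flushedEntry; e != null && e != unflushedEntry; e = e.next) {
            out.add(e.msg);
        }
        return out;
    }
}
```

Adding "a" and "b", calling `addFlush()`, and then adding "c" makes `flushed()` return `[a, b]`; "c" only joins after another `addFlush()`. Separating write from flush this way lets several small writes be batched into one socket operation.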
Now look directly at the flush0 method:
```java
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
```
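1. If the selectionKey is currently registered for the write event (OP_WRITE), a flush is already pending and the event loop will call forceFlush() later, so the method returns immediately.
2. Otherwise, the flush is executed right away.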
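The OP_WRITE test in isFlushPending() is ordinary java.nio and can be tried in isolation. The sketch below is only an illustration: `FlushPendingCheck` and `demo()` are hypothetical names, and a `Pipe.SinkChannel` is used in place of a socket purely so the example needs no network connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class FlushPendingCheck {
    // Same bit test as isFlushPending(): is OP_WRITE part of the key's interest set?
    static boolean isFlushPending(SelectionKey key) {
        return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

    // Returns {pending before OP_WRITE is registered, pending after it is registered}.
    static boolean[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                Pipe.SinkChannel sink = pipe.sink();
                sink.configureBlocking(false);
                // Register with an empty interest set first.
                SelectionKey key = sink.register(selector, 0);
                boolean before = isFlushPending(key);
                // Registering OP_WRITE means "tell me when the channel is writable again".
                key.interestOps(SelectionKey.OP_WRITE);
                boolean after = isFlushPending(key);
                return new boolean[] {before, after};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

`demo()` yields `false` before OP_WRITE is set and `true` afterwards, which matches the two cases listed above: no pending flush means flush now, a registered write interest means the event loop will take care of it.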