1. 程式人生 > 實用技巧 >網路程式設計-Netty-writeAndFlush方法原理分析 以及 close以後是否還能寫入資料?

網路程式設計-Netty-writeAndFlush方法原理分析 以及 close以後是否還能寫入資料?

前言

在上一講網路程式設計-關閉連線(2)-Java的NIO在關閉socket時,究竟用了哪個系統呼叫函式?中,我們做了個實驗,研究了java nio的close函式究竟呼叫了哪個系統呼叫,答案是close,但在真實的測試程式碼中,其實我犯了一個小錯誤,在close之後並沒有return,所以在測試close之後,還做了writeAndFlush操作傳送了一條資料,並且執行過程並沒有報錯。這件事讓我關注起了close和之後的writeAndFlush之間的關係。為什麼在close之後”看起來“還可以繼續寫入呢?

原始程式碼如下:

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //寫入本地檔案測試字元,然後關閉channel
        FileWriter fileWriter = new FileWriter("/root/test.txt");
        fileWriter.write("test test hold on");
        fileWriter.flush();
        fileWriter.close();

        //呼叫同步方法關閉
        ChannelFuture sync = ctx.channel().close().sync();
        if(sync.isSuccess()){
            System.out.println("關閉成功!");
        }else{
            System.out.println("關閉失敗!");
        }

        //這裡開始,是誤執行的語句
        this.ctx = ctx;
        //傳送心跳指令
        if (count.intValue() > 150) {
            count.set(1);
        }
        Command0C04 command0C04 = new Command0C04(count.intValue());
        byte[] encode = command0C04.encode();
        logger.info("心跳指令:" + HexStringUtils.toHexString(encode));
        ctx.channel().writeAndFlush(encode).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("success:"+channelFuture.isSuccess());
                System.out.println("cancelled:"+channelFuture.isCancelled());
                System.out.println("done:"+channelFuture.isDone());
                System.out.println("isCancellable:"+channelFuture.isCancellable());
            }
        });
        count.getAndIncrement();
}

我們知道,close系統呼叫會關閉讀和寫兩個方向的操作,那麼writeAndFlush在close之後具體是如何執行的?netty是怎麼確保不會寫入到傳送緩衝區中呢?

想研究清楚這個問題,需要先看writeAndFlush操作做了什麼,涉及到什麼底層的資料結構。

writeAndFlush原理

簡言之,writeAndFlush,在底層會做兩個操作

  • write操作
  • flush操作

首先分析write操作。

write操作

netty底層會維護一個重要的資料結構,ChannelOutboundBuffer,這是一個單向連結串列。我們呼叫寫的方法其實會把資料先快取到這個資料結構中,等呼叫flush之後,就會真正的把資料寫入到傳送緩衝區當中。

ChannelOutBoundBuffer中有以下幾個重要的指標:

  • Entry代表了我們傳送的資料
  • flushedEntry代表需要寫入到傳送緩衝區的第一個Entry
  • unflushedEntry代表第一個等待寫入傳送緩衝區的Entry

當第一次呼叫addMessage方法往ChannelOutBoundBuffer中新增資料時

第二次呼叫addMessage方法時,資料指標如下

如果不呼叫Flush,那麼flushedEntry指標一直為null,資料會一直寫入到後面的連結串列中。

Flush操作

當呼叫Flush操作後,指標情況如圖:

之後的程式碼,就是遍歷這段節點資料,寫入到傳送緩衝區中,並且寫入後釋放節點記憶體。

判斷緩衝區是否可寫(小知識)

在實際flush之前,netty呼叫isFlushPending判斷,這個channel是否註冊了可寫事件,如果有可寫事件就等會再發送。如果沒有,就會呼叫父類的flush0方法直接寫。

  • 注:如果到達傳送緩衝區的水位線了,傳送緩衝區本身就不可寫了,這個時候會(XX會)註冊一個可寫事件到selector中,netty就是使用這個可寫判斷是否可以真正的傳送。

protected final void flush0() {
    if (!isFlushPending()) {
        super.flush0();
    }
}


private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

OOM?

如果接收端消費速度很慢,接收緩衝區滿了以後,會導致傳送緩衝區無法繼續傳送資料,在一直髮送資料的前提下,ChannelOutboundBuffer會一直上漲,可能會引起OOM問題。

Netty官方提供了兩個ChannelOutBoundBuffer配置引數、一個Channel屬性和一個使用者回撥方法來幫助我們識別和解決這件事。

兩個ChannelOutBoundBuffer配置引數:

  • Channel.config().setWriteBufferHighWaterMark:高水位,預設64 kb

  • Channel.config().setWriteBufferLowWaterMark :低水位:預設32 kb

一個Channel屬性:isWritable

一個使用者回撥方法:fireChannelWritabilityChanged

內部邏輯如下:

  • 當本次需要新增到ChannelOutBoundBuffer的資料量超過了高水位,會改變isWritable對應的屬性值從0變為1,並且觸發一個ChannelWritabilityChanged事件。
  • 當flush或者remove後,如果資料恢復到最低水位下了,會改變isWritable對應的屬性值從1變為0,並且觸發一個ChannelWritabilityChanged事件。

使用者可以通過屬性和回撥方法來檢查是否可寫,做相關的業務處理。

writeAndFlush總結

在呼叫寫入方法後,netty並不會直接把資料寫入到傳送緩衝區中,而是儲存在了ChannelOutboundBuffer中,等到呼叫flush操作後,再把資料真正寫入Socket的傳送緩衝區中。

close以後是否還能寫入資料?

跟蹤close原始碼,最後會跟蹤到io.netty.channel.AbstractChannel 的內部類 AbstractUnsafe中的close方法,方法程式碼如下(部分程式碼省略,只保留這個問題相關的核心程式碼):

private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {

            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
        }

可以看到,這裡有一句this.outboundBuffer = null; 相當於把上文分析的ChannelOutboundBuffer置空。

結合同在AbstractUnsafe中的write程式碼中的這一部分來看(同樣省略了非問題關注的程式碼)

 @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, newWriteException(initialCloseCause));
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }
}

在write之前,會做判斷,如果如果ChannelOutboundBuffer為空為空,那麼釋放記憶體,不傳送資料並返回。

總結

首先我們瞭解了,在傳送過程中比較重要的資料結構ChannelOutboundBuffer,然後我們瞭解了在close的時候,會把如果ChannelOutboundBuffer置空,並且在write的時候,會判斷該buffer是否為空,為空則不傳送,並設定失敗,到此我們的問題就研究明白了。