1. 程式人生 > >Netty4.0原始碼解析:NioServerSocketChannel

Netty4.0原始碼解析:NioServerSocketChannel

一、引言

Netty的Channel在JDK NIO的Channel基礎上做了一層封裝,提供了更多的功能。Netty的中的Channel實現類主要有:NioServerSocketChannel(用於服務端非阻塞地接收TCP連線)、NioSocketChannel(用於維持非阻塞的TCP連線)、NioDatagramChannel(用於非阻塞地處理UDP連線)、OioServerSocketChannel(用於服務端阻塞地接收TCP連線)、OioSocketChannel(用於阻塞地接收TCP連線)、OioDatagramChannel(用於阻塞地處理UDP連線):
在這裡插入圖片描述

一個EventLoop一般持有多個Channel,每個EventLoop持有一個對應的執行緒,有這個執行緒負責處理這些Channel發出的事件。

本篇文章我們先從服務端的NioServerSocketChannel開始分析:

二、初始化過程

NioServerSocketChannel在JDK的ServerSocketChannel的基礎上做了一層封裝。在NioServerSocketChannel的初始化過程中,可以簡要地分為三步:
1、例項化NioServerSocketChannel
2、完成NioServerSocketChannel的管道初始化的第一步驟(向管道新增初始的ChannelHandler)
3、將NioServerSocketChannel註冊到NioEventLoopGroup,在此期間完成管道初始化的第二步驟(比如執行ChannelInitializer的handlerAdd方法)
4、繫結埠,開始接受客戶端連線。

我們在ServerBootstrap進行引導的過程中,需要呼叫channel方法指定一個ServerChannel實現類的Class物件,channel方法隨即會生成一個ChannelFactory工廠於ServerBootstrap例項中。NioServerSocketChannel的例項化在ServerBootstrap的bind方法中完成。bind方法呼叫的doBind方法中的initAndRegister方法中,會通過ChannelFactory例項構造出一個NioServerSocketChannel:

final ChannelFuture initAndRegister() {
Channel channel = null; try { //通過ChannelFactory構造一個Channel例項 channel = channelFactory().newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { channel.unsafe().closeForcibly(); return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //獲取bossGroup例項並呼叫register方法繫結 ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }

通過ChannelFactory構造的Channel都是通過無參構造方法構造的,我們來分析NioServerSocketChannel的構造方法:

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

這裡通過靜態newSocket方法構造了一個JDK的ServerSocketChannel:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

這裡通過預設的SelectorProvider例項通過呼叫它的openServerSocketChannel構造了一個JDK的ServerSocketChannel例項,接著呼叫另外的過載的構造方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

NioServerSocketChannelConfig是NioServerSocketChannel的內部類,儲存了以下內容:NioServerSocketChannel(this)、和剛剛構造的ServerSocketChannel通過socket方法獲取的ServerSocket例項、引導過程通過options方法設定的引數。

NioServerSocketChannel父類AbstractNioMessageChannel的構造方法:

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

AbstractNioMessageChannel父類AbstractNioChannel構造方法:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch; //傳入JDK ServerSocketChannel
    this.readInterestOp = readInterestOp; //感興趣通道事件為OP_ACCEPT
    try { //將ServerSocketChannel設為非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) { //如果丟擲異常那麼關閉ServerSocketChannel
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled())
                logger.warn("Failed to close a partially initialized socket.", e2);
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

AbstractNioChannel父類AbstractChannel構造方法:

protected AbstractChannel(Channel parent) {
    this.parent = parent; //ServerSocketChannel不存在父Channel
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

ServerSocketChannel不存在父Channel,所以parent為null,只有SocketChannel存在(其parent為ServerSocketChannel例項)。
接著通過newUnsafe方法構造一個Channel.Unsafe介面實現類。在客戶端引導過程已經提到過,Unsafe是Netty到JDK NIO的橋樑,Unsafe介面定義了以下方法:

方法簽名 作用
SocketAddress localAddress() 獲取該Channel的本地地址
SocketAddress remoteAddress() 獲取該Channel連線地址
void register(EventLoop, ChannelPromise) 向EventLoop註冊這個Channel,並通知指定的ChannelPromise
void bind(SocketAddress, ChannelPromise) 指定一個本地地址,將Channel繫結在這個地址,並通知指定的ChannelPromise
void connect(SocketAddress, SocketAddress, ChannelPromise) 通過本地地址向指定的地址發起連線,並通知ChannelPromise
void disconnect(ChannelPromise) 取消連線,並通知指定的ChannelPromise
void close(ChannelPromise) 關閉Channel通道,並通知指定的ChannelPromise
void closeForcibly() 立刻關閉Channel通道,不通知ChannelPromise
void deregister(ChannelPromise) 將這個Channel從EventLoop取消註冊,並通知指定的ChannelPromise
void beginRead() 寫入緩衝區,以便進站處理器能夠讀到進站資料
void write(Object, ChannelPromise) 向緩衝區中寫入資料,並通知指定的ChannelPromise
void flush() 輸出呼叫write方法寫入的緩衝區資料
ChannelPromise voidPromise() 返回這個Channel的特殊ChannelPromise(Void Promise),用於將ChannelPromise作為引數但不希望得到通知的操作。
ChannelOutboundBuffer outboundBuffer() 獲取出站緩衝區

對於NioServerSocketChannel,其newUnsafe方法實現在父類AbstractNioMessageChannel中:

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

newChannelPipeline方法則是比較簡單,直接構造一個DefaultChannelPipeline就完了。

回到ServerBootstrap:當initAndRegister方法執行結束後,init方法隨即會被呼叫並傳入剛才構造的ServerSocketChannel例項作為引數:

@Override
void init(Channel channel) throws Exception {
	//將通過option方法指定引數加入到Map中
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
    //將初始化屬性傳入到Map中
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ChannelPipeline p = channel.pipeline();
    //獲取workGroup、childGroup指定的ChannelHandler
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
	//向管道尾部新增一個ChannelInitializer例項
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            //新增使用者自定義的bossGroup的ChannelHandler
            if (handler != null)
                pipeline.addLast(handler);
			//向管道尾部新增一個ServerBootstrapAcceptor例項
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, 
                    		currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init方法將引導過程通過options方法設定的引數匯入到NioServerSocketChannel的ChannelConfig中,然後再將初始屬性匯入到NioServerSocketChannel間接父類DefaultAttributeMap中。

完成上述步驟後,向ServerSocketChannel所屬的管道尾部新增一個ChannelInitializer例項,它重寫了initChannel方法,並指定向管道新增使用者自定義的ChannelHandler(通過handler方法指定的),然後再非同步新增一個ServerBootstrapAcceptor例項,用於將接收到的客戶端連線對應的NioSocketChannel轉交給workGroup進行管理。

前幾篇部落格我們已經提到過,ChannelInitializer在完成自己的工作(執行完initChannel方法)後會將自己從管道中移除。但是就目前而言還僅僅只是完成了管道初始化的第一步驟,因為ChannelInitializer的handlerAdd方法還尚未呼叫。

init方法執行完畢後,就會將這個ServerSocketChannel註冊到bossGroup中,最終是通過AbstractUnsafe的register方法進行註冊:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null)
        throw new NullPointerException("eventLoop");
    if (isRegistered()) { //不允許重複註冊或者同一個Channel註冊到不同的EventLoop中
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) { //如果不能註冊到這種型別的EventLoop中
        promise.setFailure(new IllegalStateException("incompatible event loop type: " +
        		eventLoop.getClass().getName()));
        return;
    }
    AbstractChannel.this.eventLoop = eventLoop;
    //如果當前執行緒就是eventLoop所屬的執行緒,那麼直接執行register0方法,否則非同步執行
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

register方法在進行一系列引數檢查和狀態檢查後,繼而會執行register0方法:

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise))
            return;
        boolean firstRegistration = neverRegistered; //預設為true
        doRegister(); //註冊到Selector中
        neverRegistered = false;
        registered = true;
        pipeline.invokeHandlerAddedIfNeeded();
        safeSetSuccess(promise);
        //向管道發出Channel註冊事件
        pipeline.fireChannelRegistered();
        if (isActive()) { //如果Channel是可用的
            if (firstRegistration) {
            	//向管道發出Channel可用事件
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

register0方法首先呼叫doRegister進行註冊:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

doRegister將ServerSocketChannel註冊到了EventLoop的Selector上,並將附加物件設為this。doRegister方法執行結束後,就可以認為Channel已經註冊到了EventLoop中,因為現在這個Channel持有的JDK Channel已經被EventLoop持有的Selector管理。
隨後,就會呼叫管道物件的invokeHandlerAddedIfNeeded方法:

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        callHandlerAddedForAllHandlers();
    }
}

invokeHandlerAddedIfNeeded此時就會呼叫callHandlerAddedForAllHandlers執行回撥任務。

讓我們回顧下服務端的引導過程,當呼叫addLast方法將ChannelInitializer新增到管道中後,如果管道檢測到這是它第一次呼叫addLast方法,並不會馬上執行ChannelInitializer的handlerAdd方法去執行我們重寫的方法,而是註冊了一個回撥任務,並把這個回撥任務新增到了DefaultChannelPipeline的成員變數pendingHandlerCallbackHead中,如果有多個回撥任務,那麼它可以通過它的成員變數next去維護一個單向連結串列。

瞭解這些後,我們再來看callHandlerAddedForAllHandlers這個方法的實現:

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        registered = true; 
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        this.pendingHandlerCallbackHead = null;
    }
    //遍歷該連結串列,依次執行回撥任務
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

這樣,我們指定的ChannelInitializer的handlerAdd方法隨即會被呼叫,管道初始化的第二步驟也就隨之完成。

回到register0方法:
接下來會依次向管道傳遞ChannelRegister、ChannelActive事件。傳遞完成後,NioServerSocketChannel開始進入埠繫結過程。
回到ServerBootstrap的doBind方法,doBind方法會開始進行埠的繫結過程,最終會呼叫到NioServerSocketChannel的doBind方法:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) { //如果JDK版本在1.7以上
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

這裡直接呼叫到了JDK ServerSocketChannel的bind方法直接繫結指定的埠,並指定了最大連線數(預設為128),可以通過呼叫options並傳入ChannelOption.SO_BACKLOG,然後指定一個最大連線數。當連線數超出後,客戶端的連線請求會被阻塞。

至此,NioServerSocketChannel初始化過程完成。

三、事件處理

初始化過程完成後,此時的NioServerSocketChannel已經被EventLoop的Selector所管理,Selector則由EventLoop所屬的執行緒進行輪詢。這個執行緒執行NioEventLoop的run方法,通過一個無限迴圈不停地處理IO事件和一般任務。

當有客戶端發起連線時,ServerSocketChannel會發出OP_ACCEPT事件,就會通過Unsafe的read方法(實現在AbstractNioMessageUnsafe)處理這個事件:

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
    	//從事件集中移除這個事件
        removeReadOp();
        return;
    }
    //獲取每個迴圈讀取訊息的最大位元組數
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            for (;;) {
            	//將資料讀到readBuf中,返回讀取到的位元組數
                int localRead = doReadMessages(readBuf);
                if (localRead == 0)
                    break;
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                if (!config.isAutoRead())
                    break;
                //如果readBuf的長度大於maxMessagesPerRead,退出迴圈
                if (readBuf.size() >= maxMessagesPerRead)
                    break;
            }
        } catch (Throwable t) {
            exception = t;
        }
        setReadPending(false);
        //獲取讀取到的ByteBuf數量,並通過迴圈將這些ByteBuf傳遞給ChannelInboundHandler
        int size = readBuf.size
            
           

相關推薦

Netty4.0原始碼解析NioServerSocketChannel

一、引言 Netty的Channel在JDK NIO的Channel基礎上做了一層封裝,提供了更多的功能。Netty的中的Channel實現類主要有:NioServerSocketChannel(用於服務端非阻塞地接收TCP連線)、NioSocketChanne

Netty4.0原始碼解析位元組容器UnpooledHeapByteBuf

一、引言 Java NIO提供了ByteBuffer作為位元組容器,供Channel讀入和寫入資料。但ByteBuffer使用過於繁瑣,靈活性不夠強。Netty實現了ByteBuf來替代JDK的ByteBuffer。 ByteBuf有以下幾大優點: 1、它可以被

Netty4.0原始碼解析TCP粘包半包問題的解決方案

一、引言 TCP是一個基於流的協議,TCP作為傳輸層協議並不知道應用層協議的具體含義,它會根據TCP緩衝區的實際情況進行資料包的劃分,所以在應用層上認為是一個完整的包,可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的

Redis原始碼解析15Resis主從複製之從節點流程

Redis原始碼解析:15Resis主從複製之從節點流程   版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/gqtcgq/article/details/51172085        

Java集合類原始碼解析AbstractMap

目錄 引言 原始碼解析 抽象函式entrySet() 兩個集合檢視 操作方法 兩個子類 參考: 引言 今天學習一個Java集合的一個抽象類 AbstractMap ,AbstractMap 是Map介面的 實現類之一,也是HashMap、T

Java集合類原始碼解析HashMap (基於JDK1.8)

目錄 前言 HashMap的資料結構 深入原始碼 兩個引數 成員變數 四個構造方法 插入資料的方法:put() 雜湊函式:hash() 動態擴容:resize() 節點樹化、紅黑樹的拆分 節點樹化

Java集合類原始碼解析Vector

引言 之前的文章我們學習了一個集合類 ArrayList,今天講它的一個兄弟 Vector。 為什麼說是它兄弟呢?因為從容器的構造來說,Vector 簡直就是 ArrayList 的翻版,也是基於陣列的資料結構,不同的是,Vector的每個方法都加了 synchronized 修飾符,是執行緒安全的。 類

jQuery原始碼解析變數與函式

 //原始碼剖析都基於jQuery-2.0.3版本,主要考慮到相容IE 2行:jQuery javaScript Library v2.0.3——jQuery版本 3行:http://jQuery.com——官網 5~6行:Includes Sizzle.js;http://sizzlejs.

Spark2.2.2原始碼解析 3.啟動worker節點啟動流程分析

本文啟動worker節點啟動流程分析   啟動命令: ${SPARK_HOME}/sbin/start-slave.sh spark://sysadmindeMacBook-Pro.local:7077   檢視start-slave.sh  

Spark2.2.2原始碼解析 2.啟動master節點流程分析

本文主要說明在啟動master節點的時候,程式碼的流程走向。   授予檔案執行許可權 chmod755  兩個目錄裡的檔案: /workspace/spark-2.2.2/bin  --所有檔案 /workspace/spark-2.2.2/sb

Spring4原始碼解析BeanDefinition架構及實現

一、架構圖 首先共同看下總體的 Java Class Diagrams 圖: 二、具體類實現 2.1 AttributeAccessor 介面定義了一個通用的可對任意物件獲取、修改等操作元資料的附加契約。主要方法如下: public interface AttributeAcce

Tomcat原始碼解析Container中的Pipeline和Valve

前言:     我們在上一篇部落格 中分析了關於tomcat處理請求的全過程,在最後的時候交給了當前Engine的pipeline去處理。     Engine.pipeline獲取了first_valve,然後執行其invoke方法,即完成了請求

Tomcat原始碼解析Jsp檔案的編譯、實現

1.Jsp簡介     jsp(java server page),其根本是一個簡化的Servlet技術,是一種動態網頁技術標準。     它是在傳統的網頁HTML頁面中插入java程式碼段,從而形成jsp檔案,字尾為.jsp。  

Tomcat原始碼解析Web請求處理過程

前言:     Catalina是Tomcat提供的Servlet容器實現,它負責處理來自客戶端的請求並處理響應。     但是僅有Servlet容器伺服器是無法對外提供服務的,還需要由聯結器接收來自客戶端的請求,並按照既定協議進行解析,然後交由S

Tomcat原始碼解析Catalina原始碼解析

1.Catalina     對於Tomcat來說,Catalina是其核心元件,所有基於JSP/Servlet的Java Web應用均需要依託Servlet容器執行並對外提供服務。     4.0版本後,Tomcat完全重新設計了其Servlet

Redis5.0原始碼解析(七)----------字串物件

基於Redis5.0 字串物件 字串物件的編碼可以是 int 、 raw 或者 embstr 如果一個字串物件儲存的是整數值, 並且這個整數值可以用 long 型別來表示, 那麼字串物件會將整數值儲存在字串物件結構的 ptr 屬性裡面(將 void* 轉

Redis5.0原始碼解析(六)----------Redis物件

基於Redis5.0 之前介紹了 Redis 用到的所有主要資料結構, 比如簡單動態字串(SDS)、雙端連結串列、字典、跳躍表、整數集合, 等等,但Redis 並沒有直接使用這些資料結構來實現鍵值對資料庫, 而是基於這些資料結構建立了一個物件系統, 這個系統包含字串物件

Redis5.0原始碼解析(五)----------整數集合

基於redis5.0 整數集合(intset)是集合鍵的底層實現之一: 當一個集合只包含整數值元素, 並且這個集合的元素數量不多時, Redis 就會使用整數集合作為集合鍵的底層實現 redis> SADD numbers 1 3 5 7 9 (integer

Redis5.0原始碼解析(四)----------跳躍表

基於Redis5.0 跳躍表(skiplist)是一種有序資料結構, 它通過在每個節點中維持多個指向其他節點的指標, 從而達到快速訪問節點的目的 跳躍表支援平均 O(log N) 最壞 O(N) 複雜度的節點查詢, 還可以通過順序性操作來批量處理節點。 在大部分

Redis5.0原始碼解析(三)----------字典(詳細)

基於Redis5.0 在字典中, 一個鍵(key)可以和一個值(value)進行關聯(或者說將鍵對映為值), 這些關聯的鍵和值就被稱為鍵值對 字典中的每個鍵都是獨一無二的, 程式可以在字典中根據鍵查詢與之關聯的值, 或者通過鍵來更新值, 又或者根據鍵來刪除整個鍵值對