1. 程式人生 > >Netty原始碼解讀(三)Channel與Pipeline

Netty原始碼解讀(三)Channel與Pipeline

Channel是理解和使用Netty的核心。Channel的涉及內容較多,這裡我使用由淺入深的介紹方法。在這篇文章中,我們主要介紹Channel部分中Pipeline實現機制。為了避免枯燥,借用一下《盜夢空間》的“夢境”概念,希望大家喜歡。

一層夢境:Channel實現概覽

在Netty裡,Channel是通訊的載體,而ChannelHandler負責Channel中的邏輯處理。

那麼ChannelPipeline是什麼呢?我覺得可以理解為ChannelHandler的容器:一個Channel包含一個ChannelPipeline,所有ChannelHandler都會註冊到ChannelPipeline中,並按順序組織起來。

在Netty中,ChannelEvent是資料或者狀態的載體,例如傳輸的資料對應MessageEvent,狀態的改變對應ChannelStateEvent。當對Channel進行操作時,會產生一個ChannelEvent,併發送到ChannelPipeline。ChannelPipeline會選擇一個ChannelHandler進行處理。這個ChannelHandler處理之後,可能會產生新的ChannelEvent,並流轉到下一個ChannelHandler。

channel pipeline

例如,一個數據最開始是一個MessageEvent,它附帶了一個未解碼的原始二進位制訊息ChannelBuffer,然後某個Handler將其解碼成了一個數據物件,並生成了一個新的MessageEvent

,並傳遞給下一步進行處理。

到了這裡,可以看到,其實Channel的核心流程位於ChannelPipeline中。於是我們進入ChannelPipeline的深層夢境裡,來看看它具體的實現。

二層夢境:ChannelPipeline的主流程

Netty的ChannelPipeline包含兩條線路:Upstream和Downstream。Upstream對應上行,接收到的訊息、被動的狀態改變,都屬於Upstream。Downstream則對應下行,傳送的訊息、主動的狀態改變,都屬於Downstream。ChannelPipeline介面包含了兩個重要的方法:sendUpstream(ChannelEvent e)

sendDownstream(ChannelEvent e),就分別對應了Upstream和Downstream。

對應的,ChannelPipeline裡包含的ChannelHandler也包含兩類:ChannelUpstreamHandlerChannelDownstreamHandler。每條線路的Handler是互相獨立的。它們都很簡單的只包含一個方法:ChannelUpstreamHandler.handleUpstreamChannelDownstreamHandler.handleDownstream

Netty官方的javadoc裡有一張圖(ChannelPipeline接口裡),非常形象的說明了這個機制(我對原圖進行了一點修改,加上了ChannelSink,因為我覺得這部分對理解程式碼流程會有些幫助):

channel pipeline

什麼叫ChannelSink呢?ChannelSink包含一個重要方法ChannelSink.eventSunk,可以接受任意ChannelEvent。“sink”的意思是”下沉”,那麼”ChannelSink”好像可以理解為”Channel下沉的地方”?實際上,它的作用確實是這樣,也可以換個說法:“處於末尾的萬能Handler”。最初讀到這裡,也有些困惑,這麼理解之後,就感覺簡單許多。只有Downstream包含ChannelSink,這裡會做一些建立連線、繫結埠等重要操作。為什麼UploadStream沒有ChannelSink呢?我只能認為,一方面,不符合”sink”的意義,另一方面,也沒有什麼處理好做的吧!

這裡有個值得注意的地方:在一條“流”裡,一個ChannelEvent並不會主動的”流”經所有的Handler,而是由上一個Handler顯式的呼叫ChannelPipeline.sendUp(Down)stream產生,並交給下一個Handler處理。也就是說,每個Handler接收到一個ChannelEvent,並處理結束後,如果需要繼續處理,那麼它需要呼叫sendUp(Down)stream新發起一個事件。如果它不再發起事件,那麼處理就到此結束,即使它後面仍然有Handler沒有執行。這個機制可以保證最大的靈活性,當然對Handler的先後順序也有了更嚴格的要求。

下面我們從程式碼層面來對這裡面發生的事情進行深入分析,這部分涉及到一些細節,需要開啟專案原始碼,對照來看,會比較有收穫。

三層夢境:深入ChannelPipeline內部

DefaultChannelPipeline的內部結構

ChannelPipeline的主要的實現程式碼在DefaultChannelPipeline類裡。列一下DefaultChannelPipeline的主要欄位:

public class DefaultChannelPipeline implements ChannelPipeline {

    private volatile Channel channel;
    private volatile ChannelSink sink;
    private volatile DefaultChannelHandlerContext head;
    private volatile DefaultChannelHandlerContext tail;
    private final Map<String, DefaultChannelHandlerContext> name2ctx =
        new HashMap<String, DefaultChannelHandlerContext>(4);
}

這裡需要介紹一下ChannelHandlerContext這個介面。顧名思義,ChannelHandlerContext儲存了Netty與Handler相關的的上下文資訊。而咱們這裡的DefaultChannelHandlerContext,則是對ChannelHandler的一個包裝。一個DefaultChannelHandlerContext內部,除了包含一個ChannelHandler,還儲存了”next”和”prev”兩個指標,從而形成一個雙向連結串列。

因此,在DefaultChannelPipeline中,我們看到的是對DefaultChannelHandlerContext的引用,而不是對ChannelHandler的直接引用。這裡包含”head”和”tail”兩個引用,分別指向連結串列的頭和尾。而name2ctx則是一個按名字索引DefaultChannelHandlerContext使用者的一個map,主要在按照名稱刪除或者新增ChannelHandler時使用。

sendUpstream和sendDownstream

前面提到了,ChannelPipeline介面的兩個重要的方法:sendUpstream(ChannelEvent e)sendDownstream(ChannelEvent e)。所有事件的發起都是基於這兩個方法進行的。Channels類有一系列fireChannelBound之類的fireXXXX方法,其實都是對這兩個方法的facade包裝。

下面來看一下這兩個方法的實現。先看sendUpstream(對程式碼做了一些簡化,保留主邏輯):

public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    head.getHandler().handleUpstream(head, e);
}

private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
    DefaultChannelHandlerContext realCtx = ctx;
    while (!realCtx.canHandleUpstream()) {
        realCtx = realCtx.next;
        if (realCtx == null) {
            return null;
        }
    }
    return realCtx;
}

這裡最終呼叫了ChannelUpstreamHandler.handleUpstream來處理這個ChannelEvent。有意思的是,這裡我們看不到任何”將Handler向後移一位”的操作,但是我們總不能每次都用同一個Handler來進行處理啊?實際上,我們更為常用的是ChannelHandlerContext.handleUpstream方法(實現是DefaultChannelHandlerContext.sendUpstream方法):

public void sendUpstream(ChannelEvent e) {
	DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
	DefaultChannelPipeline.this.sendUpstream(next, e);
}

可以看到,這裡最終仍然呼叫了ChannelPipeline.sendUpstream方法,但是它會將Handler指標後移。

我們接下來看看DefaultChannelHandlerContext.sendDownstream:

public void sendDownstream(ChannelEvent e) {
	DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
	if (prev == null) {
		try {
			getSink().eventSunk(DefaultChannelPipeline.this, e);
		} catch (Throwable t) {
			notifyHandlerException(e, t);
		}
	} else {
		DefaultChannelPipeline.this.sendDownstream(prev, e);
	}
}

與sendUpstream好像不大相同哦?這裡有兩點:一是到達末尾時,就如夢境二所說,會呼叫ChannelSink進行處理;二是這裡指標是往前移的,所以我們知道了:

UpstreamHandler是從前往後執行的,DownstreamHandler是從後往前執行的。在ChannelPipeline裡新增時需要注意順序了!

DefaultChannelPipeline裡還有些機制,像新增/刪除/替換Handler,以及ChannelPipelineFactory等,比較好理解,就不細說了。

回到現實:Pipeline解決的問題

好了,深入分析完程式碼,有點頭暈了,我們回到最開始的地方,來想一想,Netty的Pipeline機制解決了什麼問題?

我認為至少有兩點:

一是提供了ChannelHandler的程式設計模型,基於ChannelHandler開發業務邏輯,基本不需要關心網路通訊方面的事情,專注於編碼/解碼/邏輯處理就可以了。Handler也是比較方便的開發模式,在很多框架中都有用到。

二是實現了所謂的”Universal Asynchronous API”。這也是Netty官方標榜的一個功能。用過OIO和NIO的都知道,這兩套API風格相差極大,要從一個遷移到另一個成本是很大的。即使是NIO,非同步和同步程式設計差距也很大。而Netty遮蔽了OIO和NIO的API差異,通過Channel提供對外介面,並通過ChannelPipeline將其連線起來,因此替換起來非常簡單。

universal API

理清了ChannelPipeline的主流程,我們對Channel部分的大致結構算是弄清楚了。可是到了這裡,我們依然對一個連線具體怎麼處理沒有什麼概念。在下篇文章,我們會分析一下,在Netty中,究竟是如何處理連線的建立、資料的傳輸這些事情的。

參考資料: