netty開發tcp資料傳輸編解碼框架使用
作為一個高效能的非同步、NIO通訊框架,編解碼框架是Netty的重要組成部分。儘管站在微核心的角度看,編解碼框架並不是Netty微核心的組成部分,但是通過ChannelHandler定製擴展出的編解碼框架卻是不可或缺的。
下面我們從幾個角度詳細談下這個話題,首先一起看下Netty的邏輯架構圖:
圖2-1 Netty邏輯架構圖
從網路讀取的inbound訊息,需要經過解碼,將二進位制的資料報轉換成應用層協議訊息或者業務訊息,才能夠被上層的應用邏輯識別和處理;同理,使用者傳送到網路的outbound業務訊息,需要經過編碼轉換成二進位制位元組陣列(對於Netty就是ByteBuf)才能夠傳送到網路對端。編碼和解碼功能是NIO框架的有機組成部分,無論是由業務定製擴充套件實現,還是NIO框架內建編解碼能力,該功能是必不可少的。
為了降低使用者的開發難度,Netty對常用的功能和API做了裝飾,以遮蔽底層的實現細節。編解碼功能的定製,對於熟悉Netty底層實現的開發者而言,直接基於ChannelHandler擴充套件開發,難度並不是很大。但是對於大多數初學者或者不願意去了解底層實現細節的使用者,需要提供給他們更簡單的類庫和API,而不是ChannelHandler。
Netty在這方面做得非常出色,針對編解碼功能,它既提供了通用的編解碼框架供使用者擴充套件,又提供了常用的編解碼類庫供使用者直接使用。在保證定製擴充套件性的基礎之上,儘量降低使用者的開發工作量和開發門檻,提升開發效率。
Netty預置的編解碼功能列表如下:base64、Protobuf、JBoss Marshalling、spdy等。
圖2-2 Netty預置的編解碼功能列表
2.2. 常用的解碼器
2.2.1. LineBasedFrameDecoder解碼器
LineBasedFrameDecoder是回車換行解碼器,如果使用者傳送的訊息以回車換行符作為訊息結束的標識,則可以直接使用Netty的LineBasedFrameDecoder對訊息進行解碼,只需要在初始化Netty服務端或者客戶端時將LineBasedFrameDecoder正確的新增到ChannelPipeline中即可,不需要自己重新實現一套換行解碼器。
LineBasedFrameDecoder的工作原理是它依次遍歷ByteBuf中的可讀位元組,判斷看是否有“\n”或者“\r\n”,如果有,就以此位置為結束位置,從可讀索引到結束位置區間的位元組就組成了一行。它是以換行符為結束標誌的解碼器,支援攜帶結束符或者不攜帶結束符兩種解碼方式,同時支援配置單行的最大長度。如果連續讀取到最大長度後仍然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀到的異常碼流。防止由於資料報沒有攜帶換行符導致接收到ByteBuf無限制積壓,引起系統記憶體溢位。
它的使用效果如下:
解碼之前: +------------------------------------------------------------------+ 接收到的資料報 “This is a netty example for using the nio framework.\r\n When you“ +------------------------------------------------------------------+ 解碼之後的ChannelHandler接收到的Object如下: +------------------------------------------------------------------+ 解碼之後的文字訊息 “This is a netty example for using the nio framework.“ +------------------------------------------------------------------+
通常情況下,LineBasedFrameDecoder會和StringDecoder配合使用,組合成按行切換的文字解碼器,對於文字類協議的解析,文字換行解碼器非常實用,例如對HTTP訊息頭的解析、FTP協議訊息的解析等。
下面我們簡單給出文字換行解碼器的使用示例:
@Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new LineBasedFrameDecoder(1024)); arg0.pipeline().addLast(new StringDecoder()); arg0.pipeline().addLast(new UserServerHandler()); }
初始化Channel的時候,首先將LineBasedFrameDecoder新增到ChannelPipeline中,然後再依次新增字串解碼器StringDecoder,業務Handler。
2.2.2. DelimiterBasedFrameDecoder解碼器
DelimiterBasedFrameDecoder是分隔符解碼器,使用者可以指定訊息結束的分隔符,它可以自動完成以分隔符作為碼流結束標識的訊息的解碼。回車換行解碼器實際上是一種特殊的DelimiterBasedFrameDecoder解碼器。
分隔符解碼器在實際工作中也有很廣泛的應用,筆者所從事的電信行業,很多簡單的文字私有協議,都是以特殊的分隔符作為訊息結束的標識,特別是對於那些使用長連線的基於文字的私有協議。
分隔符的指定:與大家的習慣不同,分隔符並非以char或者string作為構造引數,而是ByteBuf,下面我們就結合實際例子給出它的用法。
假如訊息以“$_”作為分隔符,服務端或者客戶端初始化ChannelPipeline的程式碼例項如下:
@Override public void initChannel(SocketChannel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer("$_" .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new UserServerHandler()); }
首先將“$_”轉換成ByteBuf物件,作為引數構造DelimiterBasedFrameDecoder,將其新增到ChannelPipeline中,然後依次新增字串解碼器(通常用於文字解碼)和使用者Handler,請注意解碼器和Handler的新增順序,如果順序顛倒,會導致訊息解碼失敗。
DelimiterBasedFrameDecoder原理分析:解碼時,判斷當前已經讀取的ByteBuf中是否包含分隔符ByteBuf,如果包含,則擷取對應的ByteBuf返回,原始碼如下:
詳細分析下indexOf(buffer, delim)方法的實現,程式碼如下:
該演算法與Java String中的搜尋演算法類似,對於原字串使用兩個指標來進行搜尋,如果搜尋成功,則返回索引位置,否則返回-1。
2.2.3. FixedLengthFrameDecoder解碼器
FixedLengthFrameDecoder是固定長度解碼器,它能夠按照指定的長度對訊息進行自動解碼,開發者不需要考慮TCP的粘包/拆包等問題,非常實用。
對於定長訊息,如果訊息實際長度小於定長,則往往會進行補位操作,它在一定程度上導致了空間和資源的浪費。但是它的優點也是非常明顯的,編解碼比較簡單,因此在實際專案中仍然有一定的應用場景。
利用FixedLengthFrameDecoder解碼器,無論一次接收到多少資料報,它都會按照建構函式中設定的固定長度進行解碼,如果是半包訊息,FixedLengthFrameDecoder會快取半包訊息並等待下個包到達後進行拼包,直到讀取到一個完整的包。
假如單條訊息的長度是20位元組,使用FixedLengthFrameDecoder解碼器的效果如下:
解碼前: +------------------------------------------------------------------+ 接收到的資料報 “HELLO NETTY FOR USER DEVELOPER“ +------------------------------------------------------------------+ 解碼後: +------------------------------------------------------------------+ 解碼後的資料報 “HELLO NETTY FOR USER“ +------------------------------------------------------------------+
2.2.4. LengthFieldBasedFrameDecoder解碼器
瞭解TCP通訊機制的讀者應該都知道TCP底層的粘包和拆包,當我們在接收訊息的時候,顯示不能認為讀取到的報文就是個整包訊息,特別是對於採用非阻塞I/O和長連線通訊的程式。
如何區分一個整包訊息,通常有如下4種做法:
1) 固定長度,例如每120個位元組代表一個整包訊息,不足的前面補位。解碼器在處理這類定常訊息的時候比較簡單,每次讀到指定長度的位元組後再進行解碼;
2) 通過回車換行符區分訊息,例如HTTP協議。這類區分訊息的方式多用於文字協議;
3) 通過特定的分隔符區分整包訊息;
4) 通過在協議頭/訊息頭中設定長度欄位來標識整包訊息。
前三種解碼器之前的章節已經做了詳細介紹,下面讓我們來一起學習最後一種通用解碼器-LengthFieldBasedFrameDecoder。
大多數的協議(私有或者公有),協議頭中會攜帶長度欄位,用於標識訊息體或者整包訊息的長度,例如SMPP、HTTP協議等。由於基於長度解碼需求的通用性,以及為了降低使用者的協議開發難度,Netty提供了LengthFieldBasedFrameDecoder,自動遮蔽TCP底層的拆包和粘包問題,只需要傳入正確的引數,即可輕鬆解決“讀半包“問題。
下面我們看看如何通過引數組合的不同來實現不同的“半包”讀取策略。第一種常用的方式是訊息的第一個欄位是長度欄位,後面是訊息體,訊息頭中只包含一個長度欄位。它的訊息結構定義如圖所示:
圖2-3 解碼前的位元組緩衝區(14位元組)
使用以下引數組合進行解碼:
1) lengthFieldOffset = 0;
2) lengthFieldLength = 2;
3) lengthAdjustment = 0;
4) initialBytesToStrip = 0。
解碼後的位元組緩衝區內容如圖所示:
圖2-4 解碼後的位元組緩衝區(14位元組)
通過ByteBuf.readableBytes()方法我們可以獲取當前訊息的長度,所以解碼後的位元組緩衝區可以不攜帶長度欄位,由於長度欄位在起始位置並且長度為2,所以將initialBytesToStrip設定為2,引數組合修改為:
1) lengthFieldOffset = 0;
2) lengthFieldLength = 2;
3) lengthAdjustment = 0;
4) initialBytesToStrip = 2。
解碼後的位元組緩衝區內容如圖所示:
圖2-5 跳過長度欄位解碼後的位元組緩衝區(12位元組)
解碼後的位元組緩衝區丟棄了長度欄位,僅僅包含訊息體,對於大多數的協議,解碼之後訊息長度沒有用處,因此可以丟棄。
在大多數的應用場景中,長度欄位僅用來標識訊息體的長度,這類協議通常由訊息長度欄位+訊息體組成,如上圖所示的幾個例子。但是,對於某些協議,長度欄位還包含了訊息頭的長度。在這種應用場景中,往往需要使用lengthAdjustment進行修正。由於整個訊息(包含訊息頭)的長度往往大於訊息體的長度,所以,lengthAdjustment為負數。圖2-6展示了通過指定lengthAdjustment欄位來包含訊息頭的長度:
1) lengthFieldOffset = 0;
2) lengthFieldLength = 2;
3) lengthAdjustment = -2;
4) initialBytesToStrip = 0。
解碼之前的碼流:
圖2-6 包含長度欄位自身的碼流
解碼之後的碼流:
圖2-7 解碼後的碼流
由於協議種類繁多,並不是所有的協議都將長度欄位放在訊息頭的首位,當標識訊息長度的欄位位於訊息頭的中間或者尾部時,需要使用lengthFieldOffset欄位進行標識,下面的引數組合給出瞭如何解決訊息長度欄位不在首位的問題:
1) lengthFieldOffset = 2;
2) lengthFieldLength = 3;
3) lengthAdjustment = 0;
4) initialBytesToStrip = 0。
其中lengthFieldOffset表示長度欄位在訊息頭中偏移的位元組數,lengthFieldLength 表示長度欄位自身的長度,解碼效果如下:
解碼之前:
圖2-8 長度欄位偏移的原始碼流
解碼之後:
圖2-9長度欄位偏移解碼後的碼流
由於訊息頭1的長度為2,所以長度欄位的偏移量為2;訊息長度欄位Length為3,所以lengthFieldLength值為3。由於長度欄位僅僅標識訊息體的長度,所以lengthAdjustment和initialBytesToStrip都為0。
最後一種場景是長度欄位夾在兩個訊息頭之間或者長度欄位位於訊息頭的中間,前後都有其它訊息頭欄位,在這種場景下如果想忽略長度欄位以及其前面的其它訊息頭欄位,則可以通過initialBytesToStrip引數來跳過要忽略的位元組長度,它的組合配置示意如下:
1) lengthFieldOffset = 1;
2) lengthFieldLength = 2;
3) lengthAdjustment = 1;
4) initialBytesToStrip = 3。
解碼之前的碼流(16位元組):
圖2-10長度欄位夾在訊息頭中間的原始碼流(16位元組)
解碼之後的碼流(13位元組):
圖2-11長度欄位夾在訊息頭中間解碼後的碼流(13位元組)
由於HDR1的長度為1,所以長度欄位的偏移量lengthFieldOffset為1;長度欄位為2個位元組,所以lengthFieldLength為2。由於長度欄位是訊息體的長度,解碼後如果攜帶訊息頭中的欄位,則需要使用lengthAdjustment進行調整,此處它的值為1,代表的是HDR2的長度,最後由於解碼後的緩衝區要忽略長度欄位和HDR1部分,所以lengthAdjustment為3。解碼後的結果為13個位元組,HDR1和Length欄位被忽略。
事實上,通過4個引數的不同組合,可以達到不同的解碼效果,使用者在使用過程中可以根據業務的實際情況進行靈活調整。
由於TCP存在粘包和組包問題,所以通常情況下使用者需要自己處理半包訊息。利用LengthFieldBasedFrameDecoder解碼器可以自動解決半包問題,它的習慣用法如下:
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536,0,2)); pipeline.addLast("UserDecoder", new UserDecoder());
在pipeline中增加LengthFieldBasedFrameDecoder解碼器,指定正確的引數組合,它可以將Netty的ByteBuf解碼成整包訊息,後面的使用者解碼器拿到的就是個完整的資料報,按照邏輯正常進行解碼即可,不再需要額外考慮“讀半包”問題,降低了使用者的開發難度。
2.3. 常用的編碼器
Netty並沒有提供與2.2章節匹配的編碼器,原因如下:
1) 2.2章節介紹的4種常用的解碼器本質都是解析一個完整的資料報給後端,主要用於解決TCP底層粘包和拆包;對於編碼,就是將POJO物件序列化為ByteBuf,不需要與TCP層面打交道,也就不存在半包編碼問題。從應用場景和需要解決的實際問題角度看,雙方是非對等的;
2) 很難抽象出合適的編碼器,對於不同的使用者和應用場景,序列化技術不盡相同,在Netty底層統一抽象封裝也並不合適。
Netty預設提供了豐富的編解碼框架供使用者整合使用,本文對較常用的Java序列化編碼器進行講解。其它的編碼器,實現方式大同小異。
2.3.1. ObjectEncoder編碼器
ObjectEncoder是Java序列化編碼器,它負責將實現Serializable介面的物件序列化為byte [],然後寫入到ByteBuf中用於訊息的跨網路傳輸。
下面我們一起分析下它的實現:
首先,我們發現它繼承自MessageToByteEncoder,它的作用就是將物件編碼成ByteBuf:
如果要使用Java序列化,物件必須實現Serializable介面,因此,它的泛型型別為Serializable。
MessageToByteEncoder的子類只需要實現encode(ChannelHandlerContext ctx, I msg, ByteBuf out)方法即可,下面我們重點關注encode方法的實現:
首先建立ByteBufOutputStream和ObjectOutputStream,用於將Object物件序列化到ByteBuf中,值得注意的是在writeObject之前需要先將長度欄位(4個位元組)預留,用於後續長度欄位的更新。
依次寫入長度佔位符(4位元組)、序列化之後的Object物件,之後根據ByteBuf的writeIndex計算序列化之後的碼流長度,最後呼叫ByteBuf的setInt(int index, int value)更新長度佔位符為實際的碼流長度。
有個細節需要注意,更新碼流長度欄位使用了setInt方法而不是writeInt,原因就是setInt方法只更新內容,並不修改readerIndex和writerIndex。
3. Netty編解碼框架可定製性
儘管Netty預置了豐富的編解碼類庫功能,但是在實際的業務開發過程中,總是需要對編解碼功能做一些定製。使用Netty的編解碼框架,可以非常方便的進行協議定製。本章節將對常用的支援定製的編解碼類庫進行講解,以期讓讀者能夠儘快熟悉和掌握編解碼框架。
3.1. 解碼器
3.1.1. ByteToMessageDecoder抽象解碼器
使用NIO進行網路程式設計時,往往需要將讀取到的位元組陣列或者位元組緩衝區解碼為業務可以使用的POJO物件。為了方便業務將ByteBuf解碼成業務POJO物件,Netty提供了ByteToMessageDecoder抽象工具解碼類。
使用者自定義解碼器繼承ByteToMessageDecoder,只需要實現void decode(ChannelHandler Context ctx, ByteBuf in, List<Object> out)抽象方法即可完成ByteBuf到POJO物件的解碼。
由於ByteToMessageDecoder並沒有考慮TCP粘包和拆包等場景,使用者自定義解碼器需要自己處理“讀半包”問題。正因為如此,大多數場景不會直接繼承ByteToMessageDecoder,而是繼承另外一些更高階的解碼器來遮蔽半包的處理。
實際專案中,通常將LengthFieldBasedFrameDecoder和ByteToMessageDecoder組合使用,前者負責將網路讀取的資料報解碼為整包訊息,後者負責將整包訊息解碼為最終的業務物件。
除了和其它解碼器組合形成新的解碼器之外,ByteToMessageDecoder也是很多基礎解碼器的父類,它的繼承關係如下圖所示:
圖3-1 ByteToMessageDecoder繼承關係圖
3.1.2. MessageToMessageDecoder抽象解碼器
MessageToMessageDecoder實際上是Netty的二次解碼器,它的職責是將一個物件二次解碼為其它物件。
為什麼稱它為二次解碼器呢?我們知道,從SocketChannel讀取到的TCP資料報是ByteBuffer,實際就是位元組陣列。我們首先需要將ByteBuffer緩衝區中的資料報讀取出來,並將其解碼為Java物件;然後對Java物件根據某些規則做二次解碼,將其解碼為另一個POJO物件。因為MessageToMessageDecoder在ByteToMessageDecoder之後,所以稱之為二次解碼器。
二次解碼器在實際的商業專案中非常有用,以HTTP+XML協議棧為例,第一次解碼往往是將位元組陣列解碼成HttpRequest物件,然後對HttpRequest訊息中的訊息體字串進行二次解碼,將XML格式的字串解碼為POJO物件,這就用到了二次解碼器。類似這樣的場景還有很多,不再一一列舉。
事實上,做一個超級複雜的解碼器將多個解碼器組合成一個大而全的MessageToMessageDecoder解碼器似乎也能解決多次解碼的問題,但是採用這種方式的程式碼可維護性會非常差。例如,如果我們打算在HTTP+XML協議棧中增加一個列印碼流的功能,即首次解碼獲取HttpRequest物件之後列印XML格式的碼流。如果採用多個解碼器組合,在中間插入一個列印訊息體的Handler即可,不需要修改原有的程式碼;如果做一個大而全的解碼器,就需要在解碼的方法中增加列印碼流的程式碼,可擴充套件性和可維護性都會變差。
使用者的解碼器只需要實現void decode(ChannelHandlerContext ctx, I msg, List<Object> out)抽象方法即可,由於它是將一個POJO解碼為另一個POJO,所以一般不會涉及到半包的處理,相對於ByteToMessageDecoder更加簡單些。它的繼承關係圖如下所示:
圖3-2 MessageToMessageDecoder 解碼器繼承關係圖
3.2. 編碼器
3.2.1. MessageToByteEncoder抽象編碼器
MessageToByteEncoder負責將POJO物件編碼成ByteBuf,使用者的編碼器繼承Message ToByteEncoder,實現void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)介面介面,示例程式碼如下:
public class IntegerEncoder extends MessageToByteEncoder<Integer> { @Override public void encode(ChannelHandlerContext ctx, Integer msg,ByteBuf out) throws Exception { out.writeInt(msg); } }
它的實現原理如下:呼叫write操作時,首先判斷當前編碼器是否支援需要傳送的訊息,如果不支援則直接透傳;如果支援則判斷緩衝區的型別,對於直接記憶體分配ioBuffer(堆外記憶體),對於堆記憶體通過heapBuffer方法分配,原始碼如下:
編碼使用的緩衝區分配完成之後,呼叫encode抽象方法進行編碼,方法定義如下:它由子類負責具體實現。
編碼完成之後,呼叫ReferenceCountUtil的release方法釋放編碼物件msg。對編碼後的ByteBuf進行以下判斷:
1) 如果緩衝區包含可傳送的位元組,則呼叫ChannelHandlerContext的write方法傳送ByteBuf;
2) 如果緩衝區沒有包含可寫的位元組,則需要釋放編碼後的ByteBuf,寫入一個空的ByteBuf到ChannelHandlerContext中。
傳送操作完成之後,在方法退出之前釋放編碼緩衝區ByteBuf物件。
3.2.2. MessageToMessageEncoder抽象編碼器
將一個POJO物件編碼成另一個物件,以HTTP+XML協議為例,它的一種實現方式是:先將POJO物件編碼成XML字串,再將字串編碼為HTTP請求或者應答訊息。對於複雜協議,往往需要經歷多次編碼,為了便於功能擴充套件,可以通過多個編碼器組合來實現相關功能。
使用者的解碼器繼承MessageToMessageEncoder解碼器,實現void encode(Channel HandlerContext ctx, I msg, List<Object> out)方法即可。注意,它與MessageToByteEncoder的區別是輸出是物件列表而不是ByteBuf,示例程式碼如下:
public class IntegerToStringEncoder extends MessageToMessageEncoder <Integer> { @Override public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out) throws Exception { out.add(message.toString()); } }
MessageToMessageEncoder編碼器的實現原理與之前分析的MessageToByteEncoder相似,唯一的差別是它編碼後的輸出是個中間物件,並非最終可傳輸的ByteBuf。
簡單看下它的原始碼實現:建立RecyclableArrayList物件,判斷當前需要編碼的物件是否是編碼器可處理的型別,如果不是,則忽略,執行下一個ChannelHandler的write方法。
具體的編碼方法實現由使用者子類編碼器負責完成,如果編碼後的RecyclableArrayList為空,說明編碼沒有成功,釋放RecyclableArrayList引用。
如果編碼成功,則通過遍歷RecyclableArrayList,迴圈傳送編碼後的POJO物件,程式碼如下所示:
3.2.3. LengthFieldPrepender編碼器
如果協議中的第一個欄位為長度欄位,Netty提供了LengthFieldPrepender編碼器,它可以計算當前待發送訊息的二進位制位元組長度,將該長度新增到ByteBuf的緩衝區頭中,如圖所示:
圖3-3 LengthFieldPrepender編碼器
通過LengthFieldPrepender可以將待發送訊息的長度寫入到ByteBuf的前2個位元組,編碼後的訊息組成為長度欄位+原訊息的方式。
通過設定LengthFieldPrepender為true,訊息長度將包含長度本身佔用的位元組數,開啟LengthFieldPrepender後,圖3-3示例中的編碼結果如下圖所示:
圖3-4 開啟LengthFieldPrepender開關後編碼效果
LengthFieldPrepender工作原理分析如下:首先對長度欄位進行設定,如果需要包含訊息長度自身,則在原來長度的基礎之上再加上lengthFieldLength的長度。
如果調整後的訊息長度小於0,則丟擲引數非法異常。對訊息長度自身所佔的位元組數進行判斷,以便採用正確的方法將長度欄位寫入到ByteBuf中,共有以下6種可能:
1) 長度欄位所佔位元組為1:如果使用1個Byte位元組代表訊息長度,則最大長度需要小於256個位元組。對長度進行校驗,如果校驗失敗,則丟擲引數非法異常;若校驗通過,則建立新的ByteBuf並通過writeByte將長度值寫入到ByteBuf中;
2) 長度欄位所佔位元組為2:如果使用2個Byte位元組代表訊息長度,則最大長度需要小於65536個位元組,對長度進行校驗,如果校驗失敗,則丟擲引數非法異常;若校驗通過,則建立新的ByteBuf並通過writeShort將長度值寫入到ByteBuf中;
3) 長度欄位所佔位元組為3:如果使用3個Byte位元組代表訊息長度,則最大長度需要小於16777216個位元組,對長度進行校驗,如果校驗失敗,則丟擲引數非法異常;若校驗通過,則建立新的ByteBuf並通過writeMedium將長度值寫入到ByteBuf中;
4) 長度欄位所佔位元組為4:建立新的ByteBuf,並通過writeInt將長度值寫入到ByteBuf中;
5) 長度欄位所佔位元組為8:建立新的ByteBuf,並通過writeLong將長度值寫入到ByteBuf中;
6) 其它長度值:直接丟擲Error。
相關程式碼如下:
最後將原需要傳送的ByteBuf複製到List<Object> out中,完成編碼: