Netty學習-02(粘包 解包 FrameDecoder)
阿新 • • 發佈:2019-02-03
1、簡介
Java1.4提供了NIO使開發者可以使用Java編寫高效能的服務端程式,但使用原生的NIO API就像Linux C中網路程式設計一樣,還是需要做IO處理、協議處理等低層次工作。所以,就像C服務端程式大量使用libevent作為網路應用框架一樣,Java社群也不斷湧現出基於NIO的網路應用框架。在這其中,Jboss出品的Netty就是個中翹楚。Netty是個非同步的事件驅動網路應用框架,具有高效能、高擴充套件性等特性。Netty提供了統一的底層協議介面,使得開發者從底層的網路協議(比如TCP/IP、UDP)中解脫出來。就使用來說,開發者只要參考 Netty提供的若干例子和它的指南文件,就可以放手開發基於Netty的服務端程式了。
在Java社群,最知名的開源Java NIO框架要屬Mina和Netty,而且兩者淵源頗多,對兩者的比較自然不少。實際上,Netty的作者原來就是Mina作者之一,所以可以想到,Netty和Mina在設計理念上會有很多共同點。我對Mina沒什麼研究,但其作者介紹,Netty的設計對開發者有更友好的擴充套件性,並且效能方面要優於Mina,而Netty完善的文件也很吸引人。 所以,如果你在尋找Java NIO框架,Netty是個很不錯的選擇。本文的內容就是圍繞一個demo介紹使用Netty的點點滴滴。
2、服務端程式
2.1、ChannelHandler
服務端程式通常的處理過程是:解碼請求資料、業務邏輯處理、編碼響應。從框架角度來說,可以提供3個介面來控制並排程該處理過程;從更通用的角度來說,並不特化處理其中的每一步,而把每一步當做過濾器鏈中的一環,這也是Netty的做法。Netty對請求處理過程實現了過濾器鏈模式(ChannelPipeline),每個過濾器實現了ChannelHandler介面。Netty中有兩種請求事件流型別也做了細分:
1)downstream event :其對應的ChannelHandler子介面是ChannelDownstreamHandler。downstream event是說從頭到尾執行ChannelPipeline中的ChannelDownstreamHandler,這一過程相當於向外傳送資料的過程。 downstream event有:”write”、”bind”、”unbind”、 “connect”、 “disconnect”、”close”。
2)upstream event:其對應的ChannelHandler子介面是ChannelUpstreamHandler。upstream event處理的事件方向和downstream event相反,這一過程相當於接收處理外來請求的過程。 upstream event有:”messageReceived”、 “exceptionCaught”、”channelOpen”、”channelClosed”、 “channelBound”、”channelUnbound”、 “channelConnected”、”writeComplete”、”channelDisconnected”、”channelInterestChanged”。
Netty中有個註釋@interface ChannelPipelineCoverage,它表示被註釋的ChannelHandler是否能新增到多個ChannelPipeline中,其可選的值是”all”和”one”。”all”表示ChannelHandler是無狀態的,可被多個ChannelPipeline共享,而”one”表示ChannelHandler只作用於單個ChannelPipeline中。但ChannelPipelineCoverage只是個註釋而已,並沒有實際的檢查作用。對於ChannelHandler是”all”還是”one”,還是根據邏輯需要而定。比如,像解碼請求handler,因為可能解碼的資料不完整,需要等待下一次讀事件來了之後再繼續解析,所以解碼請求handler就需要是”one”的(否則多個Channel共享資料就亂了)。而像業務邏輯處理hanlder通常是”all”的。
下面以一個簡單的例子說明如何編寫“解碼請求資料、業務邏輯處理、編碼響應”這一過程中涉及的ChannelHandler。該例子實現的協議格式很簡單,請求和響應流中[color=red]頭4個位元組表示後面跟的內容長度,根據該長度可得到內容體[/color]。
首先看下解碼器的實現:
MessageDecoder繼承自FrameDecoder,FrameDecoder是Netty codec包中的輔助類,它是個ChannelUpstreamHandler,decode方法是FrameDecoder子類需要實現的。在上面的程式碼中,有:
(1)檢查ChannelBuffer中的位元組數,如果ChannelBuffer可讀的位元組數少於4,則返回null等待下次讀事件。
(2)繼續檢查ChannelBuffer中的位元組數,如果ChannelBuffer可讀的位元組數少於dataLength + 4,則返回null等待下次讀事件。
(3)越過dataLength的位元組。
(4)構造解碼的字串返回。
MessageServerHandler是服務端業務處理handler,其繼承自SimpleChannelUpstreamHandler,並主要實現messageReceived事件。關於該類,有如下註解:
(1)該upstream事件流中,首先經過MessageDecoder,其會將decode返回的解碼後的資料構造成 MessageEvent.getMessage(),所以在handler上下文關係中,MessageEvent.getMessage()並不一定都返回ChannelBuffer型別的資料。
(2)MessageServerHandler只是簡單的將得到的msg再寫回給客戶端。e.getChannel().write(msg);操作將觸發DownstreamMessageEvent事件,也就是呼叫下面的MessageEncoder將編碼的資料返回給客戶端。
MessageEncoder是個ChannelDownstreamHandler。對該類的註解如下:
(1)如果編碼的msg不是合法型別,就直接返回該msg,之後OneToOneEncoder會呼叫 ctx.sendDownstream(evt);來呼叫下一個ChannelDownstreamHandler。對於該例子來說,這種情況是不應該出現的。
(2)開發者建立ChannelBuffer的用武之地就是這兒了,通常使用dynamicBuffer即可,表示得到的ChannelBuffer可動態增加大小。
(3)返回編碼後的ChannelBuffer之後,OneToOneEncoder會呼叫Channels.write將資料寫回客戶端。
2.2、MessageServerPipelineFactory
建立了3個ChannelHandler,需要將他們註冊到ChannelPipeline,而ChannelPipeline又是和Channel對應的(是全域性單例還是每個Channel對應一個ChannelPipeline例項依賴於實現)。可以實現ChannelPipeline的工廠介面 ChannelPipelineFactory實現該目的。MessageServerPipelineFactory的程式碼如下:
2.3、MessageServer
服務端程式就剩下啟動程式碼了,使用Netty的ServerBootstrap三下五除二完成之。
稍加補充的是,該Server程式並不完整,它沒有處理關閉時的資源釋放,儘管暴力的來看並不一定需要做這樣的善後工作。
3、客戶端程式
客戶端程式和服務端程式處理模型上是很相似的,這裡還是付上程式碼並作簡要說明。
3.1、 ChannelHandler
客戶端是先發送資料到服務端(downstream事件流),然後是處理從服務端接收的資料(upstream事件流)。這裡有個問題是,怎麼把需要傳送的資料送到downstream事件流裡呢?這就用到了ChannelUpstreamHandler的channelConnected事件了。實現的 MessageClientHandler程式碼如下:
對於編碼和解碼Handler,複用MessageEncoder和MessageDecoder即可。
3.2、 MessageClientPipelineFactory
3.3、MessageClient
在寫客戶端例子時,我想像的程式碼並不是這樣的,對客戶端的程式碼我也沒做過多的研究,所以也可能沒有找到更好的解決方案。在上面的例子中,bootstrap.connect方法中會觸發實際的連線操作,接著觸發 MessageClientHandler.channelConnected,使整個過程運轉起來。但是,我想要的是一個連線池,並且如何寫資料也不應該在channelConnected中,這樣對於動態的資料,只能在建構函式中傳遞需要寫的資料了。但到現在,我還不清楚如何將連線池和 ChannelPipeline有效的結合起來。或許,這樣的需求可以跨過Netty來實現。
4、總結
關於Netty的初步使用,尚且總結到這裡。關於這篇文章,寫得斷斷續續,以至於到後來我都沒興趣把內容都整理出來。當然,這多少也是因為我是先整理 Netty原理方面的東西所致。我也只能卑微的期望,該文對Netty入門者會有些許幫助。
=============================== 華麗的終止符 ================================
Java1.4提供了NIO使開發者可以使用Java編寫高效能的服務端程式,但使用原生的NIO API就像Linux C中網路程式設計一樣,還是需要做IO處理、協議處理等低層次工作。所以,就像C服務端程式大量使用libevent作為網路應用框架一樣,Java社群也不斷湧現出基於NIO的網路應用框架。在這其中,Jboss出品的Netty就是個中翹楚。Netty是個非同步的事件驅動網路應用框架,具有高效能、高擴充套件性等特性。Netty提供了統一的底層協議介面,使得開發者從底層的網路協議(比如TCP/IP、UDP)中解脫出來。就使用來說,開發者只要參考 Netty提供的若干例子和它的指南文件,就可以放手開發基於Netty的服務端程式了。
在Java社群,最知名的開源Java NIO框架要屬Mina和Netty,而且兩者淵源頗多,對兩者的比較自然不少。實際上,Netty的作者原來就是Mina作者之一,所以可以想到,Netty和Mina在設計理念上會有很多共同點。我對Mina沒什麼研究,但其作者介紹,Netty的設計對開發者有更友好的擴充套件性,並且效能方面要優於Mina,而Netty完善的文件也很吸引人。
2、服務端程式
2.1、ChannelHandler
服務端程式通常的處理過程是:解碼請求資料、業務邏輯處理、編碼響應。從框架角度來說,可以提供3個介面來控制並排程該處理過程;從更通用的角度來說,並不特化處理其中的每一步,而把每一步當做過濾器鏈中的一環,這也是Netty的做法。Netty對請求處理過程實現了過濾器鏈模式(ChannelPipeline),每個過濾器實現了ChannelHandler介面。Netty中有兩種請求事件流型別也做了細分:
1)downstream event
2)upstream event:其對應的ChannelHandler子介面是ChannelUpstreamHandler。upstream event處理的事件方向和downstream event相反,這一過程相當於接收處理外來請求的過程。
Netty中有個註釋@interface ChannelPipelineCoverage,它表示被註釋的ChannelHandler是否能新增到多個ChannelPipeline中,其可選的值是”all”和”one”。”all”表示ChannelHandler是無狀態的,可被多個ChannelPipeline共享,而”one”表示ChannelHandler只作用於單個ChannelPipeline中。但ChannelPipelineCoverage只是個註釋而已,並沒有實際的檢查作用。對於ChannelHandler是”all”還是”one”,還是根據邏輯需要而定。比如,像解碼請求handler,因為可能解碼的資料不完整,需要等待下一次讀事件來了之後再繼續解析,所以解碼請求handler就需要是”one”的(否則多個Channel共享資料就亂了)。而像業務邏輯處理hanlder通常是”all”的。
下面以一個簡單的例子說明如何編寫“解碼請求資料、業務邏輯處理、編碼響應”這一過程中涉及的ChannelHandler。該例子實現的協議格式很簡單,請求和響應流中[color=red]頭4個位元組表示後面跟的內容長度,根據該長度可得到內容體[/color]。
首先看下解碼器的實現:
public class MessageDecoder extends FrameDecoder {
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
return null;//(1)
}
int dataLength = buffer.getInt(buffer.readerIndex());
if (buffer.readableBytes() < dataLength + 4) {
return null;//(2)
}
buffer.skipBytes(4);//(3)
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);
String msg = new String(decoded);//(4)
return msg;
}
}
MessageDecoder繼承自FrameDecoder,FrameDecoder是Netty codec包中的輔助類,它是個ChannelUpstreamHandler,decode方法是FrameDecoder子類需要實現的。在上面的程式碼中,有:
(1)檢查ChannelBuffer中的位元組數,如果ChannelBuffer可讀的位元組數少於4,則返回null等待下次讀事件。
(2)繼續檢查ChannelBuffer中的位元組數,如果ChannelBuffer可讀的位元組數少於dataLength + 4,則返回null等待下次讀事件。
(3)越過dataLength的位元組。
(4)構造解碼的字串返回。
@ChannelPipelineCoverage("all")
public class MessageServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
MessageServerHandler.class.getName());
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
if (!(e.getMessage() instanceof String)) {
return;//(1)
}
String msg = (String) e.getMessage();
System.out.println("got msg:"+msg);
e.getChannel().write(msg);//(2)
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
MessageServerHandler是服務端業務處理handler,其繼承自SimpleChannelUpstreamHandler,並主要實現messageReceived事件。關於該類,有如下註解:
(1)該upstream事件流中,首先經過MessageDecoder,其會將decode返回的解碼後的資料構造成 MessageEvent.getMessage(),所以在handler上下文關係中,MessageEvent.getMessage()並不一定都返回ChannelBuffer型別的資料。
(2)MessageServerHandler只是簡單的將得到的msg再寫回給客戶端。e.getChannel().write(msg);操作將觸發DownstreamMessageEvent事件,也就是呼叫下面的MessageEncoder將編碼的資料返回給客戶端。
@ChannelPipelineCoverage("all")
public class MessageEncoder extends OneToOneEncoder {
@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof String)) {
return msg;//(1)
}
String res = (String)msg;
byte[] data = res.getBytes();
int dataLength = data.length;
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
buf.writeInt(dataLength);
buf.writeBytes(data);
return buf;//(3)
}
}
MessageEncoder是個ChannelDownstreamHandler。對該類的註解如下:
(1)如果編碼的msg不是合法型別,就直接返回該msg,之後OneToOneEncoder會呼叫 ctx.sendDownstream(evt);來呼叫下一個ChannelDownstreamHandler。對於該例子來說,這種情況是不應該出現的。
(2)開發者建立ChannelBuffer的用武之地就是這兒了,通常使用dynamicBuffer即可,表示得到的ChannelBuffer可動態增加大小。
(3)返回編碼後的ChannelBuffer之後,OneToOneEncoder會呼叫Channels.write將資料寫回客戶端。
2.2、MessageServerPipelineFactory
建立了3個ChannelHandler,需要將他們註冊到ChannelPipeline,而ChannelPipeline又是和Channel對應的(是全域性單例還是每個Channel對應一個ChannelPipeline例項依賴於實現)。可以實現ChannelPipeline的工廠介面 ChannelPipelineFactory實現該目的。MessageServerPipelineFactory的程式碼如下:
public class MessageServerPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("handler", new MessageServerHandler());
return pipeline;
}
}
2.3、MessageServer
服務端程式就剩下啟動程式碼了,使用Netty的ServerBootstrap三下五除二完成之。
public class MessageServer {
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the default event pipeline.
bootstrap.setPipelineFactory(new MessageServerPipelineFactory());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080));
}
}
稍加補充的是,該Server程式並不完整,它沒有處理關閉時的資源釋放,儘管暴力的來看並不一定需要做這樣的善後工作。
3、客戶端程式
客戶端程式和服務端程式處理模型上是很相似的,這裡還是付上程式碼並作簡要說明。
3.1、 ChannelHandler
客戶端是先發送資料到服務端(downstream事件流),然後是處理從服務端接收的資料(upstream事件流)。這裡有個問題是,怎麼把需要傳送的資料送到downstream事件流裡呢?這就用到了ChannelUpstreamHandler的channelConnected事件了。實現的 MessageClientHandler程式碼如下:
@ChannelPipelineCoverage("all")
public class MessageClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(
MessageClientHandler.class.getName());
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
String message = "hello kafka0102";
e.getChannel().write(message);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
System.out.println("messageReceived send message "+e.getMessage());
try {
Thread.sleep(1000*3);
} catch (Exception ex) {
ex.printStackTrace();
}
e.getChannel().write(e.getMessage());
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
對於編碼和解碼Handler,複用MessageEncoder和MessageDecoder即可。
3.2、 MessageClientPipelineFactory
MessageClientPipelineFactory
public class MessageClientPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("handler", new MessageClientHandler());
return pipeline;
}
}
3.3、MessageClient
public class MessageClient {
public static void main(String[] args) throws Exception {
// Parse options.
String host = "127.0.0.1";
int port = 8080;
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails.
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
}
在寫客戶端例子時,我想像的程式碼並不是這樣的,對客戶端的程式碼我也沒做過多的研究,所以也可能沒有找到更好的解決方案。在上面的例子中,bootstrap.connect方法中會觸發實際的連線操作,接著觸發 MessageClientHandler.channelConnected,使整個過程運轉起來。但是,我想要的是一個連線池,並且如何寫資料也不應該在channelConnected中,這樣對於動態的資料,只能在建構函式中傳遞需要寫的資料了。但到現在,我還不清楚如何將連線池和 ChannelPipeline有效的結合起來。或許,這樣的需求可以跨過Netty來實現。
4、總結
關於Netty的初步使用,尚且總結到這裡。關於這篇文章,寫得斷斷續續,以至於到後來我都沒興趣把內容都整理出來。當然,這多少也是因為我是先整理 Netty原理方面的東西所致。我也只能卑微的期望,該文對Netty入門者會有些許幫助。
=============================== 華麗的終止符 ================================