1. 程式人生 > >Netty學習-02(粘包 解包 FrameDecoder)

Netty學習-02(粘包 解包 FrameDecoder)

1、簡介
Java1.4提供了NIO使開發者可以使用Java編寫高效能的服務端程式,但使用原生的NIO API就像Linux C中網路程式設計一樣,還是需要做IO處理、協議處理等低層次工作。所以,就像C服務端程式大量使用libevent作為網路應用框架一樣,Java社群也不斷湧現出基於NIO的網路應用框架。在這其中,Jboss出品的Netty就是個中翹楚。Netty是個非同步的事件驅動網路應用框架,具有高效能、高擴充套件性等特性。Netty提供了統一的底層協議介面,使得開發者從底層的網路協議(比如TCP/IP、UDP)中解脫出來。就使用來說,開發者只要參考 Netty提供的若干例子和它的指南文件,就可以放手開發基於Netty的服務端程式了。

在Java社群,最知名的開源Java NIO框架要屬Mina和Netty,而且兩者淵源頗多,對兩者的比較自然不少。實際上,Netty的作者原來就是Mina作者之一,所以可以想到,Netty和Mina在設計理念上會有很多共同點。我對Mina沒什麼研究,但其作者介紹,Netty的設計對開發者有更友好的擴充套件性,並且效能方面要優於Mina,而Netty完善的文件也很吸引人。
所以,如果你在尋找Java NIO框架,Netty是個很不錯的選擇。本文的內容就是圍繞一個demo介紹使用Netty的點點滴滴。

2、服務端程式

2.1、ChannelHandler
服務端程式通常的處理過程是:解碼請求資料、業務邏輯處理、編碼響應。從框架角度來說,可以提供3個介面來控制並排程該處理過程;從更通用的角度來說,並不特化處理其中的每一步,而把每一步當做過濾器鏈中的一環,這也是Netty的做法。Netty對請求處理過程實現了過濾器鏈模式(ChannelPipeline),每個過濾器實現了ChannelHandler介面。Netty中有兩種請求事件流型別也做了細分:

1)downstream event
:其對應的ChannelHandler子介面是ChannelDownstreamHandler。downstream event是說從頭到尾執行ChannelPipeline中的ChannelDownstreamHandler,這一過程相當於向外傳送資料的過程。 downstream event有:”write”、”bind”、”unbind”、 “connect”、 “disconnect”、”close”。

2)upstream event:其對應的ChannelHandler子介面是ChannelUpstreamHandler。upstream event處理的事件方向和downstream event相反,這一過程相當於接收處理外來請求的過程。
upstream event有:”messageReceived”、 “exceptionCaught”、”channelOpen”、”channelClosed”、 “channelBound”、”channelUnbound”、 “channelConnected”、”writeComplete”、”channelDisconnected”、”channelInterestChanged”。

Netty中有個註釋@interface ChannelPipelineCoverage,它表示被註釋的ChannelHandler是否能新增到多個ChannelPipeline中,其可選的值是”all”和”one”。”all”表示ChannelHandler是無狀態的,可被多個ChannelPipeline共享,而”one”表示ChannelHandler只作用於單個ChannelPipeline中。但ChannelPipelineCoverage只是個註釋而已,並沒有實際的檢查作用。對於ChannelHandler是”all”還是”one”,還是根據邏輯需要而定。比如,像解碼請求handler,因為可能解碼的資料不完整,需要等待下一次讀事件來了之後再繼續解析,所以解碼請求handler就需要是”one”的(否則多個Channel共享資料就亂了)。而像業務邏輯處理hanlder通常是”all”的。

下面以一個簡單的例子說明如何編寫“解碼請求資料、業務邏輯處理、編碼響應”這一過程中涉及的ChannelHandler。該例子實現的協議格式很簡單,請求和響應流中[color=red]頭4個位元組表示後面跟的內容長度,根據該長度可得到內容體[/color]。

首先看下解碼器的實現:


public class MessageDecoder extends FrameDecoder {
 
    @Override
    protected Object decode(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        if (buffer.readableBytes() < 4) {
            return null;//(1)
        }
        int dataLength = buffer.getInt(buffer.readerIndex());
        if (buffer.readableBytes() < dataLength + 4) {
            return null;//(2)
        }
 
        buffer.skipBytes(4);//(3)
        byte[] decoded = new byte[dataLength];
        buffer.readBytes(decoded);
        String msg = new String(decoded);//(4)
        return msg;
    }
}

MessageDecoder繼承自FrameDecoder,FrameDecoder是Netty codec包中的輔助類,它是個ChannelUpstreamHandler,decode方法是FrameDecoder子類需要實現的。在上面的程式碼中,有:

(1)檢查ChannelBuffer中的位元組數,如果ChannelBuffer可讀的位元組數少於4,則返回null等待下次讀事件。
(2)繼續檢查ChannelBuffer中的位元組數,如果ChannelBuffer可讀的位元組數少於dataLength + 4,則返回null等待下次讀事件。
(3)越過dataLength的位元組。
(4)構造解碼的字串返回。


@ChannelPipelineCoverage("all")
public class MessageServerHandler extends SimpleChannelUpstreamHandler {
 
    private static final Logger logger = Logger.getLogger(
            MessageServerHandler.class.getName());
 
    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        if (!(e.getMessage() instanceof String)) {
            return;//(1)
        }
        String msg = (String) e.getMessage();
        System.out.println("got msg:"+msg);
        e.getChannel().write(msg);//(2)
    }
 
    @Override
    public void exceptionCaught(
            ChannelHandlerContext ctx, ExceptionEvent e) {
        logger.log(
                Level.WARNING,
                "Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
    }
}

MessageServerHandler是服務端業務處理handler,其繼承自SimpleChannelUpstreamHandler,並主要實現messageReceived事件。關於該類,有如下註解:

(1)該upstream事件流中,首先經過MessageDecoder,其會將decode返回的解碼後的資料構造成 MessageEvent.getMessage(),所以在handler上下文關係中,MessageEvent.getMessage()並不一定都返回ChannelBuffer型別的資料。
(2)MessageServerHandler只是簡單的將得到的msg再寫回給客戶端。e.getChannel().write(msg);操作將觸發DownstreamMessageEvent事件,也就是呼叫下面的MessageEncoder將編碼的資料返回給客戶端


@ChannelPipelineCoverage("all")
public class MessageEncoder extends OneToOneEncoder {
 
    @Override
    protected Object encode(
            ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
        if (!(msg instanceof String)) {
            return msg;//(1)
        }
 
        String res = (String)msg;
        byte[] data = res.getBytes();
        int dataLength = data.length;
        ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
        buf.writeInt(dataLength);
        buf.writeBytes(data);
        return buf;//(3)
    }
}

MessageEncoder是個ChannelDownstreamHandler。對該類的註解如下:

(1)如果編碼的msg不是合法型別,就直接返回該msg,之後OneToOneEncoder會呼叫 ctx.sendDownstream(evt);來呼叫下一個ChannelDownstreamHandler。對於該例子來說,這種情況是不應該出現的。
(2)開發者建立ChannelBuffer的用武之地就是這兒了,通常使用dynamicBuffer即可,表示得到的ChannelBuffer可動態增加大小。
(3)返回編碼後的ChannelBuffer之後,OneToOneEncoder會呼叫Channels.write將資料寫回客戶端。

2.2、MessageServerPipelineFactory
建立了3個ChannelHandler,需要將他們註冊到ChannelPipeline,而ChannelPipeline又是和Channel對應的(是全域性單例還是每個Channel對應一個ChannelPipeline例項依賴於實現)。可以實現ChannelPipeline的工廠介面 ChannelPipelineFactory實現該目的。MessageServerPipelineFactory的程式碼如下:


public class MessageServerPipelineFactory implements
        ChannelPipelineFactory {
 
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
 
        pipeline.addLast("decoder", new MessageDecoder());
        pipeline.addLast("encoder", new MessageEncoder());
        pipeline.addLast("handler", new MessageServerHandler());
 
        return pipeline;
    }
}

2.3、MessageServer
服務端程式就剩下啟動程式碼了,使用Netty的ServerBootstrap三下五除二完成之。


public class MessageServer {
 
    public static void main(String[] args) throws Exception {
        // Configure the server.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
 
        // Set up the default event pipeline.
        bootstrap.setPipelineFactory(new MessageServerPipelineFactory());
 
        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(8080));
 
    }
}

稍加補充的是,該Server程式並不完整,它沒有處理關閉時的資源釋放,儘管暴力的來看並不一定需要做這樣的善後工作。

3、客戶端程式
客戶端程式和服務端程式處理模型上是很相似的,這裡還是付上程式碼並作簡要說明。

3.1、 ChannelHandler
客戶端是先發送資料到服務端(downstream事件流),然後是處理從服務端接收的資料(upstream事件流)。這裡有個問題是,怎麼把需要傳送的資料送到downstream事件流裡呢?這就用到了ChannelUpstreamHandler的channelConnected事件了。實現的 MessageClientHandler程式碼如下:


@ChannelPipelineCoverage("all")
public class MessageClientHandler extends SimpleChannelUpstreamHandler {
 
    private static final Logger logger = Logger.getLogger(
            MessageClientHandler.class.getName());
 
 
    @Override
    public void channelConnected(
            ChannelHandlerContext ctx, ChannelStateEvent e) {
        String message = "hello kafka0102";
        e.getChannel().write(message);
    }
 
    @Override
    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) {
        // Send back the received message to the remote peer.
        System.out.println("messageReceived send message "+e.getMessage());
        try {
            Thread.sleep(1000*3);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        e.getChannel().write(e.getMessage());
    }
 
    @Override
    public void exceptionCaught(
            ChannelHandlerContext ctx, ExceptionEvent e) {
        // Close the connection when an exception is raised.
        logger.log(
                Level.WARNING,
                "Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
    }
}

對於編碼和解碼Handler,複用MessageEncoder和MessageDecoder即可。

3.2、 MessageClientPipelineFactory


MessageClientPipelineFactory
    public class MessageClientPipelineFactory implements
        ChannelPipelineFactory {
 
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
 
        pipeline.addLast("decoder", new MessageDecoder());
        pipeline.addLast("encoder", new MessageEncoder());
        pipeline.addLast("handler", new MessageClientHandler());
 
        return pipeline;
    }
}


3.3、MessageClient

public class MessageClient {
 
    public static void main(String[] args) throws Exception {
        // Parse options.
        String host = "127.0.0.1";
        int port = 8080;
        // Configure the client.
        ClientBootstrap bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        // Set up the event pipeline factory.
        bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
    }
}

在寫客戶端例子時,我想像的程式碼並不是這樣的,對客戶端的程式碼我也沒做過多的研究,所以也可能沒有找到更好的解決方案。在上面的例子中,bootstrap.connect方法中會觸發實際的連線操作,接著觸發 MessageClientHandler.channelConnected,使整個過程運轉起來。但是,我想要的是一個連線池,並且如何寫資料也不應該在channelConnected中,這樣對於動態的資料,只能在建構函式中傳遞需要寫的資料了。但到現在,我還不清楚如何將連線池和 ChannelPipeline有效的結合起來。或許,這樣的需求可以跨過Netty來實現。

4、總結
關於Netty的初步使用,尚且總結到這裡。關於這篇文章,寫得斷斷續續,以至於到後來我都沒興趣把內容都整理出來。當然,這多少也是因為我是先整理 Netty原理方面的東西所致。我也只能卑微的期望,該文對Netty入門者會有些許幫助。


=============================== 華麗的終止符 ================================