Apache MINA Study Notes (10): Codec Filter

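First, let's be clear about why we use ProtocolCodecFilter:

1. TCP guarantees that all packets are delivered in order, but it does not guarantee that one write on the sending side results in exactly one read on the receiving side.

2. In MINA, without a ProtocolCodecFilter, a single IoSession.write(Object message) call on the sender may trigger several messageReceived(IoSession session, Object message) events on the receiver, and several IoSession.write(Object message) calls may trigger only one messageReceived event. That would make a mess of things.

3. Very often we need to know where the current message ends and where the next one begins.

4. It separates the low-level protocol logic from the business logic.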
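To make points 1 and 2 concrete, here is a small simulation in plain Java with no MINA and no real sockets involved. The class name StreamBoundaryDemo and its helpers concat and deliver are made up for this illustration, and the read sizes simply stand in for whatever the network happens to do.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StreamBoundaryDemo {

    // Concatenates the sender's writes into one byte stream, which is all
    // that TCP promises to deliver: the bytes, in order.
    static byte[] concat(String... writes) {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (String w : writes) {
            byte[] b = w.getBytes(StandardCharsets.US_ASCII);
            stream.write(b, 0, b.length);
        }
        return stream.toByteArray();
    }

    // Cuts the stream into reads of the given sizes. The sizes are arbitrary
    // and unrelated to where the sender's writes began or ended.
    static List<String> deliver(byte[] stream, int... readSizes) {
        List<String> reads = new ArrayList<>();
        int pos = 0;
        for (int size : readSizes) {
            int end = Math.min(pos + size, stream.length);
            reads.add(new String(Arrays.copyOfRange(stream, pos, end), StandardCharsets.US_ASCII));
            pos = end;
        }
        return reads;
    }

    public static void main(String[] args) {
        byte[] stream = concat("HELLO", "WORLD");     // two writes
        System.out.println(deliver(stream, 10));      // one read:    [HELLOWORLD]
        System.out.println(deliver(stream, 3, 3, 4)); // three reads: [HEL, LOW, ORLD]
    }
}
```

In both runs the receiver gets the same ten bytes in the same order, but nothing in the reads tells it that the sender wrote "HELLO" and "WORLD" separately. Recovering that boundary is the codec's job.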

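Generally speaking, to pull the data we want out of a long byte stream and assemble it into business POJOs, we use one of the following approaches:

1. Use fixed-length messages

2. Use a fixed-size header that states the length of the body

3. Use a text-based delimiter, such as a line feed or carriage return

In practice we rely on the first two more often.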
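The rest of this post works through the first two approaches. For the third one, the text-delimiter approach, here is a minimal sketch in plain Java. LineFramer and feed are hypothetical names chosen for this illustration, not MINA classes (MINA ships its own TextLineCodecFactory for line-based protocols), and the sketch assumes that '\n' ends a message and that a preceding '\r' should be dropped.

```java
import java.util.ArrayList;
import java.util.List;

final class LineFramer {

    // Text received so far that has not yet been terminated by '\n'.
    private final StringBuilder pending = new StringBuilder();

    // Appends a newly received chunk and returns every complete line.
    // Anything after the last '\n' is kept for the next call.
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, newline);
            if (line.endsWith("\r")) { // tolerate CRLF line endings
                line = line.substring(0, line.length() - 1);
            }
            lines.add(line);
            pending.delete(0, newline + 1);
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        System.out.println(framer.feed("LOGIN al"));        // []
        System.out.println(framer.feed("ice\r\nPING\nQU")); // [LOGIN alice, PING]
        System.out.println(framer.feed("IT\n"));            // [QUIT]
    }
}
```

The delimiter approach is easy to debug by eye, but it only works when the delimiter can never appear inside a message body, which is one reason binary payloads such as images tend to use a length header instead.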

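Below is an example from the official documentation.

First, define the network protocol. It runs over TCP/IP, and the message sent by the client has the following format:

| 4 bytes | 4 bytes | 4 bytes  |
| width   | height  | numchars |

The first 4 bytes are the width of an image, the middle 4 bytes are its height, and the last 4 bytes are the number of characters in the image. For now we can think of this as a small image-CAPTCHA demo.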
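As a quick check of the 12-byte request layout, the sketch below packs and unpacks the three fields with java.nio.ByteBuffer. RequestLayoutDemo, pack and unpack are illustrative names only. The post does not state a byte order, so the sketch assumes big-endian, which is the ByteBuffer default.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class RequestLayoutDemo {

    // Packs the three fields into the 12-byte layout: width, height, numchars.
    static byte[] pack(int width, int height, int numChars) {
        ByteBuffer buf = ByteBuffer.allocate(12); // big-endian by default (assumed byte order)
        buf.putInt(width).putInt(height).putInt(numChars);
        return buf.array();
    }

    // Reads the three fields back in the same order they were written.
    static int[] unpack(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new int[] { buf.getInt(), buf.getInt(), buf.getInt() };
    }

    public static void main(String[] args) {
        byte[] wire = pack(200, 100, 5);
        System.out.println(wire.length);                   // 12
        System.out.println(Arrays.toString(wire));         // [0, 0, 0, -56, 0, 0, 0, 100, 0, 0, 0, 5]
        System.out.println(Arrays.toString(unpack(wire))); // [200, 100, 5]
    }
}
```

Because every request is exactly 12 bytes, the receiver never needs a length field for it: this is the fixed-length approach from the list above.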

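What the server sends back to the client is the images, transferred as a byte stream of course. The message body looks like this:

| 4 bytes | variable length body | 4 bytes | variable length body |
| length1 | image1               | length2 | image2               |

The first four bytes give the length of the first image, followed by the content of that image, then the length of the second image and the content of the second image.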
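The response layout can be tried out in the same way. In this sketch, ResponseLayoutDemo, pack and readBody are made-up names, short ASCII strings stand in for the real image bytes, and big-endian 4-byte lengths are assumed.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ResponseLayoutDemo {

    // Writes [length1][body1][length2][body2], each length as a 4-byte int.
    static byte[] pack(byte[] body1, byte[] body2) {
        ByteBuffer buf = ByteBuffer.allocate(8 + body1.length + body2.length);
        buf.putInt(body1.length).put(body1);
        buf.putInt(body2.length).put(body2);
        return buf.array();
    }

    // Reads one length-prefixed body starting at the buffer's current position.
    // Assumes the whole body is already present in the buffer.
    static byte[] readBody(ByteBuffer buf) {
        byte[] body = new byte[buf.getInt()];
        buf.get(body);
        return body;
    }

    public static void main(String[] args) {
        // Short ASCII strings stand in for the encoded bytes of the two images.
        byte[] wire = pack("first".getBytes(StandardCharsets.US_ASCII),
                           "second!".getBytes(StandardCharsets.US_ASCII));
        System.out.println(wire.length); // 20, i.e. 8 + 5 + 7
        ByteBuffer in = ByteBuffer.wrap(wire);
        System.out.println(new String(readBody(in), StandardCharsets.US_ASCII)); // first
        System.out.println(new String(readBody(in), StandardCharsets.US_ASCII)); // second!
    }
}
```

Here readBody can only be called once the complete body has arrived. Checking for that before reading is what the response decoder later in this post has to take care of.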

Next, define two entities that wrap the server's response and the client's request:

public class ImageRequest {

    private int width;
    private int height;
    private int numberOfCharacters;

    public ImageRequest(int width, int height, int numberOfCharacters) {
        this.width = width;
        this.height = height;
        this.numberOfCharacters = numberOfCharacters;
    }

    public int getWidth() {
        return width;
    }

    public int getHeight() {
        return height;
    }

    public int getNumberOfCharacters() {
        return numberOfCharacters;
    }
}

import java.awt.image.BufferedImage;

public class ImageResponse {

    private BufferedImage image1;
    private BufferedImage image2;

    public ImageResponse(BufferedImage image1, BufferedImage image2) {
        this.image1 = image1;
        this.image2 = image2;
    }

    public BufferedImage getImage1() {
        return image1;
    }

    public BufferedImage getImage2() {
        return image2;
    }
}

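Then these two business beans need to be converted to and from the message format defined earlier. This is the so-called encode and decode.

First, encoding ImageRequest: in MINA you implement the ProtocolEncoder interface and override its encode method: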

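import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class ImageRequestEncoder implements ProtocolEncoder {

    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageRequest request = (ImageRequest) message;
        // 3 ints x 4 bytes = 12 bytes, allocated as a heap (non-direct) buffer
        IoBuffer buffer = IoBuffer.allocate(12, false);
        buffer.putInt(request.getWidth());
        buffer.putInt(request.getHeight());
        buffer.putInt(request.getNumberOfCharacters());
        buffer.flip(); // switch from writing to reading before handing it over
        out.write(buffer);
    }

    public void dispose(IoSession session) throws Exception {
        // nothing to dispose
    }
}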

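Likewise, we need a decoder that turns the raw bytes coming off the transport back into our business object ImageRequest. Note that it extends CumulativeProtocolDecoder and implements its doDecode method: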

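import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class ImageRequestDecoder extends CumulativeProtocolDecoder {

    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (in.remaining() >= 12) {
            // a complete 12-byte request is available: consume it
            int width = in.getInt();
            int height = in.getInt();
            int numberOfCharacters = in.getInt();
            ImageRequest request = new ImageRequest(width, height, numberOfCharacters);
            out.write(request);
            return true;
        } else {
            // not enough data yet: wait for more bytes to accumulate
            return false;
        }
    }
}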
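The contract between the cumulative base class and doDecode is easy to miss, so here is a rough plain-Java imitation of it. This is only a sketch of the idea, not MINA's implementation: FixedLengthAccumulator and feed are hypothetical names, and java.nio.ByteBuffer stands in for IoBuffer. The assumption being illustrated is that doDecode returns true after consuming one complete message and returns false, without consuming anything, when it needs more data.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FixedLengthAccumulator {

    private static final int MESSAGE_SIZE = 12;

    // Bytes received so far that do not yet form a complete message.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Plays the role of doDecode: consume one message and return true,
    // or leave the buffer untouched and return false if more data is needed.
    private static boolean doDecode(ByteBuffer in, List<int[]> out) {
        if (in.remaining() < MESSAGE_SIZE) {
            return false;
        }
        out.add(new int[] { in.getInt(), in.getInt(), in.getInt() });
        return true;
    }

    // Plays the role of the cumulative base class: append the new chunk,
    // call doDecode until it returns false, then keep the leftover bytes.
    List<int[]> feed(byte[] chunk) {
        ByteBuffer in = ByteBuffer.allocate(pending.remaining() + chunk.length);
        in.put(pending).put(chunk).flip();
        List<int[]> out = new ArrayList<>();
        while (doDecode(in, out)) {
            // keep going: one chunk may contain several messages
        }
        pending = in.slice(); // leftover bytes wait for the next chunk
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer wire = ByteBuffer.allocate(24);
        wire.putInt(200).putInt(100).putInt(5).putInt(64).putInt(32).putInt(4);
        byte[] bytes = wire.array();
        FixedLengthAccumulator acc = new FixedLengthAccumulator();
        // Two requests arrive as a 7-byte fragment followed by the other 17 bytes.
        System.out.println(acc.feed(Arrays.copyOfRange(bytes, 0, 7)).size());  // 0
        System.out.println(acc.feed(Arrays.copyOfRange(bytes, 7, 24)).size()); // 2
    }
}
```

The decoder itself stays tiny because all the buffering of fragments lives in one place. That is the convenience of extending CumulativeProtocolDecoder rather than implementing ProtocolDecoder directly.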

ImageResponse is encoded and decoded in the same way:

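import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.imageio.ImageIO;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class ImageResponseEncoder extends ProtocolEncoderAdapter {

    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        ImageResponse imageResponse = (ImageResponse) message;
        byte[] bytes1 = getBytes(imageResponse.getImage1());
        byte[] bytes2 = getBytes(imageResponse.getImage2());
        // two 4-byte length prefixes plus the two image bodies
        int capacity = bytes1.length + bytes2.length + 8;
        IoBuffer buffer = IoBuffer.allocate(capacity, false);
        buffer.setAutoExpand(true); // enable auto-expansion
        buffer.putInt(bytes1.length);
        buffer.put(bytes1);
        buffer.putInt(bytes2.length);
        buffer.put(bytes2);
        buffer.flip();
        out.write(buffer);
    }

    // Serializes an image to PNG bytes.
    private byte[] getBytes(BufferedImage image) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ImageIO.write(image, "PNG", baos);
        return baos.toByteArray();
    }
}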

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.imageio.ImageIO;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class ImageResponseDecoder extends CumulativeProtocolDecoder {

    // session attribute key under which the decoding progress is stored
    private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";

    public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;

    private static class DecoderState {
        BufferedImage image1;
    }

    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
        if (decoderState == null) {
            decoderState = new DecoderState();
            session.setAttribute(DECODER_STATE_KEY, decoderState);
        }
        if (decoderState.image1 == null) {
            // try to read first image
            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) { // handy for parsing length-prefixed messages
                decoderState.image1 = readImage(in);
            } else {
                // not enough data available to read first image
                return false;
            }
        }
        if (decoderState.image1 != null) {
            // try to read second image
            if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
                BufferedImage image2 = readImage(in);
                ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
                out.write(imageResponse);
                decoderState.image1 = null; // reset the state for the next response
                return true;
            } else {
                // not enough data available to read second image
                return false;
            }
        }
        return false;
    }

    // Reads one length-prefixed image; the caller has already checked that it is complete.
    private BufferedImage readImage(IoBuffer in) throws IOException {
        int length = in.getInt();
        byte[] bytes = new byte[length];
        in.get(bytes);
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        return ImageIO.read(bais);
    }
}
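To show what the prefixedDataAvailable(4, MAX_IMAGE_SIZE) check is doing for the decoder, here is an approximation written against java.nio.ByteBuffer. It is a sketch of the behaviour as I understand it, not MINA's source: PrefixCheckDemo is a made-up class, the prefix length is fixed at 4 bytes, and an oversized or negative length is reported with IllegalArgumentException here, whereas MINA uses its own exception type.

```java
import java.nio.ByteBuffer;

final class PrefixCheckDemo {

    // Returns true only when the 4-byte length prefix and the whole body it
    // announces are already in the buffer. The buffer position is not moved.
    static boolean prefixedDataAvailable(ByteBuffer in, int maxDataLength) {
        if (in.remaining() < 4) {
            return false; // the prefix itself is incomplete
        }
        int dataLength = in.getInt(in.position()); // absolute read, consumes nothing
        if (dataLength < 0 || dataLength > maxDataLength) {
            // stand-in for MINA's own exception; guards against bogus or hostile lengths
            throw new IllegalArgumentException("bad length prefix: " + dataLength);
        }
        return in.remaining() - 4 >= dataLength;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(16);
        in.putInt(10).put(new byte[6]).flip();               // prefix says 10, only 6 body bytes so far
        System.out.println(prefixedDataAvailable(in, 1024)); // false
        in.position(in.limit()).limit(in.capacity());        // switch back to writing
        in.put(new byte[4]).flip();                          // the remaining 4 bytes arrive
        System.out.println(prefixedDataAvailable(in, 1024)); // true
        System.out.println(in.position());                   // 0
    }
}
```

Two details matter for the decoder above: the check does not consume any bytes, so returning false leaves the buffer intact for the next call, and the maximum length keeps a corrupt prefix from making the decoder wait for an absurd amount of data.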

Then bundle these four encoders and decoders into our own codec factory:

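import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class ImageCodecFactory implements ProtocolCodecFactory {

    private ProtocolEncoder encoder;
    private ProtocolDecoder decoder;

    public ImageCodecFactory(boolean client) {
        if (client) {
            // the client sends requests and receives responses
            encoder = new ImageRequestEncoder();
            decoder = new ImageResponseDecoder();
        } else {
            // the server sends responses and receives requests
            encoder = new ImageResponseEncoder();
            decoder = new ImageRequestDecoder();
        }
    }

    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return decoder;
    }
}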

Then comes the test code for the client and the server:

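import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class ImageServer {

    public static final int PORT = 33789;

    public static void main(String[] args) throws IOException {
        ImageServerIoHandler handler = new ImageServerIoHandler();
        NioSocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
        // The original listing called setLocalAddress(...); MINA 2.0.x releases
        // expose this setter as setDefaultLocalAddress(...).
        acceptor.setDefaultLocalAddress(new InetSocketAddress(PORT));
        acceptor.setHandler(handler);
        acceptor.bind();
        System.out.println("server is listening at port " + PORT);
    }
}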

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class ImageClient extends IoHandlerAdapter {

    public static final int CONNECT_TIMEOUT = 3000;

    private String host;
    private int port;
    private SocketConnector connector;
    private IoSession session;
    private ImageListener imageListener;

    public ImageClient(String host, int port, ImageListener imageListener) {
        this.host = host;
        this.port = port;
        this.imageListener = imageListener;
        connector = new NioSocketConnector();
        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
        connector.setHandler(this);
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        // the codec filter has already turned the bytes into an ImageResponse
        ImageResponse response = (ImageResponse) message;
        imageListener.onImages(response.getImage1(), response.getImage2());
    }

...

import java.awt.Color;
import java.awt.Font;
import java.awt.Graphics;
import java.awt.image.BufferedImage;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImageServerIoHandler extends IoHandlerAdapter {

    private final static String characters = "mina rocksabcdefghijklmnopqrstuvwxyz0123456789";

    public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public void sessionOpened(IoSession session) throws Exception {
        session.setAttribute(INDEX_KEY, 0);
    }

    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        // IoSessionLogger is kept from the original listing; if your MINA version
        // does not provide it, call logger.warn(cause.getMessage(), cause) directly.
        IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
        sessionLogger.warn(cause.getMessage(), cause);
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        // the codec filter has already turned the bytes into an ImageRequest
        ImageRequest request = (ImageRequest) message;
        String text1 = generateString(session, request.getNumberOfCharacters());
        String text2 = generateString(session, request.getNumberOfCharacters());
        BufferedImage image1 = createImage(request, text1);
        BufferedImage image2 = createImage(request, text2);
        ImageResponse response = new ImageResponse(image1, image2);
        session.write(response);
    }

    // Draws the text in blue on a yellow background of the requested size.
    private BufferedImage createImage(ImageRequest request, String text) {
        BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
        Graphics graphics = image.createGraphics();
        graphics.setColor(Color.YELLOW);
        graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
        Font serif = new Font("serif", Font.PLAIN, 30);
        graphics.setFont(serif);
        graphics.setColor(Color.BLUE);
        graphics.drawString(text, 10, 50);
        return image;
    }

    // Takes the next `length` characters from `characters`, wrapping around at the
    // end, and remembers the position in the session for the following call.
    private String generateString(IoSession session, int length) {
        Integer index = (Integer) session.getAttribute(INDEX_KEY);
        StringBuffer buffer = new StringBuffer(length);
        while (buffer.length() < length) {
            buffer.append(characters.charAt(index));
            index++;
            if (index >= characters.length()) {
                index = 0;
            }
        }
        session.setAttribute(INDEX_KEY, index);
        return buffer.toString();
    }
}
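The only non-obvious part of the handler is how generateString walks through the character pool and carries its position from one call to the next via the session attribute. The sketch below isolates that logic without any MINA types. CyclingText, take and nextIndex are illustrative names, and a short alphabet is used so that the wrap-around is easy to follow.

```java
final class CyclingText {

    // Returns `length` characters of `alphabet`, starting at `startIndex`
    // and wrapping back to index 0 at the end of the alphabet.
    static String take(String alphabet, int startIndex, int length) {
        StringBuilder sb = new StringBuilder(length);
        int index = startIndex;
        while (sb.length() < length) {
            sb.append(alphabet.charAt(index));
            index = (index + 1) % alphabet.length();
        }
        return sb.toString();
    }

    // Index at which the next call should start, i.e. the value the handler
    // would store back into the session attribute.
    static int nextIndex(String alphabet, int startIndex, int length) {
        return (startIndex + length) % alphabet.length();
    }

    public static void main(String[] args) {
        String alphabet = "abcde";
        int index = 0;
        System.out.println(take(alphabet, index, 3)); // abc
        index = nextIndex(alphabet, index, 3);        // 3
        System.out.println(take(alphabet, index, 3)); // dea
        index = nextIndex(alphabet, index, 3);        // 1
        System.out.println(index);                    // 1
    }
}
```

Because the index is stored per session, two clients connected at the same time each get their own sequence, and the two images in a single response show consecutive slices of the pool.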

Run result: