Mina Core 09-編解碼過濾器
本教程試圖解釋為什麼以及如何使用ProtocolCodecFilter。
為什麼要使用ProtocolCodecFilter?
TCP以正確的順序保證所有資料包的傳遞。但是不能保證傳送方的一次寫操作會導致接收方發生一次讀事件。請參閱
http://en.wikipedia.org/wiki/IPv4#Fragmentation_and_reassembly
和http://en.wikipedia.org/wiki/Nagle%27s_algorithm
在MINA術語中:沒有ProtocolCodecFilter一次呼叫IoSession.write(Object message)傳送方可以在接收方上產生多個messageReceived(IoSession會話,物件訊息)事件;多次呼叫IoSession.write(Object message)可能會導致單個messageReceived事件。當客戶端和伺服器在同一主機(或本地網路)上執行時,您可能不會遇到此行為,但您的應用程式應該能夠應對此問題。
大多數網路應用程式需要一種方法來查詢當前訊息的結束位置以及下一條訊息的開始位置。
您可以在IoHandler中實現所有這些邏輯,但新增ProtocolCodecFilter將使您的程式碼更清晰,更易於維護。
它允許您將協議邏輯與業務邏輯(IoHandler)分開。
怎麼樣 做?
您的應用程式基本上只是接收一堆位元組,您需要將這些位元組轉換為訊息(更高級別的物件)。
將位元組流拆分為訊息有三種常用技術:
1.使用固定長度的訊息
2.使用固定長度的標題,指示身體的長度
3.使用分隔符;例如,許多基於文字的協議在每條訊息之後附加換行符(或CR LF對)(
在本教程中,我們將使用第一種和第二種方法,因為它們肯定更容易實現。之後我們將看一下使用分隔符。
例子
我們將開發一個(相當無用的)圖形計費伺服器來說明如何實現自己的協議編解碼器(ProtocolEncoder,ProtocolDecoder和ProtocolCodecFactory)。協議非常簡單。這是請求訊息的佈局:
4 bytes |
4 bytes |
4 bytes |
width |
height |
numchars |
1.width:請求影象的寬度(網路位元組順序中的整數)
2.height:請求影象的高度(網路位元組順序中的整數)
3.numchars:要生成的字元數(網路位元組順序中的整數)
伺服器響應所請求尺寸的兩個影象,並在其上繪製所請求的字元數。這是響應訊息的佈局:
4 bytes |
variable length body |
4 bytes |
variable length body |
length1 |
image1 |
length2 |
image2 |
我們編碼和解碼請求和響應所需的類概述:
1.ImageRequest:一個簡單的POJO,表示對ImageServer的請求。
2.ImageRequestEncoder:將ImageRequest物件編碼為特定於協議的資料(由客戶端使用)
3.ImageRequestDecoder:將特定於協議的資料解碼為ImageRequest物件(由伺服器使用)
4.ImageResponse:一個簡單的POJO,表示來自ImageServer的響應。
5.ImageResponseEncoder:伺服器用於編碼ImageResponse物件
6.ImageResponseDecoder:客戶端用於解碼ImageResponse物件 ImageCodecFactory:這個類建立了necesarry編碼器和解碼器
這是ImageRequest類:
public class ImageRequest { private int width; private int height; private int numberOfCharacters; public ImageRequest(int width, int height, int numberOfCharacters) { this.width = width; this.height = height; this.numberOfCharacters = numberOfCharacters; } public int getWidth() { return width; } public int getHeight() { return height; } public int getNumberOfCharacters() { return numberOfCharacters; } }
編碼通常比解碼簡單,所以讓我們從ImageRequestEncoder開始:
public class ImageRequestEncoder implements ProtocolEncoder { public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { ImageRequest request = (ImageRequest) message; IoBuffer buffer = IoBuffer.allocate(12, false); buffer.putInt(request.getWidth()); buffer.putInt(request.getHeight()); buffer.putInt(request.getNumberOfCharacters()); buffer.flip(); out.write(buffer); } public void dispose(IoSession session) throws Exception { // nothing to dispose } }
備註:
1.MINA將為IoSession寫入佇列中的所有訊息呼叫encode函式。由於我們的客戶端只會編寫ImageRequest物件,因此我們可以安全地將訊息轉換為ImageRequest。
2.我們從堆中分配一個新的IoBuffer。最好避免使用直接緩衝區,因為通常堆緩衝區的效能更好。見http://issues.apache.org/jira/browse/DIRMINA-289)
3.您不必釋放緩衝區,MINA會為您完成,請參閱http://mina.apache.org/mina-project/apidocs/org/apache/mina/core/buffer/IoBuffer.html
4.在dispose()方法中,您應釋放在編碼期間為指定會話獲取的所有資源。如果沒有任何可處理的內容,您可以讓編碼器繼承自ProtocolEncoderAdapter。
現在讓我們來看看解碼器。 CumulativeProtocolDecoder是編寫自己的解碼器的一個很好的幫助:它將緩衝所有傳入的資料,直到你的解碼器決定它可以用它做什麼。在這種情況下,訊息具有固定大小,因此最容易等到所有資料都可用:
public class ImageRequestDecoder extends CumulativeProtocolDecoder { protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.remaining() >= 12) { int width = in.getInt(); int height = in.getInt(); int numberOfCharachters = in.getInt(); ImageRequest request = new ImageRequest(width, height, numberOfCharachters); out.write(request); return true; } else { return false; } } }
備註:
1. 每次解碼完整的訊息時,都應該將其寫入ProtocolDecoderOutput;這些訊息將沿著過濾器鏈傳播,並最終到達您的IoHandler.messageReceived方法
2.您不負責釋出IoBuffer
3. 當沒有足夠的資料可用於解碼訊息時,只返回false
響應也是一個非常簡單的POJO:
public class ImageResponse { private BufferedImage image1; private BufferedImage image2; public ImageResponse(BufferedImage image1, BufferedImage image2) { this.image1 = image1; this.image2 = image2; } public BufferedImage getImage1() { return image1; } public BufferedImage getImage2() { return image2; } }
編碼響應也很簡單:
public class ImageResponseEncoder extends ProtocolEncoderAdapter { public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { ImageResponse imageResponse = (ImageResponse) message; byte[] bytes1 = getBytes(imageResponse.getImage1()); byte[] bytes2 = getBytes(imageResponse.getImage2()); int capacity = bytes1.length + bytes2.length + 8; IoBuffer buffer = IoBuffer.allocate(capacity, false); buffer.setAutoExpand(true); buffer.putInt(bytes1.length); buffer.put(bytes1); buffer.putInt(bytes2.length); buffer.put(bytes2); buffer.flip(); out.write(buffer); } private byte[] getBytes(BufferedImage image) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ImageIO.write(image, "PNG", baos); return baos.toByteArray(); } }
備註:
當事先無法計算IoBuffer的長度時,可以通過呼叫buffer.setAutoExpand(true)來使用自動擴充套件緩衝區;
現在讓我們看一下解碼響應:
public class ImageResponseDecoder extends CumulativeProtocolDecoder { private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE"; public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024; private static class DecoderState { BufferedImage image1; } protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY); if (decoderState == null) { decoderState = new DecoderState(); session.setAttribute(DECODER_STATE_KEY, decoderState); } if (decoderState.image1 == null) { // try to read first image if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) { decoderState.image1 = readImage(in); } else { // not enough data available to read first image return false; } } if (decoderState.image1 != null) { // try to read second image if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) { BufferedImage image2 = readImage(in); ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2); out.write(imageResponse); decoderState.image1 = null; return true; } else { // not enough data available to read second image return false; } } return false; } private BufferedImage readImage(IoBuffer in) throws IOException { int length = in.getInt(); byte[] bytes = new byte[length]; in.get(bytes); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); return ImageIO.read(bais); } }
備註:
1.我們將解碼過程的狀態儲存在會話屬性中。也可以將此狀態儲存在Decoder物件本身中,但這有幾個缺點:
a)每個IoSession都需要自己的Decoder例項
b)MINA確保永遠不會有多個執行緒同時為同一個IoSession執行decode()函式,但它並不保證它始終是相同的執行緒。假設第一段資料由thread-1處理,它決定它還不能解碼,當下一條資料到達時,它可以由另一個執行緒處理。為避免可見性問題,必須正確同步對此解碼器狀態的訪問(IoSession屬性儲存在ConcurrentHashMap中,因此它們對其他執行緒自動可見)。
c)對郵件列表的討論得出了這樣的結論:在IoSession中儲存狀態或在Decoder例項本身之間進行選擇更多的是品味問題。為了確保沒有兩個執行緒將為同一IoSession執行解碼方法,MINA需要進行某種形式的同步=>此同步還將確保您不會遇到上述可見性問題。 (感謝Adam Fisk指出這一點)請參閱http://www.nabble.com/Tutorial-on-ProtocolCodecFilter,-state-and-threads-t3965413.html
2.當協議使用長度字首時,IoBuffer.prefixedDataAvailable()非常方便;它支援1,2或4個位元組的字首。
3.不要忘記在解碼響應時重置解碼器狀態(刪除會話屬性是另一種方法)
如果響應由單個影象組成,我們不需要儲存解碼器狀態:
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.prefixedDataAvailable(4)) { int length = in.getInt(); byte[] bytes = new byte[length]; in.get(bytes); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); BufferedImage image = ImageIO.read(bais); out.write(image); return true; } else { return false; } }
現在讓我們將它們粘合在一起:
public class ImageCodecFactory implements ProtocolCodecFactory { private ProtocolEncoder encoder; private ProtocolDecoder decoder; public ImageCodecFactory(boolean client) { if (client) { encoder = new ImageRequestEncoder(); decoder = new ImageResponseDecoder(); } else { encoder = new ImageResponseEncoder(); decoder = new ImageRequestDecoder(); } } public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception { return encoder; } public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception { return decoder; } }
備註:
1.對於每個新會話,MINA將向ImageCodecFactory詢問編碼器和解碼器。
2.由於我們的編碼器和解碼器不儲存會話狀態,因此讓所有會話共享單個例項是安全的。
這是伺服器使用ProtocolCodecFactory的方式:
public class ImageServer { public static final int PORT = 33789; public static void main(String[] args) throws IOException { ImageServerIoHandler handler = new ImageServerIoHandler(); NioSocketAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false))); acceptor.setLocalAddress(new InetSocketAddress(PORT)); acceptor.setHandler(handler); acceptor.bind(); System.out.println("server is listenig at port " + PORT); } }
客戶端的用法是相同的:
public class ImageClient extends IoHandlerAdapter { public static final int CONNECT_TIMEOUT = 3000; private String host; private int port; private SocketConnector connector; private IoSession session; private ImageListener imageListener; public ImageClient(String host, int port, ImageListener imageListener) { this.host = host; this.port = port; this.imageListener = imageListener; connector = new NioSocketConnector(); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true))); connector.setHandler(this); } public void messageReceived(IoSession session, Object message) throws Exception { ImageResponse response = (ImageResponse) message; imageListener.onImages(response.getImage1(), response.getImage2()); } ...
為了完整起見,我將為伺服器端IoHandler新增程式碼:
public class ImageServerIoHandler extends IoHandlerAdapter { private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789"; public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX"; private Logger logger = LoggerFactory.getLogger(this.getClass()); public void sessionOpened(IoSession session) throws Exception { session.setAttribute(INDEX_KEY, 0); } public void exceptionCaught(IoSession session, Throwable cause) throws Exception { IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger); sessionLogger.warn(cause.getMessage(), cause); } public void messageReceived(IoSession session, Object message) throws Exception { ImageRequest request = (ImageRequest) message; String text1 = generateString(session, request.getNumberOfCharacters()); String text2 = generateString(session, request.getNumberOfCharacters()); BufferedImage image1 = createImage(request, text1); BufferedImage image2 = createImage(request, text2); ImageResponse response = new ImageResponse(image1, image2); session.write(response); } private BufferedImage createImage(ImageRequest request, String text) { BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED); Graphics graphics = image.createGraphics(); graphics.setColor(Color.YELLOW); graphics.fillRect(0, 0, image.getWidth(), image.getHeight()); Font serif = new Font("serif", Font.PLAIN, 30); graphics.setFont(serif); graphics.setColor(Color.BLUE); graphics.drawString(text, 10, 50); return image; } private String generateString(IoSession session, int length) { Integer index = (Integer) session.getAttribute(INDEX_KEY); StringBuffer buffer = new StringBuffer(length); while (buffer.length() < length) { buffer.append(characters.charAt(index)); index++; if (index >= characters.length()) { index = 0; } } session.setAttribute(INDEX_KEY, index); return buffer.toString(); } }
結論
有關編碼和解碼的內容還有很多。但是我希望這個教程已經讓你開始了。我將嘗試在不久的將來新增有關DemuxingProtocolCodecFactory的內容。然後我們還將看看如何使用分隔符而不是長度字首。