Apache Mina 自定義協議解析
阿新 • • 發佈:2022-05-06
目錄
協議說明
訊息頭固定4位元組(大端序 int),內容為訊息體的位元組長度(注意:是編碼後的位元組數,不是字元數);訊息體為依指定字元集(預設 UTF-8)編碼的字串
服務端
package com.cnblogs.javalouvre;

import org.apache.mina.core.IoUtil;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.*;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Demo Apache MINA server speaking a simple length-prefixed text protocol.
 *
 * <p>Wire format of one message: a 4-byte integer header holding the length of the body
 * <em>in bytes</em>, followed by the body, which is a string encoded with the configured
 * charset (UTF-8 unless another charset is passed to the codec factory).
 */
public class Server {

    private static final Logger log = LoggerFactory.getLogger(Server.class);

    /** TCP port the acceptor listens on. */
    private static final int PORT = 5678;

    /**
     * Configures the filter chain and handler, then binds the acceptor to {@link #PORT}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final NioSocketAcceptor acceptor = new NioSocketAcceptor();

        final DefaultIoFilterChainBuilder filterChain = acceptor.getFilterChain();
        // Encoding / decoding filter
        filterChain.addLast("codec", new ProtocolCodecFilter(new MessageProtocolCodecFactory()));
        // Logging filter
        filterChain.addLast("logger", new LoggingFilter());
        // Move time-consuming handler work onto a separate thread pool
        filterChain.addLast("executor", new ExecutorFilter());

        final SocketSessionConfig sessionConfig = acceptor.getSessionConfig();
        // NOTE(review): a very small max read buffer; presumably chosen so that messages
        // arrive in fragments and exercise the cumulative decoder -- confirm before reuse.
        sessionConfig.setMaxReadBufferSize(100);
        sessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 10);

        acceptor.setHandler(new IoHandlerAdapter() {
            @Override
            public void messageReceived(IoSession session, Object message) throws Exception {
                log.info("服務端收到訊息: {}", message);

                // Broadcast to every session currently managed by this service
                IoUtil.broadcast("...BOOM...", session.getService().getManagedSessions().values());

                session.write("COPY THAT.");
            }

            @Override
            public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
                // After a decoding error the message framing of this connection can no longer
                // be trusted, so drop the connection instead of re-parsing the same bad bytes.
                log.warn("Closing session after error", cause);
                session.closeNow();
            }
        });

        try {
            acceptor.bind(new InetSocketAddress(PORT));
        } catch (IOException e) {
            // Do not report a successful start when the bind failed.
            log.error("Failed to bind port " + PORT, e);
            acceptor.dispose();
            return;
        }
        System.out.println("伺服器啟動成功,埠號5678...");
    }

    /**
     * Codec factory that hands the same encoder and decoder instance to every session.
     */
    private static class MessageProtocolCodecFactory implements ProtocolCodecFactory {

        private final ProtocolEncoder encoder;
        private final ProtocolDecoder decoder;

        /** Creates a factory whose codecs use UTF-8. */
        public MessageProtocolCodecFactory() {
            this(StandardCharsets.UTF_8);
        }

        /**
         * @param charset charset used to encode and decode the message body
         */
        public MessageProtocolCodecFactory(Charset charset) {
            this.encoder = new MessageProtocolEncoder(charset);
            this.decoder = new MessageProtocolDecoder(charset);
        }

        @Override
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            return this.encoder;
        }

        @Override
        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            return this.decoder;
        }
    }

    /**
     * Decodes length-prefixed messages into {@link String}s.
     *
     * <p>This instance is shared by all sessions (the factory returns a single decoder), and a
     * {@link CharsetDecoder} must not be used by several threads at once, so only the
     * {@link Charset} is stored and a fresh decoder is created for each message.
     */
    private static class MessageProtocolDecoder extends CumulativeProtocolDecoder {

        /** Size of the length header in bytes. */
        private static final int HEADER_SIZE = 4;

        /**
         * Upper bound for the body length announced by a peer (1 MiB). Without a bound, a
         * bogus header would make the decoder keep accumulating data while waiting for a body
         * that never completes.
         */
        private static final int MAX_BODY_SIZE = 1024 * 1024;

        private final Charset charset;

        /**
         * @param charset charset the message body is encoded in
         */
        public MessageProtocolDecoder(Charset charset) {
            this.charset = charset;
        }

        /**
         * Tries to decode one message from {@code in}.
         *
         * @return {@code true} if a complete message was consumed and written to {@code out};
         *         {@code false} if more data is needed (nothing is consumed in that case)
         * @throws ProtocolDecoderException if the header announces a negative body length or
         *                                  one larger than {@link #MAX_BODY_SIZE}
         */
        @Override
        protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
            // The 4-byte header has not been received completely yet.
            if (in.remaining() < HEADER_SIZE) {
                return false;
            }

            // Peek at the header with an absolute read so the position stays untouched until
            // the whole message is known to be available.
            // (Alternative: copy the 4 bytes and use convertByteArrayToInteger.)
            final int bodySize = in.getInt(in.position());
            if (bodySize < 0 || bodySize > MAX_BODY_SIZE) {
                throw new ProtocolDecoderException("Invalid body length in header: " + bodySize);
            }

            // The body has not been received completely yet.
            if (in.remaining() - HEADER_SIZE < bodySize) {
                return false;
            }

            in.skip(HEADER_SIZE);
            final CharsetDecoder decoder = this.charset.newDecoder();
            out.write(in.getString(bodySize, decoder));
            return true;
        }

        /**
         * Interprets up to four bytes as a big-endian unsigned-byte sequence and combines
         * them into an {@code int}. Manual alternative to {@code IoBuffer.getInt()}.
         *
         * @param bytes the bytes to combine, most significant first
         * @return the combined value
         * @throws IllegalArgumentException if more than four bytes are given
         */
        private static int convertByteArrayToInteger(byte[] bytes) {
            final int size = bytes.length;
            if (size > 4) {
                throw new IllegalArgumentException("At most 4 bytes expected, got " + size);
            }

            int result = 0;
            for (int i = 0; i < size; i++) {
                result |= (bytes[i] & 0xFF) << ((size - 1 - i) * 8);
            }
            return result;
        }
    }

    /**
     * Encodes {@link String} messages as a 4-byte length header followed by the encoded body.
     *
     * <p>This instance is shared by all sessions (the factory returns a single encoder), and a
     * {@link CharsetEncoder} must not be used by several threads at once, so only the
     * {@link Charset} is stored and a fresh encoder is created for each message.
     */
    private static class MessageProtocolEncoder implements ProtocolEncoder {

        /** Size of the length header in bytes. */
        private static final int HEADER_SIZE = 4;

        private final Charset charset;

        /**
         * @param charset charset used to encode the message body
         */
        public MessageProtocolEncoder(Charset charset) {
            this.charset = charset;
        }

        /**
         * Writes {@code message} (which must be a {@link String}) to {@code out} as one frame.
         *
         * @throws ClassCastException if {@code message} is not a {@code String}
         */
        @Override
        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
            final String msg = (String) message;
            final CharsetEncoder encoder = this.charset.newEncoder();

            // The number of encoded bytes is not msg.length() for multi-byte charsets, so start
            // from an estimate and let the buffer grow as needed.
            final IoBuffer buf = IoBuffer.allocate(HEADER_SIZE + msg.length()).setAutoExpand(true);

            // Reserve the header, write the body, then back-patch the header with the real
            // byte count. (Alternative: write convertIntegerToByteArray(byteCount, 4).)
            buf.putInt(0);
            buf.putString(msg, encoder);
            buf.putInt(0, buf.position() - HEADER_SIZE);

            buf.flip();
            out.write(buf);
        }

        @Override
        public void dispose(IoSession session) throws Exception {
            // Nothing to release: no per-session state is held.
        }

        /**
         * Splits the lowest {@code length} bytes of {@code val} into a big-endian byte array.
         * Manual alternative to {@code IoBuffer.putInt(int)}.
         *
         * @param val    value to convert
         * @param length number of bytes to produce, from 1 to 4
         * @return {@code length} bytes, most significant first
         * @throws IllegalArgumentException if {@code length} is not in the range 1..4
         */
        private static byte[] convertIntegerToByteArray(int val, int length) {
            if (length > 4 || length < 1) {
                throw new IllegalArgumentException("length must be between 1 and 4, got " + length);
            }

            final byte[] result = new byte[length];
            for (int i = 0; i < length; i++) {
                result[i] = (byte) ((val >> ((length - 1 - i) * 8)) & 0xFF);
            }
            return result;
        }
    }
}