AAA基於 Netty 的 TCP server:server 主動給 client 發訊息的解決方案
阿新 • • 發佈:2019-01-10
思路
1. client 主動連線 server,server 儲存自己與該 client 之間的 channel,並以 client IP 為 key 放在 map 裡面,可以用執行緒安全的 ConcurrentHashMap
2. 當 server 想給 client 發訊息的時候,用對應的 key 從 ConcurrentHashMap 取出該 client 的 channel,再呼叫
TCPServerNetty.getMap().get(clientIP).writeAndFlush(bytes) 發訊息
TCPServerNetty.java
InBoundHandler.javaimport io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class TCPServerNetty{ private int port; private static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>(); private static Map<String, byte[]> messageMap = new ConcurrentHashMap<String, byte[]>(); public TCPServerNetty(int port){ this.port = port; } public TCPServerNetty(){} public void start() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // Decoders //ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); ch.pipeline().addLast("bytesDecoder", new ByteArrayDecoder()); // Encoder //ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4)); ch.pipeline().addLast("bytesEncoder", new ByteArrayEncoder()); ch.pipeline().addLast(new OutBoundHandler()); ch.pipeline().addLast(new IdleStateHandler(0,0,300), new InBoundHandler()); } }); b.bind(port); // Start the server. 
//ChannelFuture f = b.bind(port).sync(); // Wait until the server socket is closed. //f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // Shut down all event loops to terminate all threads. //bossGroup.shutdownGracefully(); //workerGroup.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String args[]) throws Exception{ new TCPServerNetty(999).start(); } public static Map<String, Channel> getMap() { return map; } public static void setMap(Map<String, Channel> map) { TCPServerNetty.map = map; } public static String bytesToHexString(byte[] src){ StringBuilder stringBuilder = new StringBuilder(); if (src == null || src.length <= 0) { return null; } for (int i = 0; i < src.length; i++) { int v = src[i] & 0xFF; String hv = Integer.toHexString(v); if (hv.length() < 2) { stringBuilder.append(0); } stringBuilder.append(hv); stringBuilder.append(' '); } return stringBuilder.toString(); } /** * @return the messageMap */ public static Map<String, byte[]> getMessageMap() { return messageMap; } /** * @param messageMap the messageMap to set */ public static void setMessageMap(Map<String, byte[]> messageMap) { TCPServerNetty.messageMap = messageMap; } }
public class InBoundHandler extends SimpleChannelInboundHandler<byte[]> { private static Logger logger = LoggerFactory.getLogger(InBoundHandler.class); private static Map<String, MultiFrameModel> map = new ConcurrentHashMap<String, MultiFrameModel>(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); logger.info("CLIENT"+getRemoteAddress(ctx)+" 接入連線"); //往channel map中新增channel資訊 TCPServerNetty.getMap().put(getIPString(ctx), ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //刪除Channel Map中的失效Client TCPServerNetty.getMap().remove(getIPString(ctx)); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception { logger.info("來自裝置的資訊:"+TCPServerNetty.bytesToHexString(msg)); byte byteA3 = msg[11]; byte[] addressDomain = new byte[5]; System.arraycopy(msg, 7, addressDomain, 0, 5); String str1 = getKeyFromArray(addressDomain); //生成key logger.info("根據地址域生成的Key為:"+str1); if (byteA3==0) { } else{ logger.info("上述訊息是從裝置採集到的訊息!"); TCPServerNetty.getMessageMap().put("1", msg); } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { logger.info("Client: "+socketString+" READER_IDLE 讀超時"); ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { logger.info("Client: "+socketString+" WRITER_IDLE 寫超時"); ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { logger.info("Client: "+socketString+" ALL_IDLE 總超時"); ctx.disconnect(); } } } public static String getIPString(ChannelHandlerContext ctx){ String ipString = ""; String socketString = ctx.channel().remoteAddress().toString(); int colonAt = socketString.indexOf(":"); ipString = socketString.substring(1, colonAt); 
return ipString; } public static String getRemoteAddress(ChannelHandlerContext ctx){ String socketString = ""; socketString = ctx.channel().remoteAddress().toString(); return socketString; } private String getKeyFromArray(byte[] addressDomain) { StringBuffer sBuffer = new StringBuffer(); for(int i=0;i<5;i++){ sBuffer.append(addressDomain[i]); } return sBuffer.toString(); } protected String to8BitString(String binaryString) { int len = binaryString.length(); for (int i = 0; i < 8-len; i++) { binaryString = "0"+binaryString; } return binaryString; } protected static byte[] combine2Byte(byte[] bt1, byte[] bt2){ byte[] byteResult = new byte[bt1.length+bt2.length]; System.arraycopy(bt1, 0, byteResult, 0, bt1.length); System.arraycopy(bt2, 0, byteResult, bt1.length, bt2.length); return byteResult; }
OutBoundHandler.java
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class OutBoundHandler extends ChannelOutboundHandlerAdapter { private static Logger logger = LoggerFactory.getLogger(OutBoundHandler.class); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof byte[]) { byte[] bytesWrite = (byte[])msg; ByteBuf buf = ctx.alloc().buffer(bytesWrite.length); logger.info("向裝置下發的資訊為:"+TCPServerNetty.bytesToHexString(bytesWrite)); buf.writeBytes(bytesWrite); ctx.writeAndFlush(buf).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { logger.info("下發成功!"); } }); } } }