1. 程式人生 > >AAA基於netty TCP server server主動給client 發訊息解決方案

AAA基於netty TCP server server主動給client 發訊息解決方案

思路

1 client 主動連線server ,server儲存server和client的channel,並放在map裡面,可以用java多執行緒的ConcurrentHashMap
2 當server想給client發訊息的時候,拿到對應的key 從ConcurrentHashMap 
   TCPServerNetty.getMap().get(clientIP).writeAndFlush(bytes) 發訊息

TCPServerNetty.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class TCPServerNetty{
	private int port;
	private static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();
	private static Map<String, byte[]> messageMap = new ConcurrentHashMap<String, byte[]>();
	
	public TCPServerNetty(int port){
		this.port = port;
	}
	
	public TCPServerNetty(){}
	
	public void start() throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                       	    // Decoders
	                        //ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
	                        ch.pipeline().addLast("bytesDecoder", new ByteArrayDecoder());
	                       	// Encoder
	                       	//ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(4));
	                       	ch.pipeline().addLast("bytesEncoder", new ByteArrayEncoder());                        	
                        	ch.pipeline().addLast(new OutBoundHandler());
                            ch.pipeline().addLast(new IdleStateHandler(0,0,300), new InBoundHandler());
                        }
                    });

            b.bind(port);
            // Start the server.
            //ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed.
            //f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down all event loops to terminate all threads.
            //bossGroup.shutdownGracefully();
            //workerGroup.shutdownGracefully();
        }
	}
	
	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String args[]) throws Exception{
		new TCPServerNetty(999).start();
	}

	public static Map<String, Channel> getMap() {
		return map;
	}

	public static void setMap(Map<String, Channel> map) {
		TCPServerNetty.map = map;
	}

    public static String bytesToHexString(byte[] src){       
        StringBuilder stringBuilder = new StringBuilder();       
        if (src == null || src.length <= 0) {       
            return null;       
        }       
        for (int i = 0; i < src.length; i++) {       
            int v = src[i] & 0xFF;       
            String hv = Integer.toHexString(v);       
            if (hv.length() < 2) {       
                stringBuilder.append(0);       
            }       
            stringBuilder.append(hv); 
            stringBuilder.append(' ');
        }       
        return stringBuilder.toString();       
    }

	/**
	 * @return the messageMap
	 */
	public static Map<String, byte[]> getMessageMap() {
		return messageMap;
	}

	/**
	 * @param messageMap the messageMap to set
	 */
	public static void setMessageMap(Map<String, byte[]> messageMap) {
		TCPServerNetty.messageMap = messageMap;
	}
}
InBoundHandler.java
public class InBoundHandler extends SimpleChannelInboundHandler<byte[]> {
	private static Logger logger = LoggerFactory.getLogger(InBoundHandler.class);
	private static Map<String, MultiFrameModel> map = new ConcurrentHashMap<String, MultiFrameModel>();
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		super.channelActive(ctx);
		
		logger.info("CLIENT"+getRemoteAddress(ctx)+" 接入連線");
		//往channel map中新增channel資訊
		TCPServerNetty.getMap().put(getIPString(ctx), ctx.channel());	
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		//刪除Channel Map中的失效Client
		TCPServerNetty.getMap().remove(getIPString(ctx));	
		ctx.close();
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, byte[] msg)
			throws Exception {
		logger.info("來自裝置的資訊:"+TCPServerNetty.bytesToHexString(msg));
		byte byteA3 = msg[11]; 
		byte[] addressDomain = new byte[5]; 
		System.arraycopy(msg, 7, addressDomain, 0, 5);
		String str1 = getKeyFromArray(addressDomain); //生成key
		logger.info("根據地址域生成的Key為:"+str1);
		
		if (byteA3==0) {
		
		}
		else{
			logger.info("上述訊息是從裝置採集到的訊息!");
			TCPServerNetty.getMessageMap().put("1", msg);
		}
	}
	
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
			throws Exception {
		String socketString = ctx.channel().remoteAddress().toString();
		
		if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
            	logger.info("Client: "+socketString+" READER_IDLE 讀超時");
                ctx.disconnect();
            } else if (event.state() == IdleState.WRITER_IDLE) {
            	logger.info("Client: "+socketString+" WRITER_IDLE 寫超時");
                ctx.disconnect();
            } else if (event.state() == IdleState.ALL_IDLE) {
            	logger.info("Client: "+socketString+" ALL_IDLE 總超時");
                ctx.disconnect();
            }
        }
	}
	public static String getIPString(ChannelHandlerContext ctx){
		String ipString = "";
		String socketString = ctx.channel().remoteAddress().toString();
		int colonAt = socketString.indexOf(":");
		ipString = socketString.substring(1, colonAt);
		return ipString;
	}
	
	
	public static String getRemoteAddress(ChannelHandlerContext ctx){
		String socketString = "";
		socketString = ctx.channel().remoteAddress().toString();
		return socketString;
	}
	

	private String getKeyFromArray(byte[] addressDomain) {
		StringBuffer sBuffer = new StringBuffer();
		for(int i=0;i<5;i++){
			sBuffer.append(addressDomain[i]);
		}
		return sBuffer.toString();
	}

	protected String to8BitString(String binaryString) {
		int len = binaryString.length();
		for (int i = 0; i < 8-len; i++) {
			binaryString = "0"+binaryString;
		}
		return binaryString;
	}

	protected static byte[] combine2Byte(byte[] bt1, byte[] bt2){
    	byte[] byteResult = new byte[bt1.length+bt2.length];
    	System.arraycopy(bt1, 0, byteResult, 0, bt1.length);
    	System.arraycopy(bt2, 0, byteResult, bt1.length, bt2.length);
    	return byteResult;
    }

OutBoundHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class OutBoundHandler extends ChannelOutboundHandlerAdapter {
	private static Logger logger = LoggerFactory.getLogger(OutBoundHandler.class);
	
	@Override
	public void write(ChannelHandlerContext ctx, Object msg,
			ChannelPromise promise) throws Exception {
		
		if (msg instanceof byte[]) {
			byte[] bytesWrite = (byte[])msg;
			ByteBuf buf = ctx.alloc().buffer(bytesWrite.length); 
			logger.info("向裝置下發的資訊為:"+TCPServerNetty.bytesToHexString(bytesWrite));
		
			buf.writeBytes(bytesWrite); 
			ctx.writeAndFlush(buf).addListener(new ChannelFutureListener(){  
                @Override  
                public void operationComplete(ChannelFuture future)  
                        throws Exception {  
                	logger.info("下發成功!");
                }  
            });
		}
	}
}