Netty實現WebSocket
阿新 • • 發佈:2018-05-25
request inb date keep turn elf HA close 地址
package com.qmtt.server; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @Service public class NettyServer { private static final Logger log = LoggerFactory.getLogger(NettyServer.class); EventLoopGroup bossGroup; EventLoopGroup workGroup; Channel channel;// public static void main(String[] args) { // new NettyServer().run(); // } @PostConstruct public void run() { log.info("啟動netty"); bossGroup = new NioEventLoopGroup(); workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChildChannelHandler()); channel = b.bind(7397).sync().channel(); // channel.closeFuture().sync(); } catch (Exception e) { log.error("", e); } finally { // bossGroup.shutdownGracefully(); // workGroup.shutdownGracefully(); } } @PreDestroy public void stop() { log.info("關閉netty"); if (null == channel) { log.error("server channel is null"); } bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); bossGroup = null; workGroup = null; channel = null; } }
package com.qmtt.server; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import com.qmtt.tools.JsonUtils; import com.qmtt.tools.SpringUtil; import com.qmtt.websocket.GameFunction2; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; public class MyWebSocket2 extends SimpleChannelInboundHandler<Object> { private static final Logger log = LoggerFactory.getLogger(MyWebSocket2.class); private WebSocketServerHandshaker handshaker; private static Map<String, ChannelHandlerContext> webSocketMap = new Hashtable<String, ChannelHandlerContext>(); public GameFunction2 gameFunction; RedisTemplate redisTemplate; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客戶端與服務端連接開啟"); gameFunction = SpringUtil.getBean(GameFunction2.class); redisTemplate = (RedisTemplate) SpringUtil.getBean("redisTemplate"); // 添加 // Global.group.add(ctx.channel()); } @Override public 
void channelInactive(ChannelHandlerContext ctx) throws Exception { // 移除 // Global.group.remove(ctx.channel()); log.info("客戶端與服務端連接關閉"); String key = null; Iterator iterator = webSocketMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, ChannelHandlerContext> entry = (Entry<String, ChannelHandlerContext>) iterator.next(); key = entry.getKey(); if (entry.getValue().equals(ctx)) { key = entry.getKey(); break; } } log.info("<{}>斷開連接", key); if (key != null) { webSocketMap.remove(key); } gameFunction.close(key); } @Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, ((FullHttpRequest) msg)); } else if (msg instanceof WebSocketFrame) { handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判斷是否關閉鏈路的指令 if (frame instanceof CloseWebSocketFrame) { log.info("連接開閉"); handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判斷是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程僅支持文本消息,不支持二進制消息 if (!(frame instanceof TextWebSocketFrame)) { log.info("不支持二進制消息"); return; } // 返回應答消息 String message = ((TextWebSocketFrame) frame).text(); if (!message.contains("msgType")) { return; } log.info("服務端收到消息:" + message); try { Map map = JsonUtils.json2map(message); String msgTpye = map.get("msgType").toString(); String openid = map.get("openid").toString(); // 開始遊戲 if (msgTpye.equals("start")) { webSocketMap.put(openid, ctx); // String rankValue = map.get("rankValue").toString(); gameFunction.joinGame(openid); return; } // 回答問題 if (msgTpye.equals("answer")) { gameFunction.answer(map); return; } // 遊戲結束 if (msgTpye.equals("gameover")) { 
gameFunction.gameover(map); return; } // 發出邀請等待對手 if (msgTpye.equals("wait")) { webSocketMap.put(openid, ctx); gameFunction.waitEnter(openid); return; } // 發出邀請對手進入 if (msgTpye.equals("waitEnter")) { webSocketMap.put(openid, ctx); String inviteOpenid = (String) map.get("inviteOpenid"); // 要判斷用戶是否已經開始在玩遊戲 了,是否已經離開 gameFunction.checkUserStatus(openid, inviteOpenid); return; } // 發出邀請對手進入 if (msgTpye.equals("waitStart")) { gameFunction.waitStart(openid); return; } // 再來一局 if (msgTpye.equals("playAgain")) { gameFunction.playAgain(openid); return; } } catch (Exception e) { log.error("", e); } // TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() // + ctx.channel().id() + ":" + request); // // 群發 // Global.group.writeAndFlush(tws); // 返回【誰發的發給誰】 // ctx.channel().writeAndFlush(tws); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } // 註意,這條地址別被誤導了,其實這裏填寫什麽都無所謂,WS協議消息的接收不受這裏控制 // 消息分發可以通過Req中獲取uri處理 // WebSocketServerHandshakerFactory wsFactory = new // WebSocketServerHandshakerFactory("ws://127.0.0.1:7397/websocket", // null, // false); WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,關閉連接 ChannelFuture f = 
ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private static boolean isKeepAlive(FullHttpRequest req) { return false; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } public static Map<String, ChannelHandlerContext> getWebSocketMap() { return webSocketMap; } public static int sendMsg(String id, Object msg) { try { String ret = JsonUtils.toJsonStringIgnoreNull(msg); ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id); if (socket != null) { log.info("給<{}>發送消息:{}", id, ret); socket.writeAndFlush(new TextWebSocketFrame(ret)); return 1; } else { log.info("<{}>連接不存在,不處理", id); } } catch (Exception ex) { log.error("", ex); } return 0; } public static int sendMsg(String id, String msg) { try { ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id); if (socket != null) { log.info("給<{}>發送消息:{}", id, msg); socket.writeAndFlush(new TextWebSocketFrame(msg)); return 1; } else { log.info("連接不存在,不處理"); } } catch (Exception ex) { log.error("", ex); } return 0; } }
package com.qmtt.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel e) throws Exception { e.pipeline().addLast("http-codec", new HttpServerCodec()); e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); e.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); e.pipeline().addLast("handler", new MyWebSocket2()); } }
此代碼為「詩詞榮耀」的 WebSocket 實現:改用 Netty 後,解決了以 Tomcat 實現 WebSocket 時連接不穩定的問題。
Netty實現WebSocket