Spring+Netty+WebSocket實例
阿新 • • 發佈:2017-12-06
adc random 目錄結構 div socket 相對 mage exec 客戶端
比較貼近生產,詳見註釋
一、pom.xml
具體太長,詳見源碼
</dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.2.Final</version> </dependency>
二、目錄結構
三、AfterSpringBegin
繼承了AfterSpringBegin的子類在spring加載成功後,會自動啟動
package com.netty.init; import java.util.Timer; import java.util.TimerTask; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; /** * * spring加載後改方法的子類 * */ public abstract class AfterSpringBegin extends TimerTask implementsView CodeApplicationListener<ContextRefreshedEvent>{ public void onApplicationEvent(ContextRefreshedEvent event) { // TODO Auto-generated method stub if(event.getApplicationContext().getParent() ==null){ Timer timer = new Timer(); timer.schedule(this, 0); } } }
四、Constant
存放了websocket相關信道
package com.netty.constant; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 常量 * */ public class Constant { //存放所有的ChannelHandlerContext public static Map<String, ChannelHandlerContext> pushCtxMap = new ConcurrentHashMap<String, ChannelHandlerContext>() ; //存放某一類的channel public static ChannelGroup aaChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }View Code
五、WebSocketServer
啟動服務
package com.netty.server; import javax.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; import com.netty.init.AfterSpringBegin; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 啟動服務 * */ public class WebSocketServer extends AfterSpringBegin{ //用於客戶端連接請求 @Autowired private EventLoopGroup bossGroup; //用於處理客戶端I/O操作 @Autowired private EventLoopGroup workerGroup; //服務器的輔助啟動類 @Autowired private ServerBootstrap serverBootstrap; //BS的I/O處理類 private ChannelHandler childChannelHandler; private ChannelFuture channelFuture; //服務端口 private int port; public WebSocketServer(){ System.out.println("初始化"); } public EventLoopGroup getBossGroup() { return bossGroup; } public void setBossGroup(EventLoopGroup bossGroup) { this.bossGroup = bossGroup; } public EventLoopGroup getWorkerGroup() { return workerGroup; } public void setWorkerGroup(EventLoopGroup workerGroup) { this.workerGroup = workerGroup; } public ServerBootstrap getServerBootstrap() { return serverBootstrap; } public void setServerBootstrap(ServerBootstrap serverBootstrap) { this.serverBootstrap = serverBootstrap; } public ChannelHandler getChildChannelHandler() { return childChannelHandler; } public void setChildChannelHandler(ChannelHandler childChannelHandler) { this.childChannelHandler = childChannelHandler; } public ChannelFuture getChannelFuture() { return channelFuture; } public void setChannelFuture(ChannelFuture channelFuture) { this.channelFuture = channelFuture; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } @Override public void run() { // TODO Auto-generated method stub try { bulid(port); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void bulid(int port) throws Exception{ try { //(1)boss輔助客戶端的tcp連接請求 worker負責與客戶端之前的讀寫操作 //(2)配置客戶端的channel類型 //(3)配置TCP參數,握手字符串長度設置 //(4)TCP_NODELAY是一種算法,為了充分利用帶寬,盡可能發送大塊數據,減少充斥的小塊數據,true是關閉,可以保持高實時性,若開啟,減少交互次數,但是時效性相對無法保證 //(5)開啟心跳包活機制,就是客戶端、服務端建立連接處於ESTABLISHED狀態,超過2小時沒有交流,機制會被啟動 //(6)netty提供了2種接受緩存區分配器,FixedRecvByteBufAllocator是固定長度,但是拓展,AdaptiveRecvByteBufAllocator動態長度 //(7)綁定I/O事件的處理類,WebSocketChildChannelHandler中定義 serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048)) .childHandler(childChannelHandler); System.out.println("成功"); channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { // TODO: handle exception bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } //執行之後關閉 @PreDestroy public void close(){ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }View Code
六、WebSocketChildChannelHandler
五裏面的private ChannelHandler childChannelHandler; 註入的就是這個類,註入配置在後面的xml中,用處在五代碼裏註解了
package com.netty.server; import javax.annotation.Resource; import org.springframework.stereotype.Component; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; @Component public class WebSocketChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Resource(name = "webSocketServerHandler") private ChannelHandler webSocketServerHandler; @Override protected void initChannel(SocketChannel ch) throws Exception { // TODO Auto-generated method stub ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); ch.pipeline().addLast("handler",webSocketServerHandler); } }View Code
七、WebSocketServerHandler
websocket具體的業務處理,六中的private ChannelHandler webSocketServerHandler;,註入的就是這個類
package com.netty.server; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.netty.constant.Constant; import com.netty.manage.ManageMessage; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandler.Sharable; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; /** * websocket 具體業務處理方法 * * */ @Component @Sharable public class WebSocketServerHandler extends BaseWebSocketServerHandler{ private WebSocketServerHandshaker handshaker; /** * 當客戶端連接成功,返回個成功信息 * */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub push(ctx, "連接成功"); } /** * 當客戶端斷開連接 * */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub for(String key:Constant.pushCtxMap.keySet()){ if(ctx.equals(Constant.pushCtxMap.get(key))){ //從連接池內剔除 System.out.println(Constant.pushCtxMap.size()); System.out.println("剔除"+key); Constant.pushCtxMap.remove(key); System.out.println(Constant.pushCtxMap.size()); } } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub ctx.flush(); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub //http://xxxx if(msg instanceof FullHttpRequest){ handleHttpRequest(ctx,(FullHttpRequest)msg); }else if(msg instanceof WebSocketFrame){ //ws://xxxx handlerWebSocketFrame(ctx,(WebSocketFrame)msg); } } public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception{ //關閉請求 if(frame instanceof CloseWebSocketFrame){ handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain()); return; } //ping請求 if(frame instanceof PingWebSocketFrame){ ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } //只支持文本格式,不支持二進制消息 if(!(frame instanceof TextWebSocketFrame)){ throw new Exception("僅支持文本格式"); } //客服端發送過來的消息 String request = ((TextWebSocketFrame) frame).text(); System.out.println("服務端收到:" + request); JSONObject jsonObject = null; try { jsonObject = JSONObject.parseObject(request); System.out.println(jsonObject.toJSONString()); } catch (Exception e) { } if (jsonObject == null){ return; } String id = (String) jsonObject.get("id"); String type = (String) jsonObject.get("type"); //根據id判斷是否登陸或者是否有權限等 if(id!=null && !"".equals("id") && type!=null && !"".equals("type")){ //用戶是否有權限 boolean idAccess = true; //類型是否符合定義 boolean typeAccess = true; if(idAccess && typeAccess){ System.out.println("添加到連接池:"+request); Constant.pushCtxMap.put(request,ctx); Constant.aaChannelGroup.add(ctx.channel()); } //根據type 存放進對於的channel池,這裏就簡單實現,直接放進aaChannelGroup,方便群發 } } //第一次請求是http請求,請求頭包括ws的信息 public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){ if(!req.decoderResult().isSuccess()){ sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws:/"+ctx.channel()+ "/websocket",null,false); handshaker = wsFactory.newHandshaker(req); if(handshaker == null){ //不支持 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }else{ handshaker.handshake(ctx.channel(), req); } } public static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){ // 返回應答給客戶端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 如果是非Keep-Alive,關閉連接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private static boolean isKeepAlive(FullHttpRequest req) { return false; } //異常處理,netty默認是關閉channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub //輸出日誌 cause.printStackTrace(); ctx.close(); } }View Code
八、BaseWebSocketServerHandler
把推送方法單獨抽象出來,方便調用
package com.netty.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * 發消息方式 抽象出來 * * */ public abstract class BaseWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{ /** * 推送單個 * * */ public static final void push(final ChannelHandlerContext ctx,final String message){ TextWebSocketFrame tws = new TextWebSocketFrame(message); ctx.channel().writeAndFlush(tws); } /** * 群發 * * */ public static final void push(final ChannelGroup ctxGroup,final String message){ TextWebSocketFrame tws = new TextWebSocketFrame(message); ctxGroup.writeAndFlush(tws); } }View Code
九、配置
application-netty.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:cache="http://www.springframework.org/schema/cache" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd"> <bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean> <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean> <bean id="serverBootstrap" class="io.netty.bootstrap.ServerBootstrap" scope="prototype"></bean> <bean id="webSocketServer" class="com.netty.server.WebSocketServer"> <property name="port" value="${websocket.server.port}"></property> <property name="childChannelHandler" ref="webSocketChildChannelHandler" /> </bean> </beans>View Code
application-beans.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/> <context:annotation-config /> <context:component-scan base-package="com.netty"> <!-- 排除vst.back目錄下Controller的service註入 --> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="locations"> <list> <value>classpath*:conf/websocket.properties</value> </list> </property> </bean> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer"> <property name="properties" ref="configProperties" /> </bean> </beans>View Code
springmvc.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd"> <description>Spring-web MVC配置</description> <bean class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter"> <property name="messageConverters"> <list> <bean class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <value>text/html;charset=UTF-8</value> </list> </property> </bean> </list> </property> </bean> <mvc:annotation-driven /> <context:component-scan base-package="com.netty.controller"> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" /> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Service" /> </context:component-scan> </beans>View Code
websocket.properties
websocket.server.port=7397
十、客戶端
用的jsp頁面,具體連接邏輯什麽的看需要寫
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title></title> </head> </head> <script type="text/javascript"> var socket; //實際生產中,id可以從session裏面拿用戶id var id = Math.random().toString(36).substr(2); if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://localhost:7397"); socket.onmessage = function(event){ appendln("receive:" + event.data); }; socket.onopen = function(event){ appendln("WebSocket is opened"); login(); }; socket.onclose = function(event){ appendln("WebSocket is closed"); }; }else{ alert("WebSocket is not support"); } function appendln(text) { var ta = document.getElementById(‘responseText‘); ta.value += text + "\r\n"; } function login(){ console.log("aaaaaa"); var date={"id":id,"type":"aa"}; var login = JSON.stringify(date); socket.send(login); } </script> <body> <form onSubmit="return false;"> <input type = "text" name="message" value="hello"/> <br/><br/> <textarea id="responseText" style="width: 800px;height: 300px;"></textarea> </form> </body> </html>View Code
十一、源碼
源碼
Spring+Netty+WebSocket實例