SpringBoot整合Netty之Websocket
阿新 • • 發佈:2018-11-19
前後端通過websocket通訊進行聊天~ 核心程式碼整理如下:
netty元件
@Component public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { if(contextRefreshedEvent.getApplicationContext().getParent() == null){ try { //開啟WebSocket服務 WSServer.getInstance().start(); }catch (Exception e){ e.printStackTrace(); } } } }
WSServer.java
/** * 考慮反射: * 由於在呼叫 SingletonHolder.instance 的時候,才會對單例進行初始化,而且通過反射,是不能從外部類獲取內部類的屬性的。 * 所以這種形式,很好的避免了反射入侵。 * 考慮多執行緒: * 由於靜態內部類的特性,只有在其被第一次引用的時候才會被載入,所以可以保證其執行緒安全性。 * 不需要傳參的情況下 優先考慮靜態內部類 */ @Component public class WSServer { private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ServerBootstrap server; private ChannelFuture future; private static class SingletionWSServer{ static final WSServer instance = new WSServer(); } public static WSServer getInstance(){ return SingletionWSServer.instance; } public WSServer() { bossGroup = new NioEventLoopGroup(); workerGroup =new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WSServerInitialzer());//自定義初始化handler容器 } public void start(){ //自定義埠 this.future = server.bind(8088); } }
初始化handler容器類
public class WSServerInitialzer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //websocket 基於http協議 所以要有http編解碼器 pipeline.addLast(new HttpServerCodec()); //對寫大資料流的支援 pipeline.addLast(new ChunkedWriteHandler()); //對httpMessage進行聚合,聚合成FullHttpRequest或FullHttpResponse //幾乎在netty中的程式設計 ,都會使用到此handler pipeline.addLast(new HttpObjectAggregator(1024*64)); //====================以上是使用支援http協議==== //===================增加心跳=================== //如果是讀寫空閒 不處理 pipeline.addLast(new IdleStateHandler(8,10,12)); //自定義空閒狀態檢測 pipeline.addLast(new HeartBeatHandler()); /* * websocket 伺服器處理的協議 ,用於指定給客戶端連線訪問的路由 :/ws * 本handler 會幫你處理一些繁重的複雜的事 * 會幫你處理握手動作 :handshaking (close,ping,pong)ping+pong=心跳 * 對於websocket來講, 都是以frams進行傳輸的不同的資料型別對應的frames也不同 * */ pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定義handler pipeline.addLast(new ChatHandler()); } }
自定義的聊天handelr,其中channelRead0中很多東西沒有提供,自己看註釋理解吧 ,反正收到前端傳來的訊息,隨你怎麼處理,我只是提供一種思路而已
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
static public ChannelGroup clients =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
Channel currentChannel = ctx.channel();
//獲取客戶端傳輸過來的訊息
String content = msg.text();
System.out.println("接收的資料:" + content);
//1.獲取客戶端傳送來的訊息
DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
Integer action = dataContent.getAction();
//2判斷訊息的型別,更具不同的型別來處理不同的業務
if (action == MsgActionEnum.CONNECT.getType()) {
//2.1當websocket 第一次open的時候 初始化channel 並把userid和channel進行繫結
String senderId = dataContent.getMixinMsg().getSenderId();
UserChannelRel.put(senderId, currentChannel);
} else if (action == MsgActionEnum.CHAT.getType()) {
//2.2聊天型別的訊息
MixinMsg mixinMsg = dataContent.getMixinMsg();
String msgText = mixinMsg.getMsg();
String recevierId = mixinMsg.getReceiverId();
String senderId = mixinMsg.getSenderId();
//儲存訊息到資料庫,並且標記為未簽收
IChatMsgService chatMsgService = (IChatMsgService) SpringUtil.getBean("chatMsgServiceImpl");
String msgId = chatMsgService.saveMsg(mixinMsg);
mixinMsg.setMsgId(msgId);
//構造傳送的訊息
DataContent dataContentMsg = new DataContent();
dataContentMsg.setMixinMsg(mixinMsg);
//傳送訊息
Channel recvchannel = UserChannelRel.get(recevierId);
//從全域性使用者channel關係中獲取接收方的channel
if (recvchannel == null) {
//TODD channel為空代表使用者離線 推送訊息
} else {
//當channel 不為空的時候 從ChannelGroup去查詢channnel是否存在\
Channel findChannel = clients.find(recvchannel.id());
if (findChannel == null) {
//TODD channel為空代表使用者離線 推送訊息
} else {
//使用者線上
recvchannel.writeAndFlush(new TextWebSocketFrame(
JsonUtils.objectToJson(dataContentMsg)
));
}
}
} else if (action == MsgActionEnum.SIGNED.getType()) {
//批量簽收訊息
...
} else if (action == MsgActionEnum.KEEPALIVE.getType()) {
//2.2心跳型別的訊息
System.out.println("收到【" + ctx.channel() + "】的心跳包!");
}
/*
//群發
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + "--"
+ ctx.channel().id() + "===》" + content);
for (Channel channel : clients) {
channel.writeAndFlush(tws);
}*/
// 下面這個方法 和上面的for迴圈 一致
// clients.writeAndFlush(tws);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
//ChannelGroup會自動移除
clients.remove(ctx.channel());
}
//異常處理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
clients.remove(ctx.channel());
}
}
心跳類
//處理心跳
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
System.out.println("進入讀空閒。。。");
}else if(state == IdleState.WRITER_IDLE){
System.out.println("進入寫空閒。。。");
}else if(state == IdleState.ALL_IDLE){
//關閉無用的channel 以防資源浪費
Channel channel = ctx.channel();
channel.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
前端 js 實現websocket客戶端 :https://blog.csdn.net/wangzhanzheng/article/details/78603532
改造實現如下,僅供參考
// 構建聊天業務CHAT WEBSocket
window.CHAT = {
socket: null,
init: function() {
if (window.WebSocket) {
// 如果當前的狀態已經連線,那就不需要重複初始化websocket
if (CHAT.socket != null &&
CHAT.socket != undefined &&
CHAT.socket.readyState == WebSocket.OPEN) {
return false;
}
CHAT.socket = new WebSocket(app.nettyServerUrl);
CHAT.socket.onopen = CHAT.wsopen,
CHAT.socket.onclose = CHAT.wsclose,
CHAT.socket.onerror = CHAT.wserror,
CHAT.socket.onmessage = CHAT.wsmessage;
} else {
alert("不支援ws通訊...");
}
},
chat: function(msg) {
// 如果當前websocket的狀態是已開啟,則直接傳送, 否則重連
if (CHAT.socket != null &&
CHAT.socket != undefined &&
CHAT.socket.readyState == WebSocket.OPEN) {
CHAT.socket.send(msg);
} else {
// 重連websocket
CHAT.init();
setTimeout("CHAT.reChat('" + msg + "')", "1000");
}
// 渲染快照列表進行展示
},
reChat: function(msg) {
console.log("訊息重新發送...");
CHAT.socket.send(msg);
},
wsopen: function() {
console.log("websocket連線已建立...");
// 構建Msg
// 構建DataContent
// 傳送websocket
CHAT.chat(JSON.stringify(dataContent));
// 每次連線之後,獲取使用者的未讀未簽收訊息列表
// 定時傳送心跳
setInterval("CHAT.keepalive()", 10000);
},
wsmessage: function(e) {
console.log("接受到訊息:" + e.data);
},
wsclose: function() {
console.log("連線關閉...");
},
wserror: function() {
console.log("發生錯誤...");
},
signMsgList: function(unSignedMsgIds) {
// 批量簽收
...
},
keepalive: function() {
// 傳送心跳
...
}
};