Netty筆記:使用WebSocket協議開發聊天系統
阿新 • • 發佈:2019-02-15
前言,之前一直圍繞著Http協議來開發專案,最近由於參與一個類似競拍專案的開發,有這樣一個場景,多個客戶端競拍一個商品,當一個客戶端加價後,其它關注這個商品的客戶端需要立即知道該商品的最新價格。
這裡有個問題,Http協議是基於請求/響應的,客戶端傳送請求,然後服務端響應返回,客戶端是主動方,服務端被動的接收客戶端的請求來響應,無法解決上述場景中服務端主動將最新的資料推送給客戶端的需求。
當然,有人會提出ajax輪詢的方案,就是客戶端不斷的請求(假如1秒1次)最新競拍價格。顯然這種模式具有很明顯的缺點,即瀏覽器需要不斷地向伺服器發出請求,但是Http request的Header是非常冗長的,裡面包含的可用資料比例可能非常低,這會佔用很多的頻寬和伺服器資源。
還有一種比較新穎的方案,long poll(長輪詢)。利用長輪詢,客戶端可以開啟指向服務端的Http連線,而伺服器會一直保持連線開啟,直到服務端資料更新再發送響應。雖然這種方式比ajax輪詢有進步,但都存在一個共同問題:由於Http協議的開銷,導致它們不適合用於低延遲應用。
轉載請註明出處:http://blog.csdn.net/a906998248/article/details/52839425
一.WebSocket協議簡介
WebSocket 是 Html5 開始提供的一種瀏覽器與伺服器間進行全雙工通訊的網路技術。(全雙工:同一時刻,資料可以在客戶端和服務端兩個方向上傳輸)
在 WebSocket API 中,瀏覽器和伺服器只需要做一個握手的動作,然後瀏覽器和伺服器之間就形成了一條快速通道,兩者就可以直接互相傳送資料了 。
二.相比傳統Http協議的優點及作用
1.Http協議的弊端:
a.Http協議為半雙工協議。(半雙工:同一時刻,資料只能在客戶端和服務端一個方向上傳輸)
b.Http協議冗長且繁瑣
c.易收到攻擊,如長輪詢
d.非持久化協議
2.WebSocket的特性:
a.單一的 TCP 連線,採用全雙工模式通訊
b.對代理、防火牆和路由器透明
c.無頭部資訊、Cookie 和身份驗證
d.無安全開銷
e.通過 ping/pong 幀保持鏈路啟用
f.持久化協議,連線建立後,伺服器可以主動傳遞訊息給客戶端,不再需要客戶端輪詢
三.聊天例項
前面提到過,WebSocket通訊需要建立WebSocket連線,客戶端首先要向服務端發起一個 Http 請求,這個請求和通常的 Http 請求不同,包含了一些附加頭資訊,其中附加資訊"Upgrade:WebSocket"表明這是一個基於 Http 的 WebSocket 握手請求。如下:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: sdewgzgfewfsgergzgewrfaf==
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
Origin: http://example.com
其中,Sec-WebSocket-Key是隨機的,服務端會使用它加密後作為Sec-WebSocket-Accept的值返回;Sec-WebSocket-Protocol是一個使用者定義的字串,用來區分同URL下,不同的服務所需要的協議;Sec-WebSocket-Version是告訴伺服器所使用的Websocket Draft(協議版本)不出意外,服務端會返回下列資訊表示握手成功,連線已經建立:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: sdgdfshgretghsdfgergtbd=
Sec-WebSocket-Protocol: chat
到這裡 WebSocket 連線已經成功建立,服務端和客戶端可以正常通訊了,此時服務端和客戶端都是對等端點,都可以主動傳送請求到另一端。
下面是前端和後端的實現過程,後端我採用了 Netty 的 API,因為最近在學 Netty,所以就採用了 Netty 中的 NIO 來構建 WebSocket 後端,我看了下網上也有用 Tomcat API 來實現,看起來也很簡單,朋友們可以試試。前端使用HTML5 來構建,可以參考WebSocket介面文件,非常方便簡單。
Lanucher用來啟動WebSocket服務端
import com.company.server.WebSocketServer;
public class Lanucher {
public static void main(String[] args) throws Exception {
// 啟動WebSocket
new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
}
}
使用 Netty 構建的 WebSocket 服務
import org.apache.log4j.Logger;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* WebSocket服務
*
*/
public class WebSocketServer {
private static final Logger LOG = Logger.getLogger(WebSocketServer.class);
// websocket埠
public static final int WEBSOCKET_PORT = 9090;
public void run(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("http-codec", new HttpServerCodec()); // Http訊息編碼解碼
pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // Http訊息組裝
pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // WebSocket通訊支援
pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket服務端Handler
}
});
Channel channel = b.bind(port).sync().channel();
LOG.info("WebSocket 已經啟動,埠:" + port + ".");
channel.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
WebSocket 服務端處理類,注意第一次握手是 Http 協議
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import org.apache.log4j.Logger;
import com.company.serviceimpl.BananaService;
import com.company.util.CODE;
import com.company.util.Request;
import com.company.util.Response;
import com.google.common.base.Strings;
import com.google.gson.JsonSyntaxException;
/**
* WebSocket服務端Handler
*
*/
public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());
private WebSocketServerHandshaker handshaker;
private ChannelHandlerContext ctx;
private String sessionId;
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) { // 傳統的HTTP接入
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) { // WebSocket接入
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("WebSocket異常", cause);
ctx.close();
LOG.info(sessionId + " 登出");
BananaService.logout(sessionId); // 登出
BananaService.notifyDownline(sessionId); // 通知有人下線
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
LOG.info("WebSocket關閉");
super.close(ctx, promise);
LOG.info(sessionId + " 登出");
BananaService.logout(sessionId); // 登出
BananaService.notifyDownline(sessionId); // 通知有人下線
}
/**
* 處理Http請求,完成WebSocket握手<br/>
* 注意:WebSocket連線第一次請求使用的是Http
* @param ctx
* @param request
* @throws Exception
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// 如果HTTP解碼失敗,返回HHTP異常
if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
// 正常WebSocket的Http連線請求,構造握手響應返回
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
handshaker = wsFactory.newHandshaker(request);
if (handshaker == null) { // 無法處理的websocket版本
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else { // 向客戶端傳送websocket握手,完成握手
handshaker.handshake(ctx.channel(), request);
// 記錄管道處理上下文,便於伺服器推送資料到客戶端
this.ctx = ctx;
}
}
/**
* 處理Socket請求
* @param ctx
* @param frame
* @throws Exception
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 判斷是否是關閉鏈路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判斷是否是Ping訊息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 當前只支援文字訊息,不支援二進位制訊息
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("當前只支援文字訊息,不支援二進位制訊息");
}
// 處理來自客戶端的WebSocket請求
try {
Request request = Request.create(((TextWebSocketFrame)frame).text());
Response response = new Response();
response.setServiceId(request.getServiceId());
if (CODE.online.code.intValue() == request.getServiceId()) { // 客戶端註冊
String requestId = request.getRequestId();
if (Strings.isNullOrEmpty(requestId)) {
response.setIsSucc(false).setMessage("requestId不能為空");
return;
} else if (Strings.isNullOrEmpty(request.getName())) {
response.setIsSucc(false).setMessage("name不能為空");
return;
} else if (BananaService.bananaWatchMap.containsKey(requestId)) {
response.setIsSucc(false).setMessage("您已經註冊了,不能重複註冊");
return;
}
if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
response.setIsSucc(false).setMessage("註冊失敗");
} else {
response.setIsSucc(true).setMessage("註冊成功");
BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // 將已經上線的人員返回
if (!reqId.equals(requestId)) {
Request serviceRequest = new Request();
serviceRequest.setServiceId(CODE.online.code);
serviceRequest.setRequestId(requestId);
serviceRequest.setName(request.getName());
try {
callBack.send(serviceRequest); // 通知有人上線
} catch (Exception e) {
LOG.warn("回調發送訊息給客戶端異常", e);
}
}
});
}
sendWebSocket(response.toJson());
this.sessionId = requestId; // 記錄會話id,當頁面重新整理或瀏覽器關閉時,登出掉此鏈路
} else if (CODE.send_message.code.intValue() == request.getServiceId()) { // 客戶端傳送訊息到聊天群
String requestId = request.getRequestId();
if (Strings.isNullOrEmpty(requestId)) {
response.setIsSucc(false).setMessage("requestId不能為空");
} else if (Strings.isNullOrEmpty(request.getName())) {
response.setIsSucc(false).setMessage("name不能為空");
} else if (Strings.isNullOrEmpty(request.getMessage())) {
response.setIsSucc(false).setMessage("message不能為空");
} else {
response.setIsSucc(true).setMessage("傳送訊息成功");
BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // 將訊息傳送到所有機器
Request serviceRequest = new Request();
serviceRequest.setServiceId(CODE.receive_message.code);
serviceRequest.setRequestId(requestId);
serviceRequest.setName(request.getName());
serviceRequest.setMessage(request.getMessage());
try {
callBack.send(serviceRequest);
} catch (Exception e) {
LOG.warn("回調發送訊息給客戶端異常", e);
}
});
}
sendWebSocket(response.toJson());
} else if (CODE.downline.code.intValue() == request.getServiceId()) { // 客戶端下線
String requestId = request.getRequestId();
if (Strings.isNullOrEmpty(requestId)) {
sendWebSocket(response.setIsSucc(false).setMessage("requestId不能為空").toJson());
} else {
BananaService.logout(requestId);
response.setIsSucc(true).setMessage("下線成功");
BananaService.notifyDownline(requestId); // 通知有人下線
sendWebSocket(response.toJson());
}
} else {
sendWebSocket(response.setIsSucc(false).setMessage("未知請求").toJson());
}
} catch (JsonSyntaxException e1) {
LOG.warn("Json解析異常", e1);
} catch (Exception e2) {
LOG.error("處理Socket請求異常", e2);
}
}
/**
* Http返回
* @param ctx
* @param request
* @param response
*/
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
// 返回應答給客戶端
if (response.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
HttpHeaders.setContentLength(response, response.content().readableBytes());
}
// 如果是非Keep-Alive,關閉連線
ChannelFuture f = ctx.channel().writeAndFlush(response);
if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* WebSocket返回
* @param ctx
* @param req
* @param res
*/
public void sendWebSocket(String msg) throws Exception {
if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
throw new Exception("尚未握手成功,無法向客戶端傳送WebSocket訊息");
}
this.ctx.channel().write(new TextWebSocketFrame(msg));
this.ctx.flush();
}
}
聊天服務介面和實現類
import com.company.util.Request;
public interface BananaCallBack {
// 服務端傳送訊息給客戶端
void send(Request request) throws Exception;
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import com.company.service.BananaCallBack;
import com.company.util.CODE;
import com.company.util.Request;
import com.google.common.base.Strings;
public class BananaService implements BananaCallBack {
private static final Logger LOG = Logger.getLogger(BananaService.class);
public static final Map<String, BananaCallBack> bananaWatchMap = new ConcurrentHashMap<String, BananaCallBack>(); // <requestId, callBack>
private ChannelHandlerContext ctx;
private String name;
public BananaService(ChannelHandlerContext ctx, String name) {
this.ctx = ctx;
this.name = name;
}
public static boolean register(String requestId, BananaCallBack callBack) {
if (Strings.isNullOrEmpty(requestId) || bananaWatchMap.containsKey(requestId)) {
return false;
}
bananaWatchMap.put(requestId, callBack);
return true;
}
public static boolean logout(String requestId) {
if (Strings.isNullOrEmpty(requestId) || !bananaWatchMap.containsKey(requestId)) {
return false;
}
bananaWatchMap.remove(requestId);
return true;
}
@Override
public void send(Request request) throws Exception {
if (this.ctx == null || this.ctx.isRemoved()) {
throw new Exception("尚未握手成功,無法向客戶端傳送WebSocket訊息");
}
this.ctx.channel().write(new TextWebSocketFrame(request.toJson()));
this.ctx.flush();
}
/**
* 通知所有機器有機器下線
* @param requestId
*/
public static void notifyDownline(String requestId) {
BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // 通知有人下線
Request serviceRequest = new Request();
serviceRequest.setServiceId(CODE.downline.code);
serviceRequest.setRequestId(requestId);
try {
callBack.send(serviceRequest);
} catch (Exception e) {
LOG.warn("回調發送訊息給客戶端異常", e);
}
});
}
public String getName() {
return name;
}
}
前端html5聊天頁面及js
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket 聊天例項</title>
</head>
<script src="jquery.min.js" type="text/javascript"></script>
<script src="map.js" type="text/javascript"></script>
<script type="text/javascript">
$(document).ready(function() {
var uuid = guid(); // uuid在一個會話唯一
var nameOnline = ''; // 上線姓名
var onlineName = new Map(); // 已上線人員, <requestId, name>
$("#name").attr("disabled","disabled");
$("#onlineBtn").attr("disabled","disabled");
$("#downlineBtn").attr("disabled","disabled");
$("#banana").hide();
// 初始化websocket
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:9090/");
socket.onmessage = function(event) {
console.log("收到伺服器訊息:" + event.data);
if (event.data.indexOf("isSucc") != -1) {// 這裡需要判斷是客戶端請求服務端返回後的訊息(response)
var response = JSON.parse(event.data);
if (response != undefined && response != null) {
if (response.serviceId == 1001) { // 上線
if (response.isSucc) {
// 上線成功,初始化已上線人員
onlineName.clear();
$("#showOnlineNames").empty();
for (var reqId in response.hadOnline) {
onlineName.put(reqId, response.hadOnline[reqId]);
}
initOnline();
$("#name").attr("disabled","disabled");
$("#onlineBtn").attr("disabled","disabled");
$("#downlineBtn").removeAttr("disabled");
$("#banana").show();
} else {
alert("上線失敗");
}
} else if (response.serviceId == 1004) {
if (response.isSucc) {
onlineName.clear();
$("#showBanana").empty();
$("#showOnlineNames").empty();
$("#name").removeAttr("disabled");
$("#onlineBtn").removeAttr("disabled");
$("#downlineBtn").attr("disabled","disabled");
$("#banana").hide();
} else {
alert("下線失敗");
}
}
}
} else {// 還是服務端向客戶端的請求(request)
var request = JSON.parse(event.data);
if (request != undefined && request != null) {
if (request.serviceId == 1001 || request.serviceId == 1004) { // 有人上線/下線
if (request.serviceId == 1001) {
onlineName.put(request.requestId, request.name);
}
if (request.serviceId == 1004) {
onlineName.removeByKey(request.requestId);
}
initOnline();
} else if (request.serviceId == 1003) { // 有人發訊息
appendBanana(request.name, request.message);
}
}
}
};
socket.onopen = function(event) {
$("#name").removeAttr("disabled");
$("#onlineBtn").removeAttr("disabled");
console.log("已連線伺服器");
};
socket.onclose = function(event) { // WebSocket 關閉
console.log("WebSocket已經關閉!");
};
socket.onerror = function(event) {
console.log("WebSocket異常!");
};
} else {
alert("抱歉,您的瀏覽器不支援WebSocket協議!");
}
// WebSocket傳送請求
function send(message) {
if (!window.WebSocket) { return; }
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
console.log("WebSocket連線沒有建立成功!");
alert("您還未連線上伺服器,請重新整理頁面重試");
}
}
// 重新整理上線人員
function initOnline() {
$("#showOnlineNames").empty();
for (var i=0;i<onlineName.size();i++) {
$("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
'<td>' + onlineName.element(i).value + '</td>' +
'</tr>');
}
}
// 追加聊天資訊
function appendBanana(name, message) {
$("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
}
$("#onlineBtn").bind("click", function() {
var name = $("#name").val();
if (name == null || name == '') {
alert("請輸入您的尊姓大名");
return;
}
nameOnline = name;
// 上線
send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
});
$("#downlineBtn").bind("click", function() {
// 下線
send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
});
$("#sendBtn").bind("click", function() {
var message = $("#messageInput").val();
if (message == null || message == '') {
alert("請輸入您的聊天資訊");
return;
}
// 傳送聊天訊息
send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
$("#messageInput").val("");
});
});
function guid() {
function S4() {
return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
}
return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
}
</script>
<body>
<h1>Netty WebSocket 聊天例項</h1>
<input type="text" id="name" value="佚名" placeholder="姓名" />
<input type="button" id="onlineBtn" value="上線" />
<input type="button" id="downlineBtn" value="下線" />
<hr/>
<table id="banana" border="1" >
<tr>
<td width="600" align="center">聊天</td>
<td width="100" align="center">上線人員</td>
</tr>
<tr height="200" valign="top">
<td>
<table id="showBanana" border="0" width="600">
<!--
<tr>
<td>張三: 大家好</td>
</tr>
<tr>
<td>李四: 歡迎加入群聊</td>
</tr>
-->
</table>
</td>
<td>
<table id="showOnlineNames" border="0">
<!--
<tr>
<td>1</td>
<td>張三</td>
<tr/>
<tr>
<td>2</td>
<td>李四</td>
<tr/>
-->
</table>
</td>
</tr>
<tr height="40">
<td></td>
<td></td>
</tr>
<tr>
<td>
<input type="text" id="messageInput" style="width:590px" placeholder="巴拉巴拉點什麼吧" />
</td>
<td>
<input type="button" id="sendBtn" value="傳送" />
</td>
</tr>
</table>
</body>
</html>
執行方式:
1.執行Lanucher來啟動後端的 WebSocket服務
2.開啟Resources下的banana.html頁面即可線上聊天,如下:
當有人上線/下線時,右邊的"上線人員"會動態變化
綜上,WebSocket 協議用於構建低延遲的服務,如競拍、股票行情等,使用 Netty 可以方便的構建 WebSocket 服務,需要注意的是,WebSocket 協議基於 Http協議,採用 Http 握手成功後,就可以進行 TCP 全雙工通訊了。
參考:《Netty 權威指南》