springboot整合SocketIO實時通訊
概述
基於 socket.io 來說,採用 node 實現更加合適,本文使用兩個後端的Java開源框架實現。
業務需求是將之前通過輪詢方式調動RESTFul API改成使用WebSocket長連線方式,實現要伺服器實時的推送訊息,另外還要實時監控POS機的線上狀態等。
引入依賴
<!-- netty-socketio--> <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.13</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-resolver</artifactId> <version>4.1.15.Final</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> <version>4.1.15.Final</version> </dependency> <dependency> <groupId>io.socket</groupId> <artifactId>socket.io-client</artifactId> <version>1.0.0</version> </dependency>
先來服務端程式爽一把,話不多說,先上程式碼:
伺服器程式碼
public class NamespaceSocketServer { private static final Logger logger = LoggerFactory.getLogger(NamespaceSocketServer.class); public static void main(String[] args) { /* * 建立Socket,並設定監聽埠 */ Configuration config = new Configuration(); //設定主機名 config.setHostname("localhost"); //設定監聽埠 config.setPort(9092); // 協議升級超時時間(毫秒),預設10秒。HTTP握手升級為ws協議超時時間 config.setUpgradeTimeout(10000); // Ping訊息超時時間(毫秒),預設60秒,這個時間間隔內沒有接收到心跳訊息就會發送超時事件 config.setPingTimeout(180000); // Ping訊息間隔(毫秒),預設25秒。客戶端向伺服器傳送一條心跳訊息間隔 config.setPingInterval(60000); // 連線認證,這裡使用token更合適 config.setAuthorizationListener(new AuthorizationListener() { @Override public boolean isAuthorized(HandshakeData data) { // String token = data.getSingleUrlParam("token"); // String username = JWTUtil.getSocketUsername(token); // return JWTUtil.verifySocket(token, "secret"); return true; } }); final SocketIOServer server = new SocketIOServer(config); /* * 新增連線監聽事件,監聽是否與客戶端連線到伺服器 */ server.addConnectListener(new ConnectListener() { @Override public void onConnect(SocketIOClient client) { // 判斷是否有客戶端連線 if (client != null) { logger.info("連線成功。clientId=" + client.getSessionId().toString()); client.joinRoom(client.getHandshakeData().getSingleUrlParam("appid")); } else { logger.error("並沒有人連線上。。。"); } } }); /* * 新增監聽事件,監聽客戶端的事件 * 1.第一個引數eventName需要與客戶端的事件要一致 * 2.第二個引數eventClase是傳輸的資料型別 * 3.第三個引數listener是用於接收客戶端傳的資料,資料型別需要與eventClass一致 */ server.addEventListener("login", LoginRequest.class, new DataListener<LoginRequest>() { @Override public void onData(SocketIOClient client, LoginRequest data, AckRequest ackRequest) { logger.info("接收到客戶端login訊息:code = " + data.getCode() + ",body = " + data.getBody()); // check is ack requested by client, but it's not required check if (ackRequest.isAckRequested()) { // send ack response with data to client ackRequest.sendAckData("已成功收到客戶端登入請求", "yeah"); } // 向客戶端傳送訊息 List<String> list = new ArrayList<>(); list.add("登入成功,sessionId=" + client.getSessionId()); // 第一個引數必須與eventName一致,第二個引數data必須與eventClass一致 String room = client.getHandshakeData().getSingleUrlParam("appid"); server.getRoomOperations(room).sendEvent("login", list.toString()); } }); //啟動服務 server.start(); } }
老規矩,先上程式碼爽爽
Android客戶端
public class SocketClient { private static Socket socket; private static final Logger logger = LoggerFactory.getLogger(SocketClient.class); public static void main(String[] args) throws URISyntaxException { IO.Options options = new IO.Options(); options.transports = new String[]{"websocket"}; options.reconnectionAttempts = 2; // 重連嘗試次數 options.reconnectionDelay = 1000; // 失敗重連的時間間隔(ms) options.timeout = 20000; // 連線超時時間(ms) options.forceNew = true; options.query = "username=test1&password=test1&appid=com.xncoding.apay2"; socket = IO.socket("http://localhost:9092/", options); socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { @Override public void call(Object... args) { // 客戶端一旦連線成功,開始發起登入請求 LoginRequest message = new LoginRequest(12, "這是客戶端訊息體"); socket.emit("login", JsonConverter.objectToJSONObject(message), (Ack) args1 -> { logger.info("回執訊息=" + Arrays.stream(args1).map(Object::toString).collect(Collectors.joining(","))); }); } }).on("login", new Emitter.Listener() { @Override public void call(Object... args) { logger.info("接受到伺服器房間廣播的登入訊息:" + Arrays.toString(args)); } }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_CONNECT_ERROR"); socket.disconnect(); } }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_CONNECT_TIMEOUT"); socket.disconnect(); } }).on(Socket.EVENT_PING, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_PING"); } }).on(Socket.EVENT_PONG, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("Socket.EVENT_PONG"); } }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("-----------接受到訊息啦--------" + Arrays.toString(args)); } }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { @Override public void call(Object... args) { logger.info("客戶端斷開連線啦。。。"); socket.disconnect(); } }); socket.connect(); } }
關於心跳機制
根據 Socket.IO文件 解釋, 客戶端會定期傳送心跳包,並觸發一個ping事件和一個pong事件,如下:
ping
Fired when a ping packet is written out to the server.pong
Fired when a pong is received from the server. Parameters:Number
number of ms elapsed sinceping
packet (i.e.: latency)
這裡最重要的兩個伺服器引數如下:
- pingTimeout (Number): how many ms without a pong packet to consider the connection closed (60000)
- pingInterval (Number): how many ms before sending a new ping packet (25000).
也就是說握手協議的時候,客戶端從伺服器拿到這兩個引數,一個是ping訊息的傳送間隔時間,一個是從伺服器返回pong訊息的超時時間, 客戶端會在超時後斷開連線。心跳包傳送方向是客戶端向伺服器端傳送,以維持線上狀態。
關於斷線和超時
關閉瀏覽器、直接關閉客戶端程式、kill程序、主動執行disconnect方法都會導致立刻產生斷線事件。 而客戶端把網路斷開,伺服器端在 pingTimeout
ms後產生斷線事件、客戶端在 pingTimeout
ms後也產生斷線事件。
實際上,超時後會產生一個斷線事件,叫”disconnect”。客戶端和伺服器端都可以對這個事件作出應答,釋放連線。
客戶端程式碼:
.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
logger.info("客戶端斷開連線啦。。。");
socket.disconnect();
}
});
下面是客戶端日誌:連上伺服器後,斷開網路。超過了心跳超時時間後,產生斷線事件。如果客戶端不主動斷開連線的話,會自動重連, 這時候發現連線不上,又產生連線錯誤事件,然後重試2次,都失敗後自動斷開連線了。
1 2 3 4 5 |
SocketClient - 回執訊息=伺服器已成功收到客戶端登入請求,yeah SocketClient - Socket.EVENT_PING SocketClient - Socket.EVENT_PONG SocketClient - 客戶端斷開連線啦。。。 SocketClient - Socket.EVENT_CONNECT_ERROR |
伺服器端程式碼:
server.addDisconnectListener(new DisconnectListener() {
@Override
public void onDisconnect(SocketIOClient client) {
System.out.println("伺服器收到斷線通知... sessionId=" + client.getSessionId());
}
});
伺服器邏輯是,如果在心跳超時後,就直接斷開這個連線,並且產生一個斷開連線事件。
伺服器通過netty處理心跳包ping/pong的日誌如下:
1 2 3 |
WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1 WebSocket08FrameDecoder - Decoding WebSocket Frame length=1 WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=1 |
瀏覽器客戶端演示
對於netty-socketio
有一個demo工程,裡面通過一個網頁的聊天小程式演示了各種使用方法。
SpringBoot整合
最後重點講一下如何在SpringBoot中整合。
修改配置
首先maven依賴之前已經講過了,先修改下application.yml
配置檔案來配置下幾個引數,比如主機、埠、心跳時間等等。
1 2 3 4 |
################### 自定義專案配置 ################### xncoding: socket-hostname: localhost socket-port: 9096 |
新增Bean配置
然後增加一個SocketServer的Bean配置:
@Configuration
public class NettySocketConfig {
@Resource
private MyProperties myProperties;
@Resource
private ApiService apiService;
@Resource
private ManagerInfoService managerInfoService;
private static final Logger logger = LoggerFactory.getLogger(NettySocketConfig.class);
@Bean
public SocketIOServer socketIOServer() {
/*
* 建立Socket,並設定監聽埠
*/
com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
// 設定主機名,預設是0.0.0.0
// config.setHostname("localhost");
// 設定監聽埠
config.setPort(9096);
// 協議升級超時時間(毫秒),預設10000。HTTP握手升級為ws協議超時時間
config.setUpgradeTimeout(10000);
// Ping訊息間隔(毫秒),預設25000。客戶端向伺服器傳送一條心跳訊息間隔
config.setPingInterval(60000);
// Ping訊息超時時間(毫秒),預設60000,這個時間間隔內沒有接收到心跳訊息就會發送超時事件
config.setPingTimeout(180000);
// 這個版本0.9.0不能處理好namespace和query引數的問題。所以為了做認證必須使用全域性預設名稱空間
config.setAuthorizationListener(new AuthorizationListener() {
@Override
public boolean isAuthorized(HandshakeData data) {
// 可以使用如下程式碼獲取使用者密碼資訊
String username = data.getSingleUrlParam("username");
String password = data.getSingleUrlParam("password");
logger.info("連線引數:username=" + username + ",password=" + password);
ManagerInfo managerInfo = managerInfoService.findByUsername(username);
// MD5鹽
String salt = managerInfo.getSalt();
String encodedPassword = ShiroKit.md5(password, username + salt);
// 如果認證不通過會返回一個Socket.EVENT_CONNECT_ERROR事件
return encodedPassword.equals(managerInfo.getPassword());
}
});
final SocketIOServer server = new SocketIOServer(config);
return server;
}
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
後面還有個SpringAnnotationScanner
的定義不能忘記。注意,我在AuthorizationListener
裡面通過呼叫service做了使用者名稱和密碼的認證。通過註解方式可以注入service, 執行相應的連線授權動作。
新增訊息結構類
預先定義好客戶端和伺服器端直接傳遞的訊息型別,使用簡單的JavaBean即可,比如
public class ReportParam {
/**
* IMEI碼
*/
private String imei;
/**
* 位置
*/
private String location;
public String getImei() {
return imei;
}
public void setImei(String imei) {
this.imei = imei;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
}
這裡才是最核心的介面處理類,所有介面處理邏輯都應該寫在這裡面,我只舉了一個例子,就是POS上傳位置介面:
新增訊息處理類
/**
* 訊息事件處理器
*
* @version 1.0
* @since 2018/1015
*/
@Component
public class MessageEventHandler {
private final SocketIOServer server;
private final ApiService apiService;
private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);
@Autowired
public MessageEventHandler(SocketIOServer server, ApiService apiService) {
this.server = server;
this.apiService = apiService;
}
//新增connect事件,當客戶端發起連線時呼叫
@OnConnect
public void onConnect(SocketIOClient client) {
if (client != null) {
String imei = client.getHandshakeData().getSingleUrlParam("imei");
String applicationId = client.getHandshakeData().getSingleUrlParam("appid");
logger.info("連線成功, applicationId=" + applicationId + ", imei=" + imei +
", sessionId=" + client.getSessionId().toString() );
client.joinRoom(applicationId);
// 更新POS監控狀態為線上
ReportParam param = new ReportParam();
param.setImei(imei);
apiService.updateJustState(param, client.getSessionId().toString(), 1);
} else {
logger.error("客戶端為空");
}
}
//新增@OnDisconnect事件,客戶端斷開連線時呼叫,重新整理客戶端資訊
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
String imei = client.getHandshakeData().getSingleUrlParam("imei");
logger.info("客戶端斷開連線, imei=" + imei + ", sessionId=" + client.getSessionId().toString());
// 更新POS監控狀態為離線
ReportParam param = new ReportParam();
param.setImei(imei);
apiService.updateJustState(param, "", 2);
client.disconnect();
}
// 訊息接收入口
@OnEvent(value = Socket.EVENT_MESSAGE)
public void onEvent(SocketIOClient client, AckRequest ackRequest, Object data) {
logger.info("接收到客戶端訊息");
if (ackRequest.isAckRequested()) {
// send ack response with data to client
ackRequest.sendAckData("伺服器回答Socket.EVENT_MESSAGE", "好的");
}
}
// 廣播訊息接收入口
@OnEvent(value = "broadcast")
public void onBroadcast(SocketIOClient client, AckRequest ackRequest, Object data) {
logger.info("接收到廣播訊息");
// 房間廣播訊息
String room = client.getHandshakeData().getSingleUrlParam("appid");
server.getRoomOperations(room).sendEvent("broadcast", "廣播啦啦啦啦");
}
/**
* 報告地址介面
* @param client 客戶端
* @param ackRequest 回執訊息
* @param param 報告地址引數
*/
@OnEvent(value = "doReport")
public void onDoReport(SocketIOClient client, AckRequest ackRequest, ReportParam param) {
logger.info("報告地址介面 start....");
BaseResponse result = postReport(param);
ackRequest.sendAckData(result);
}
/*----------------------------------------下面是私有方法-------------------------------------*/
private BaseResponse postReport(ReportParam param) {
BaseResponse result = new BaseResponse();
int r = apiService.report(param);
if (r > 0) {
result.setSuccess(true);
result.setMsg("報告地址成功");
} else {
result.setSuccess(false);
result.setMsg("該POS機還沒有入網,報告地址失敗。");
}
return result;
}
}
還有一個步驟就是新增啟動器,在SpringBoot啟動之後立馬執行:
新增ServerRunner
/**
* SpringBoot啟動之後執行
*
*
* @version 1.0
* @since 2018/10/15
*/
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {
private final SocketIOServer server;
private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);
@Autowired
public ServerRunner(SocketIOServer server) {
this.server = server;
}
@Override
public void run(String... args) throws Exception {
logger.info("ServerRunner 開始啟動啦...");
server.start();
}
}
要實現通過域名並走標準80或443埠的話,最好使用nginx做反向代理,跟正常的http反向代理基本一致, 不過websocket需要增加一個upgrade的配置。
nginx反向代理
下面我以一個實際使用例子來說明如何配置nginx的https訪問websocket,並且開啟301自動http跳轉https。
首先要有一個域名,比如test.enzhico.net
,然後申請letsencrypt的免費證書,這個過程我不講了
配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { server_name test.enzhico.net; location / { proxy_pass http://localhost:9096; proxy_read_timeout 300s; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; } #root /opt/www/test.enzhico.net; #index index.html index.htm; error_page 404 /404.html; location = /40x.html { } error_page 500 502 503 504 /50x.html; location = /50x.html { } listen 443 ssl; # managed by Certbot ssl_certificate /etc/letsencrypt/live/test.enzhico.net/fullchain.pem; # managed by Certbot ssl_certificate_key /etc/letsencrypt/live/test.enzhico.net/privkey.pem; # managed by Certbot include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot } server { listen 80; server_name test.enzhico.net; return 301 https://$host$request_uri; # managed by Certbot } |
注意這其中和普通HTTP代理的關鍵不同是:
1 2 |
proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; |