Springboot整合WebSocket和RabbitMQ實現伺服器訊息推送
阿新 • • 發佈:2021-06-29
這裡只實現伺服器端WebScket到訊息中介軟體RabbitMQ部分,前端程式碼不會。前端跟中介軟體互動部分的功能(向中介軟體傳送訊息、從中介軟體讀取訊息)用介面代替
實現思路
前端發起請求與伺服器建立連線 ->
專案結構
配置RabbitMQ
#配置rabbitmq的基本資訊 : ip 埠 賬號和密碼
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
寫一個RabbitMQ的配置類
package com.example.demo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lyd * @Description: 配置RabbitMQ,建立一個交換機、一個佇列 * @date 14:46 */ @Configuration public class AppConfig { public static final String ROUTING_KEY = "rabbit.msg"; public static final String DIRECT_EXCHANGE = "directexchange"; public static final String DIRECT_QUEUE = "directqueue"; @Bean public Queue directQueue() { return new Queue(DIRECT_QUEUE); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } @Bean public Binding binding() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTING_KEY); } }
再寫一個WebSocket啟動類,開啟WebSocket支援
package com.example.demo.config; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author lyd * @Description: 開啟WebSocket支援 * @date 15:43 */ @Configuration @Component public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }
實現WebSocket的核心類,通過@OnOpen
、@OnClose
、@OnMessag
、@OnError
四個註解實現四個核心的方法
package com.example.demo.websocket; import com.example.demo.direct.DirectSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; @ServerEndpoint(value = "/webSocket") @Component public class WebSocketServer { /** * 存放每個客戶端對應的WebSocket物件 */ public static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<WebSocketServer>(); @Autowired static DirectSender directSender; /** * 連線建立成功呼叫的方法 */ @OnOpen public void onOpen() throws InterruptedException, IOException { webSockets.add(this); System.out.println("有新使用者加入"); } @OnClose public void onClose() throws IOException { webSockets.remove(this); System.out.println("有使用者離開"); } /** * 收到客戶端訊息後呼叫的方法 */ @OnMessage public void onMessage(String msg) throws InterruptedException { System.out.println("從客戶端接受的訊息: " + msg); } @OnError public void onError(Throwable error) { error.printStackTrace(); } }
定義一個訊息傳送類,實現一個通過WebSocket向佇列傳送訊息的方法
package com.example.demo.direct;
import com.example.demo.config.AppConfig;
import com.example.demo.websocket.WebSocketServer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author lyd
* @Description:
* @date 14:50
*/
@Component
public class DirectSender {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 向RabbitMQ佇列中傳送訊息,方便後面客戶端可以從佇列中讀取該訊息
*
* 也可以用來代替客戶端向佇列中傳送訊息,我不會寫前端連線rabbitmq的程式碼,就用這個介面代替了。或者在RabbitMQ的管理面板中手動輸入資料
* @param msg
*/
public void sendDirect(String msg) {
for (WebSocketServer webSocketServer : WebSocketServer.webSockets) {
rabbitTemplate.convertAndSend(AppConfig.DIRECT_EXCHANGE, AppConfig.ROUTING_KEY, msg +" ("+webSocketServer.toString()+")");
}
}
}
定義一個訊息接收類,實現一個通過WebSocket從佇列中讀取訊息的方法
package com.example.demo.direct;
import com.example.demo.config.AppConfig;
import com.example.demo.websocket.WebSocketServer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author lyd
* @Description: 監聽RabbitMQ中佇列訊息
* @date 15:16
*/
@Component
public class DirectReceive {
/**
* 監聽客戶端傳送到RabbitMQ佇列中的訊息,並把訊息傳送給WebSocketServer
* @param msg
* @throws InterruptedException
* @throws IOException
*/
@RabbitListener(queues = AppConfig.DIRECT_QUEUE)
@RabbitHandler
public void processToPre(String msg) throws InterruptedException, IOException {
Thread.sleep(500);
for (WebSocketServer webSocketServer : WebSocketServer.webSockets) {
System.out.println("WebSocket從佇列中取出客戶端("+webSocketServer.toString()+")傳送過來的訊息:");
webSocketServer.onMessage(msg);
}
}
}
再寫一個傳送訊息的介面,方便測試用
package com.example.demo.controller;
import com.example.demo.direct.DirectSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @author lyd
* @Description:
* @date 14:49
*/
@RestController
public class MessageController {
@Autowired
DirectSender directSender;
/**
* 介面:
* 呼叫向佇列中傳送訊息的方法
*/
@RequestMapping("senderMsg")
@ResponseBody
public void senderMsg() {
directSender.sendDirect("我是向佇列中儲存的訊息");
}
}
最後寫一個html頁面,用於前端跟客戶端建立連線
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>
</body>
<script type="text/javascript">
var websocket = null;
if ('WebSocket' in window) {
websocket = new WebSocket('ws://localhost:8080/webSocket');
} else {
alert('該瀏覽器不支援websocket');
}
websocket.onopen = function (e) {
console.log('websocket建立連線');
websocket.send('websocket建立連線');
}
websocket.onclose = function (e) {
console.log('websocket關閉連線');
}
websocket.onmessage = function (e) {
console.log(e, 'websocket收到訊息');
document.getElementById('msgs').innerHTML = document.getElementById('msgs').innerHTML + '<br/>' + e.data;
}
websocket.onerror = function (event) {
console.log('websocket通訊發生錯誤');
}
window.onbeforeunload = function (event) {
websocket.close();
}
</script>
</html>
測試
1)啟動專案後訪問http://localhost:8080/chat.html
,客戶端跟服務端建立連線
2) 呼叫http://localhost:8080/senderMsg
,WebSocket向RabbitMQ佇列中傳送訊息,這一步也同時實現了服務端監聽佇列訊息並把訊息發給WebSocket