1. 程式人生 > 其它 >Springboot整合WebSocket和RabbitMQ實現伺服器訊息推送

Springboot整合WebSocket和RabbitMQ實現伺服器訊息推送

這裡只實現伺服器端WebScket到訊息中介軟體RabbitMQ部分,前端程式碼不會。前端跟中介軟體互動部分的功能(向中介軟體傳送訊息、從中介軟體讀取訊息)用介面代替

實現思路

前端發起請求與伺服器建立連線 ->

專案結構

配置RabbitMQ

#配置rabbitmq的基本資訊 : ip 埠  賬號和密碼

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

寫一個RabbitMQ的配置類

package com.example.demo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author lyd
 * @Description: 配置RabbitMQ,建立一個交換機、一個佇列
 * @date 14:46
 */
@Configuration
public class AppConfig {

	public static final String ROUTING_KEY = "rabbit.msg";
	public static final String DIRECT_EXCHANGE = "directexchange";
	public static final String DIRECT_QUEUE = "directqueue";

	@Bean
	public Queue directQueue() {
		return new Queue(DIRECT_QUEUE);
	}

	@Bean
	public DirectExchange directExchange() {
		return new DirectExchange(DIRECT_EXCHANGE);
	}

	@Bean
	public Binding binding() {
		return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTING_KEY);
	}


}

再寫一個WebSocket啟動類,開啟WebSocket支援

package com.example.demo.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author lyd
 * @Description: 開啟WebSocket支援
 * @date 15:43
 */
@Configuration
@Component
public class WebSocketConfig {

	@Bean
	public ServerEndpointExporter serverEndpointExporter(){
		return new ServerEndpointExporter();
	}

}

實現WebSocket的核心類,通過@OnOpen@OnClose@OnMessag@OnError四個註解實現四個核心的方法

package com.example.demo.websocket;

import com.example.demo.direct.DirectSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint(value = "/webSocket")
@Component
public class WebSocketServer {

	/**
	 * 存放每個客戶端對應的WebSocket物件
	 */
	public static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<WebSocketServer>();


	@Autowired
	static DirectSender directSender;

	/**
	 * 連線建立成功呼叫的方法
	 */
	@OnOpen
	public void onOpen() throws InterruptedException, IOException {
		webSockets.add(this);
		System.out.println("有新使用者加入");
	}

	@OnClose
	public void onClose() throws IOException {
		webSockets.remove(this);
		System.out.println("有使用者離開");
	}

	/**
	 * 收到客戶端訊息後呼叫的方法
	 */
	@OnMessage
	public void onMessage(String msg) throws InterruptedException {
		System.out.println("從客戶端接受的訊息: " + msg);
	}

	@OnError
	public void onError(Throwable error) {
		error.printStackTrace();
	}


}

定義一個訊息傳送類,實現一個通過WebSocket向佇列傳送訊息的方法

package com.example.demo.direct;

import com.example.demo.config.AppConfig;
import com.example.demo.websocket.WebSocketServer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author lyd
 * @Description:
 * @date 14:50
 */
@Component
public class DirectSender {

	@Autowired
	RabbitTemplate rabbitTemplate;

	/**
	 * 向RabbitMQ佇列中傳送訊息,方便後面客戶端可以從佇列中讀取該訊息
	 *
	 * 也可以用來代替客戶端向佇列中傳送訊息,我不會寫前端連線rabbitmq的程式碼,就用這個介面代替了。或者在RabbitMQ的管理面板中手動輸入資料
	 * @param msg
	 */
	public void sendDirect(String msg) {

		for (WebSocketServer webSocketServer : WebSocketServer.webSockets) {
			rabbitTemplate.convertAndSend(AppConfig.DIRECT_EXCHANGE, AppConfig.ROUTING_KEY, msg +" ("+webSocketServer.toString()+")");
		}

	}


}

定義一個訊息接收類,實現一個通過WebSocket從佇列中讀取訊息的方法

package com.example.demo.direct;

import com.example.demo.config.AppConfig;
import com.example.demo.websocket.WebSocketServer;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author lyd
 * @Description: 監聽RabbitMQ中佇列訊息
 * @date 15:16
 */
@Component
public class DirectReceive {


	/**
	 * 監聽客戶端傳送到RabbitMQ佇列中的訊息,並把訊息傳送給WebSocketServer
	 * @param msg
	 * @throws InterruptedException
	 * @throws IOException
	 */
	@RabbitListener(queues = AppConfig.DIRECT_QUEUE)
	@RabbitHandler
	public void processToPre(String msg) throws InterruptedException, IOException {
		Thread.sleep(500);
		for (WebSocketServer webSocketServer : WebSocketServer.webSockets) {
			System.out.println("WebSocket從佇列中取出客戶端("+webSocketServer.toString()+")傳送過來的訊息:");
			webSocketServer.onMessage(msg);
		}
	}



}

再寫一個傳送訊息的介面,方便測試用

package com.example.demo.controller;

import com.example.demo.direct.DirectSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author lyd
 * @Description:
 * @date 14:49
 */
@RestController
public class MessageController {

	@Autowired
	DirectSender directSender;

	/**
	 * 介面:
	 * 呼叫向佇列中傳送訊息的方法
	 */
	@RequestMapping("senderMsg")
	@ResponseBody
	public void senderMsg() {
		directSender.sendDirect("我是向佇列中儲存的訊息");
	}


}

最後寫一個html頁面,用於前端跟客戶端建立連線

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>Insert title here</title>
</head>
<body>

</body>
<script type="text/javascript">
    var websocket = null;
    if ('WebSocket' in window) {
        websocket = new WebSocket('ws://localhost:8080/webSocket');
    } else {
        alert('該瀏覽器不支援websocket');
    }

    websocket.onopen = function (e) {
        console.log('websocket建立連線');
        websocket.send('websocket建立連線');
    }
    websocket.onclose = function (e) {
        console.log('websocket關閉連線');
    }
    websocket.onmessage = function (e) {
        console.log(e, 'websocket收到訊息');
        document.getElementById('msgs').innerHTML = document.getElementById('msgs').innerHTML + '<br/>' + e.data;
    }
    websocket.onerror = function (event) {
        console.log('websocket通訊發生錯誤');

    }
    window.onbeforeunload = function (event) {
        websocket.close();
    }

</script>
</html>

測試

1)啟動專案後訪問http://localhost:8080/chat.html,客戶端跟服務端建立連線

2) 呼叫http://localhost:8080/senderMsg,WebSocket向RabbitMQ佇列中傳送訊息,這一步也同時實現了服務端監聽佇列訊息並把訊息發給WebSocket

專案原始碼

https://github.com/Wranglery/test-rabbitMQ-websocket