1. 程式人生 > >續 Springboot+WebSocket+RabbitMQ

續 Springboot+WebSocket+RabbitMQ

在上一篇文章《Springboot+WebSocket+RabbitMQ》中有個漏洞,今天給大家修復一下,上一篇文章因為博主的業務需求需要,本人將websocket的服務端和rabbitmq的監聽消費端整合在了一起,結果出來了一個意想不到的結果,程式後臺控制檯一直不停地列印訊息,重複地接受前臺和佇列發來的訊息,後來經過仔細研究,應該是監聽佇列訊息與接受前臺訊息衝突了,後來我就把websocket前臺與後臺建立連線發訊息時作了非空判斷,然後將以前整合好的拆分開來,具體程式碼如下:

websocket前臺頁面:

//將訊息顯示在網頁上
function setMessageInnerHTML(innerHTML){
    console.log(innerHTML);
    if(innerHTML!=null||innerHTML!=""){
        alert(innerHTML);
    }

}
//關閉連線
function closeWebSocket(){
    websocket.close();
}
//傳送訊息
function send(){
    var message = document.getElementById('text').value;
    //websocket.send(message);
}

分離後的websocket服務端websocketserver

package com.iecas.controller;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Controller;

/**
 * Created by qs5 on 2018/10/11.
 * @author wangzhen
 * @version 1.0
 */
@Slf4j
@ServerEndpoint(value = "/webSocket")
@Controller
public class WebSocketServer {
    private static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
    //靜態變數,用來記錄當前線上連線數。應該把它設計成執行緒安全的。
    private static int onlineCount = 0;
    //concurrent包的執行緒安全Set,用來存放每個客戶端對應的MyWebSocket物件。
    public static CopyOnWriteArraySet<WebSocketServer> wsClientMap = new CopyOnWriteArraySet<WebSocketServer>();
    //與某個客戶端的連線會話,需要通過它來給客戶端傳送資料
    private Session session;

    /**
     * 連線建立成功呼叫的方法
     * @param session 當前會話session
     */
    @OnOpen
    public void onOpen (Session session){
        this.session = session;
        wsClientMap.add(this);
        addOnlineCount();
        logger.info(session.getId()+"有新連結加入,當前連結數為:" + wsClientMap.size());
    }
    /**
     * 連線關閉
     */
    @OnClose
    public void onClose (){
        wsClientMap.remove(this);
        subOnlineCount();
        logger.info("有一連結關閉,當前連結數為:" + wsClientMap.size());
    }
    /**
     * 收到客戶端訊息
     * @param message 客戶端傳送過來的訊息
     * @param session 當前會話session
     * @throws IOException
     */
    @OnMessage
    public void onMessage (String message, Session session)  {
        logger.info("客戶端傳送過來的訊息:" + message);
        for (WebSocketServer webSocketServer : wsClientMap) {
            try{
                webSocketServer.sendMessage(message);
            } catch (Exception e){

                e.printStackTrace();
                continue;
            }

        }

    }
    /**
     * 發生錯誤
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("wsClientMap發生錯誤!");
        error.printStackTrace();
    }

    /**
     * 給所有客戶端群發訊息
     * @param message 訊息內容
     * @throws IOException
     */
   /* public void sendMsgToAll(String message) throws IOException {
        for ( WebSocketServer item : wsClientMap ){
            item.session.getBasicRemote().sendText(message);
        }
        logger.info("成功群送一條訊息:" + wsClientMap.size());
    }*/
    //實現伺服器主動推送
    public void sendMessage (String message) throws IOException {
        this.session.getBasicRemote().sendText(message);

        logger.info("成功傳送一條訊息:" + message);
    }

    public static synchronized  int getOnlineCount (){
        return onlineCount;
    }

    public static synchronized void addOnlineCount (){
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount (){
        WebSocketServer.onlineCount--;
    }
}

分離後的監聽消費端MsgReceiverController

package com.iecas.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import java.io.IOException;

import org.springframework.stereotype.Controller;


/**
 * Created by qs5 on 2018/11/11.
 * @author wangzhen
 */
@RabbitListener(queues = "task_REPqueue")
@Controller
public class MsgReceiverController {
    private static Logger logger = LoggerFactory.getLogger(MsgReceiverController.class);
    //右鍵監聽結果
    @RabbitHandler
    public void receiver(String body)throws IOException{
        logger.info("<=============監聽到task_REPqueued佇列訊息============>"+body);
        if(body!=null||body!=""){

            for ( WebSocketServer item : WebSocketServer.wsClientMap ){

                item.sendMessage(body);
            }
        }
        System.out.println("===>"+body);
    }
}

這樣控制檯迴圈列印異常訊息的問題就解決了,至此終結,如果有幫到各位,可以關注博主,不定期更新實用乾貨給大家,當然本人也有可能會佛系更新,反正不一定啦。。。
最後送給大家一句微語:
對自己好,就是對生活認真。時間不多,你要盡力而為。時間很久,你會水到渠成。
在這裡插入圖片描述