1. 程式人生 > 其它 >基於 springboot websocket 的分散式群聊實現

基於 springboot websocket 的分散式群聊實現

技術標籤:Javajavarediswebsocketspring boot分散式

基於 springboot websocket 的分散式群聊實現

倉庫地址

https://github.com/yemingfeng/jchat-server

功能列表

  • 分散式
  • 同一帳號多裝置登入
  • 群聊
  • 多裝置
  • 簡單鑑權
  • 心跳檢查

依賴

  • maven
  • jdk11
  • redis

redis 配置

redis 預設使用 localhost:6379。如果需要修改 host:port,可以修改 application.yml


redis 僅僅用於儲存使用者 username / password

原始碼分析

Auth 過程
String username =
((ServletServerHttpRequest) request) .getServletRequest().getParameter(USERNAME); String password = ((ServletServerHttpRequest) request) .getServletRequest().getParameter(PASSWORD); User user = userService.register(username, password); // 將 user 設定到 attributes 中 attributes.put(USER, user); return
true;
連線建立
// 1. 新增 session
sessionService.add(session);
// 2. 按 username + sessionId 生成 redis key,並進行訂閱,這樣做可以支援多裝置同一個帳號登入
MessageListener messageListener = (message, pattern) -> {
  log.info("Redis sub receive: [{}]", new String(message.getBody()));
  try {
    session.sendMessage(new TextMessage
(message.getBody())); } catch (IOException e) { log.error("", e); } }; redisMessageListenerContainer.addMessageListener(messageListener, new ChannelTopic(genSubKey(session))); // 由於 session 會斷開,需要儲存下來,以待 removeListener messageListenerMap.put(session, messageListener);
監聽訊息
// 獲取所有線上的 session,然後通過 redis pub 功能轉發訊息
sessionService.getSessions()
  .forEach(session -> pubSubService.pub(session, textMessage));
心跳檢查
// 核心邏輯是使用分桶策略,一共有 10 個桶,每個桶有對應的定時任務和延遲佇列
// 延遲佇列使用了 HeartbeatSessionTask, 其中 HeartbeatSessionTask 封裝了 session 和對應的過期時間

static final int BUCKET_SIZE = 10;

// 每個 session 進入桶時,會根據 sessionId.hashCode() & BUCKET_SIZE 選擇桶
bucket[Math.abs(session.getId().hashCode() % BUCKET_SIZE)].add(new HeartbeatSessionTask(session));

// 啟動每個桶的定時任務
for (int i = 0; i < BUCKET_SIZE; i++) {
  bucket[i] = new DelayQueue<>();
  executors[i] = Executors.newSingleThreadExecutor();
  int index = i;

  executors[index].submit(() -> {
    while (true) {
      try {
        HeartbeatSessionTask task;
        while ((task = bucket[index].poll()) != null) {
          task.getSession().close();
          log.warn("[{}] is dead, so close", SessionUtil.getUsernameFromSession(task.session));
        }
      } catch (Exception e) {
        log.error("", e);
      }
      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
    }
  });
}

使用

服務端啟動

啟動後,會監聽 localhost:8080 埠


其中,websocket url 為 ws:localhost:8080/ws


獲取線上使用者數介面為 http://localhost:8080/session/page

shell 測試

使用 wscat 測試

wscat -c 'ws://localhost:8080/ws?username=aiden&password=123'
前端測試

由於有簡單的帳號體系,連結時需要制定 username / password,若 username 不存在,則直接註冊成功;否則會判斷 username / password 是否匹配


如 ws://localhost:8080/ws?username=aiden&password=123 才能進行連線
圖示:
在這裡插入圖片描述