1. 程式人生 > 實用技巧 >SocketIO實現訊息推送

SocketIO實現訊息推送

Springboot + SocketIO實現點贊、關注、評論等即時訊息推送(Java後臺程式碼)

pom.xml

<!-- 集中定義依賴版本號 -->
<properties>
    <!-- Tools Settings -->
    <hutool.version>5.3.8</hutool.version>
    
    <!-- Netty Setting -->
    <netty-all.version>4.1.50.Final</netty-all.version>
    <netty-socketio.version>1.7.17</netty-socketio.version>
</properties>

<dependencies>
    <!-- Tools Begin -->
    <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
       <groupId>cn.hutool</groupId>
       <artifactId>hutool-all</artifactId>
       <version>${hutool.version}</version>
    </dependency>
    <!-- Tools End -->

    <!-- SocketIO Begin -->
    <dependency>
        <groupId>com.corundumstudio.socketio</groupId>
        <artifactId>netty-socketio</artifactId>
        <version>${netty-socketio.version}</version>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>${netty-all.version}</version>
    </dependency>
    <!-- SocketIO End -->
  </dependencies>

application.yml

socketIO:
  # SocketIO埠
  port: 9090
  # 連線數大小
  workCount: 100
  # 允許客戶請求
  allowCustomRequests: true
  # 協議升級超時時間(毫秒),預設10秒,HTTP握手升級為ws協議超時時間
  upgradeTimeout: 10000
  # Ping訊息超時時間(毫秒),預設60秒,這個時間間隔內沒有接收到心跳訊息就會發送超時事件
  pingTimeout: 60000
  # Ping訊息間隔(毫秒),預設25秒。客戶端向伺服器傳送一條心跳訊息間隔
  pingInterval: 25000
  # 設定HTTP互動最大內容長度
  maxHttpContentLength: 1048576
  # 設定最大每幀處理資料的長度,防止他人利用大資料來攻擊伺服器
  maxFramePayloadLength: 1048576

SocketConfig

import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.SpringAnnotationScanner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Socket配置
 *
 * @author bai
 * @since 2020/7/20
 */
@Configuration
public class SocketConfig {

    @Value("${socketIO.port}")
    private Integer port;

    @Value("${socketIO.workCount}")
    private int workCount;

    @Value("${socketIO.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketIO.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketIO.pingTimeout}")
    private int pingTimeout;

    @Value("${socketIO.pingInterval}")
    private int pingInterval;

    @Value("${socketIO.maxFramePayloadLength}")
    private int maxFramePayloadLength;

    @Value("${socketIO.maxHttpContentLength}")
    private int maxHttpContentLength;


    /**
     * SocketIOServer 配置
     *
     * @return {@link SocketIOServer}
     * @author bai
     * @since 2020/7/20
     */
    @Bean("socketIOServer")
    public SocketIOServer socketIoServer() {
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        // 配置埠
        config.setPort(port);
        // 開啟Socket埠複用
        com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
        socketConfig.setReuseAddress(true);
        config.setSocketConfig(socketConfig);
        // 連線數大小
        config.setWorkerThreads(workCount);
        // 允許客戶請求
        config.setAllowCustomRequests(allowCustomRequests);
        // 協議升級超時時間(毫秒),預設10秒,HTTP握手升級為ws協議超時時間
        config.setUpgradeTimeout(upgradeTimeout);
        // Ping訊息超時時間(毫秒),預設60秒,這個時間間隔內沒有接收到心跳訊息就會發送超時事件
        config.setPingTimeout(pingTimeout);
        // Ping訊息間隔(毫秒),預設25秒。客戶端向伺服器傳送一條心跳訊息間隔
        config.setPingInterval(pingInterval);
        // 設定HTTP互動最大內容長度
        config.setMaxHttpContentLength(maxHttpContentLength);
        // 設定最大每幀處理資料的長度,防止他人利用大資料來攻擊伺服器
        config.setMaxFramePayloadLength(maxFramePayloadLength);
        return new SocketIOServer(config);
    }


    /**
     * 開啟 SocketIOServer 註解支援
     *
     * @param socketServer socketServer
     * @return {@link SpringAnnotationScanner}
     * @author bai
     * @since 2020/7/20
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

SocketServer

import ch.qos.logback.core.net.server.ServerRunner;
import com.corundumstudio.socketio.SocketIOServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * SpringBoot啟動之後執行
 *
 * @author bai
 * @since 2020/7/20
 */
@Component
@Order(1)
public class SocketServer implements CommandLineRunner {

    /**
     * logger
     */
    private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);

    /**
     * socketIOServer
     */
    private final SocketIOServer socketIOServer;

    @Autowired
    public SocketServer(SocketIOServer server) {
        this.socketIOServer = server;
    }

    @Override
    public void run(String... args) {
        logger.info("---------- NettySocket通知服務開始啟動 ----------");
        socketIOServer.start();
        logger.info("---------- NettySocket通知服務啟動成功 ----------");
    }
}

MessageDTO

import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 訊息資料傳輸物件
 *
 * @author bai
 * @since 2020/7/20
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {

    /**
     * 登入使用者 Id
     */
    @ApiModelProperty(value = "登入使用者Id", name = "userId")
    private String userId;

    /**
     * 接收訊息使用者 Id
     */
    @ApiModelProperty(value = "接收訊息使用者Id", name = "toUserId")
    private String toUserId;

    /**
     * 被操作物件 Id
     */
    @ApiModelProperty(value = "被操作物件Id", name = "beOperatedId")
    private String beOperatedId;

    /**
     * 訊息型別
     */
    @ApiModelProperty(value = "訊息型別", name = "msgType")
    private String msgType;
}

MsgTypeEnum

/**
 * 訊息型別
 *
 * @author bai
 * @since 2020/7/20
 */
public enum MsgTypeEnum {

    /**
     * 關注
     */
    FOLLOW("follow"),

    /**
     * 認同
     */
    LIKE("like"),

    /**
     * 評論
     */
    COMMENT("comment");

    private String value;

    MsgTypeEnum(String type) {
        value = type;
    }

    public String getValue() {
        return value;
    }
}

MsgStatusEnum

/**
 * 訊息狀態
 *
 * @author bai
 * @since 2020/7/20
 */
public enum MsgStatusEnum {

    /**
     * 上線
     */
    ONLINE("上線"),

    /**
     * 下線
     */
    OFFLINE("下線");

    private String value;

    MsgStatusEnum(String type) {
        value = type;
    }

    public String getValue() {
        return value;
    }
}

SocketHandler

import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.tbsc.pojo.dto.MessageDTO;
import com.tbsc.service.UserService;
import com.tbsc.utils.enums.MsgStatusEnum;
import com.tbsc.utils.enums.MsgTypeEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Socket 處理器
 *
 * @author bai
 * @since 2020/7/20
 */
@Component
public class SocketHandler {

    /**
     * ConcurrentHashMap 儲存當前 SocketServer 使用者 Id 對應關係
     */
    private Map<String, UUID> clientMap = new ConcurrentHashMap<>(16);

    /**
     * socketIOServer
     */
    private final SocketIOServer socketIOServer;

    @Autowired
    public SocketHandler(SocketIOServer server) {
        this.socketIOServer = server;
    }

    @Resource
    private UserService userService;

    /**
     * 當客戶端發起連線時呼叫
     *
     * @param socketClient socketClient
     * @author bai
     * @since 2020/7/20
     */
    @OnConnect
    public void onConnect(SocketIOClient socketClient) {
        String userId = socketClient.getHandshakeData().getSingleUrlParam("userId");
        if (StrUtil.isNotBlank(userId)) {
            if (userService.queryUserById(userId) != null) {
                System.out.println("使用者{" + userId + "}開啟長連線通知, NettySocketSessionId: {"
                        + socketClient.getSessionId().toString() + "},NettySocketRemoteAddress: {"
                        + socketClient.getRemoteAddress().toString() + "}");
                // 儲存
                clientMap.put(userId, socketClient.getSessionId());
                // 傳送上線通知
                this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.ONLINE.getValue()));
            }
        }
    }


    /**
     * 客戶端斷開連線時呼叫,重新整理客戶端資訊
     *
     * @param socketClient socketClient
     * @author bai
     * @since 2020/7/20
     */
    @OnDisconnect
    public void onDisConnect(SocketIOClient socketClient) {
        String userId = socketClient.getHandshakeData().getSingleUrlParam("userId");
        if (StrUtil.isNotBlank(userId)) {
            System.out.println("使用者{" + userId + "}斷開長連線通知, NettySocketSessionId: {"
                    + socketClient.getSessionId().toString() + "},NettySocketRemoteAddress: {"
                    + socketClient.getRemoteAddress().toString() + "}");
            // 移除
            clientMap.remove(userId);
            // 傳送下線通知
            this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.OFFLINE.getValue()));
        }
    }


    /**
     * 傳送上下線通知
     *
     * @param messageDTO messageDTO
     * @author bai
     * @since 2020/7/20
     */
    private void sendNotice(MessageDTO messageDTO) {
        if (messageDTO != null) {
            // 全部發送
            clientMap.forEach((key, value) -> {
                if (value != null) {
                    socketIOServer.getClient(value).sendEvent("receiveMsg", messageDTO);
                }
            });
        }
    }


    /**
     * sendMsg:   接收前端訊息,方法名需與前端一致
     * receiveMsg:前端接收後端傳送資料的方法,方法名需與前端一致
     *
     * @param socketClient socketClient
     * @param messageDTO messageDTO
     * @author bai
     * @since 2020/7/20
     */
    @OnEvent("sendMsg")
    public void sendMsg(SocketIOClient socketClient, MessageDTO messageDTO) {

        // 獲取前端傳入的接收訊息使用者 Id
        String toUserId = messageDTO.getToUserId();
        // 客戶端 SessionId
        UUID sessionId = clientMap.get(toUserId);
        // 獲取前端傳入的訊息型別
        String msgType = messageDTO.getMsgType();
        // 獲取前端傳入的當前登入使用者 Id
        String userId = messageDTO.getUserId();
        // 獲取前端傳入的被操作物件 Id
        String beOperatedId = messageDTO.getBeOperatedId();

        // 如果 SessionId 相同,表示使用者線上,傳送即時通知,使用者每次開啟 APP 都會生成新的 SessionId
        if (sessionId.equals(socketClient.getSessionId())) {
            if (msgType.equals(MsgTypeEnum.LIKE.getValue())) {
                socketIOServer.getClient(sessionId).sendEvent("receiveMsg", "你有一條認同訊息");
            } else if (msgType.equals(MsgTypeEnum.FOLLOW.getValue())) {
                socketIOServer.getClient(sessionId).sendEvent("receiveMsg", "你有一條關注訊息");
            } else if (msgType.equals(MsgTypeEnum.COMMENT.getValue())) {
                socketIOServer.getClient(sessionId).sendEvent("receiveMsg", "你有一條評論訊息");
            } else {
                System.out.println("訊息型別不匹配");
            }
        }
    }
}