SocketIO實現訊息推送
阿新 • • 發佈:2020-07-21
Springboot + SocketIO實現點贊、關注、評論等即時訊息推送(Java後臺程式碼)
pom.xml
<!-- 集中定義依賴版本號 --> <properties> <!-- Tools Settings --> <hutool.version>5.3.8</hutool.version> <!-- Netty Setting --> <netty-all.version>4.1.50.Final</netty-all.version> <netty-socketio.version>1.7.17</netty-socketio.version> </properties> <dependencies> <!-- Tools Begin --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>${hutool.version}</version> </dependency> <!-- Tools End --> <!-- SocketIO Begin --> <dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>${netty-socketio.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty-all.version}</version> </dependency> <!-- SocketIO End --> </dependencies>
application.yml
socketIO:
  # SocketIO埠
  port: 9090
  # 連線數大小
  workCount: 100
  # 允許客戶請求
  allowCustomRequests: true
  # 協議升級超時時間(毫秒),預設10秒,HTTP握手升級為ws協議超時時間
  upgradeTimeout: 10000
  # Ping訊息超時時間(毫秒),預設60秒,這個時間間隔內沒有接收到心跳訊息就會發送超時事件
  pingTimeout: 60000
  # Ping訊息間隔(毫秒),預設25秒。客戶端向伺服器傳送一條心跳訊息間隔
  pingInterval: 25000
  # 設定HTTP互動最大內容長度
  maxHttpContentLength: 1048576
  # 設定最大每幀處理資料的長度,防止他人利用大資料來攻擊伺服器
  maxFramePayloadLength: 1048576
SocketConfig
import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Socket配置 * * @author bai * @since 2020/7/20 */ @Configuration public class SocketConfig { @Value("${socketIO.port}") private Integer port; @Value("${socketIO.workCount}") private int workCount; @Value("${socketIO.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketIO.upgradeTimeout}") private int upgradeTimeout; @Value("${socketIO.pingTimeout}") private int pingTimeout; @Value("${socketIO.pingInterval}") private int pingInterval; @Value("${socketIO.maxFramePayloadLength}") private int maxFramePayloadLength; @Value("${socketIO.maxHttpContentLength}") private int maxHttpContentLength; /** * SocketIOServer 配置 * * @return {@link SocketIOServer} * @author bai * @since 2020/7/20 */ @Bean("socketIOServer") public SocketIOServer socketIoServer() { com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); // 配置埠 config.setPort(port); // 開啟Socket埠複用 com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig(); socketConfig.setReuseAddress(true); config.setSocketConfig(socketConfig); // 連線數大小 config.setWorkerThreads(workCount); // 允許客戶請求 config.setAllowCustomRequests(allowCustomRequests); // 協議升級超時時間(毫秒),預設10秒,HTTP握手升級為ws協議超時時間 config.setUpgradeTimeout(upgradeTimeout); // Ping訊息超時時間(毫秒),預設60秒,這個時間間隔內沒有接收到心跳訊息就會發送超時事件 config.setPingTimeout(pingTimeout); // Ping訊息間隔(毫秒),預設25秒。客戶端向伺服器傳送一條心跳訊息間隔 config.setPingInterval(pingInterval); // 設定HTTP互動最大內容長度 config.setMaxHttpContentLength(maxHttpContentLength); // 設定最大每幀處理資料的長度,防止他人利用大資料來攻擊伺服器 config.setMaxFramePayloadLength(maxFramePayloadLength); return new SocketIOServer(config); } /** * 開啟 SocketIOServer 註解支援 * * @param socketServer 
socketServer * @return {@link SpringAnnotationScanner} * @author bai * @since 2020/7/20 */ @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } }
SocketServer
import ch.qos.logback.core.net.server.ServerRunner;
import com.corundumstudio.socketio.SocketIOServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
 * Starts the Socket.IO server once the Spring Boot application has started.
 *
 * @author bai
 * @since 2020/7/20
 */
@Component
@Order(1)
public class SocketServer implements CommandLineRunner {

    /**
     * Logger bound to this class, so log lines are attributed to {@code SocketServer}
     * instead of an unrelated logback class.
     */
    private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);

    /**
     * The Socket.IO server to start.
     */
    private final SocketIOServer socketIOServer;

    /**
     * @param server the Socket.IO server bean to start in {@link #run(String...)}
     */
    @Autowired
    public SocketServer(SocketIOServer server) {
        this.socketIOServer = server;
    }

    /**
     * Starts the Socket.IO server and logs before and after the start call.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) {
        logger.info("---------- NettySocket通知服務開始啟動 ----------");
        socketIOServer.start();
        logger.info("---------- NettySocket通知服務啟動成功 ----------");
    }
}
MessageDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Data transfer object for a notification message.
 *
 * <p>Field order matters: the Lombok all-args constructor takes the fields in
 * declaration order ({@code userId, toUserId, beOperatedId, msgType}).
 *
 * @author bai
 * @since 2020/7/20
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {

    /**
     * Explicit serial version so the serialized form does not depend on the
     * compiler-generated default.
     */
    private static final long serialVersionUID = 1L;

    /**
     * Id of the logged-in user (the sender).
     */
    @ApiModelProperty(value = "登入使用者Id", name = "userId")
    private String userId;

    /**
     * Id of the user who receives the message.
     */
    @ApiModelProperty(value = "接收訊息使用者Id", name = "toUserId")
    private String toUserId;

    /**
     * Id of the object that was operated on.
     */
    @ApiModelProperty(value = "被操作物件Id", name = "beOperatedId")
    private String beOperatedId;

    /**
     * Message type.
     */
    @ApiModelProperty(value = "訊息型別", name = "msgType")
    private String msgType;
}
MsgTypeEnum
/**
 * Message types, each carrying the string value used on the wire.
 *
 * @author bai
 * @since 2020/7/20
 */
public enum MsgTypeEnum {
    /**
     * Follow.
     */
    FOLLOW("follow"),
    /**
     * Like.
     */
    LIKE("like"),
    /**
     * Comment.
     */
    COMMENT("comment");

    /** String value of the type; final because enum constants are shared singletons. */
    private final String value;

    MsgTypeEnum(String type) {
        this.value = type;
    }

    /**
     * @return the string value of this message type
     */
    public String getValue() {
        return value;
    }
}
MsgStatusEnum
/**
 * Message status values used for the online/offline notices.
 *
 * @author bai
 * @since 2020/7/20
 */
public enum MsgStatusEnum {
    /**
     * Online.
     */
    ONLINE("上線"),
    /**
     * Offline.
     */
    OFFLINE("下線");

    /** String value of the status; final because enum constants are shared singletons. */
    private final String value;

    MsgStatusEnum(String type) {
        this.value = type;
    }

    /**
     * @return the string value of this status
     */
    public String getValue() {
        return value;
    }
}
SocketHandler
import cn.hutool.core.util.StrUtil;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.tbsc.pojo.dto.MessageDTO;
import com.tbsc.service.UserService;
import com.tbsc.utils.enums.MsgStatusEnum;
import com.tbsc.utils.enums.MsgTypeEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Socket.IO event handler: tracks which user is connected on which session and
 * pushes online/offline notices and like/follow/comment notifications.
 *
 * @author bai
 * @since 2020/7/20
 */
@Component
public class SocketHandler {

    /**
     * Name of the event emitted to clients; must match the listener name used by the front end.
     */
    private static final String RECEIVE_EVENT = "receiveMsg";

    /**
     * Maps a user id to the session id of that user's current connection.
     */
    private final Map<String, UUID> clientMap = new ConcurrentHashMap<>(16);

    /**
     * Server used to look up connected clients by session id.
     */
    private final SocketIOServer socketIOServer;

    @Resource
    private UserService userService;

    @Autowired
    public SocketHandler(SocketIOServer server) {
        this.socketIOServer = server;
    }

    /**
     * Called when a client connects. If the {@code userId} URL parameter is present and
     * {@code userService.queryUserById} returns a user, the session is registered and an
     * online notice is sent to all registered clients.
     *
     * @param socketClient the connecting client
     * @author bai
     * @since 2020/7/20
     */
    @OnConnect
    public void onConnect(SocketIOClient socketClient) {
        String userId = socketClient.getHandshakeData().getSingleUrlParam("userId");
        if (StrUtil.isBlank(userId) || userService.queryUserById(userId) == null) {
            return;
        }
        System.out.println("使用者{" + userId + "}開啟長連線通知, NettySocketSessionId: {"
                + socketClient.getSessionId() + "},NettySocketRemoteAddress: {"
                + socketClient.getRemoteAddress() + "}");
        clientMap.put(userId, socketClient.getSessionId());
        this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.ONLINE.getValue()));
    }

    /**
     * Called when a client disconnects. The user's mapping is removed only if it still
     * points at the disconnecting session, and the offline notice is sent only in that case.
     *
     * @param socketClient the disconnecting client
     * @author bai
     * @since 2020/7/20
     */
    @OnDisconnect
    public void onDisConnect(SocketIOClient socketClient) {
        String userId = socketClient.getHandshakeData().getSingleUrlParam("userId");
        if (StrUtil.isBlank(userId)) {
            return;
        }
        System.out.println("使用者{" + userId + "}斷開長連線通知, NettySocketSessionId: {"
                + socketClient.getSessionId() + "},NettySocketRemoteAddress: {"
                + socketClient.getRemoteAddress() + "}");
        // Conditional remove: a late disconnect of an old session must not evict the
        // mapping of a newer session of the same user, and a user that was never
        // registered must not trigger an offline notice.
        boolean removed = clientMap.remove(userId, socketClient.getSessionId());
        if (removed) {
            this.sendNotice(new MessageDTO(userId, null, null, MsgStatusEnum.OFFLINE.getValue()));
        }
    }

    /**
     * Sends an online/offline notice to every registered client.
     *
     * @param messageDTO the notice to broadcast; ignored when {@code null}
     * @author bai
     * @since 2020/7/20
     */
    private void sendNotice(MessageDTO messageDTO) {
        if (messageDTO == null) {
            return;
        }
        for (UUID sessionId : clientMap.values()) {
            SocketIOClient client = socketIOServer.getClient(sessionId);
            // The session may already be gone even though the mapping is still present.
            if (client != null) {
                client.sendEvent(RECEIVE_EVENT, messageDTO);
            }
        }
    }

    /**
     * Handles the {@code sendMsg} event from the front end and, if the receiving user is
     * currently connected, pushes a {@code receiveMsg} notification to that user.
     * Both event names must match the ones used by the front end.
     *
     * @param socketClient the client that sent the event
     * @param messageDTO   the message describing receiver and message type
     * @author bai
     * @since 2020/7/20
     */
    @OnEvent("sendMsg")
    public void sendMsg(SocketIOClient socketClient, MessageDTO messageDTO) {
        if (messageDTO == null || StrUtil.isBlank(messageDTO.getToUserId())) {
            return;
        }
        // A registered session for the receiver means the receiver is online.
        UUID receiverSessionId = clientMap.get(messageDTO.getToUserId());
        if (receiverSessionId == null) {
            return;
        }
        SocketIOClient receiver = socketIOServer.getClient(receiverSessionId);
        if (receiver == null) {
            return;
        }
        String notice = noticeFor(messageDTO.getMsgType());
        if (notice == null) {
            System.out.println("訊息型別不匹配");
            return;
        }
        receiver.sendEvent(RECEIVE_EVENT, notice);
    }

    /**
     * Maps a message type value to the notification text.
     *
     * @param msgType message type value, may be {@code null}
     * @return the notification text, or {@code null} when the type is not recognised
     */
    private static String noticeFor(String msgType) {
        if (MsgTypeEnum.LIKE.getValue().equals(msgType)) {
            return "你有一條認同訊息";
        }
        if (MsgTypeEnum.FOLLOW.getValue().equals(msgType)) {
            return "你有一條關注訊息";
        }
        if (MsgTypeEnum.COMMENT.getValue().equals(msgType)) {
            return "你有一條評論訊息";
        }
        return null;
    }
}