基於Netty-SocketIO的主動推送服務
阿新 • • 發佈:2019-12-31
背景
前端時間,公司開發了一款主動服務的機器人的程式,講產生的訊息通過服務端主動推送到客戶端(H5、IOS、Android),支援使用者的個性化開關設定,使用者可自由選擇接受的訊息型別;同時支援使用者主動提問;在此記錄下整個部署以及實現的大致思路;
同時感謝我的Leader給予的幫助。
部署
Nginx配置
- 為了保持長連線有效,配置HTTP版本1.1;
- 配置
Upgrade
和Connection
響應頭資訊;
完整配置如下:
location / {
proxy_pass http://nodes;
# enable WebSockets
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
複製程式碼
Socket配置
Socket配置類
public class WebSocketConfig {
private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);
@Value("${wss.server.host}")
private String host;
@Value("${wss.server.port}" )
private Integer port;
@Value("${redis.passwd}")
private String redisPasswd;
@Value("${redis.address}")
private String redisAddress;
@Bean
public PubSubStore pubSubStore() {
return socketIOServer().getConfiguration().getStoreFactory().pubSubStore();
}
@Bean
public SocketIOServer socketIOServer() {
Config redissonConfig = new Config();
// 高版本需求 redis:// 字首
redissonConfig.useSingleServer().setPassword("xxx").setAddress("redis://xxx:xx").setDatabase();
RedissonClient redisson = Redisson.create(redissonConfig);
RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson);
Configuration config = new Configuration();
config.setHostname(host);
config.setPort(port);
config.setOrigin(origin);
config.setHttpCompression(false);
config.setWebsocketCompression(false);
config.setStoreFactory(redisStoreFactory);
// 注意如果開放跨域設定,需要設定為null而不是"*"
config.setOrigin(null);
// 協議升級超時時間(毫秒),預設10000。HTTP握手升級為ws協議超時時間
config.setUpgradeTimeout(10000);
// Ping訊息間隔(毫秒),預設25000。客戶端向伺服器傳送一條心跳訊息間隔
config.setPingInterval(25000);
// Ping訊息超時時間(毫秒),預設60000,這個時間間隔內沒有接收到心跳訊息就會傳送超時事件
config.setPingTimeout(60000);
/** 異常監聽事件,必須覆寫全部方法 */
config.setExceptionListener(new ExceptionListener(){
@Override
public void onConnectException(Exception e,SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1,"連線異常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error,"連線異常!")));
}
@Override
public void onDisconnectException(Exception e,"斷開異常!");
client.sendEvent("exception","連線異常!")));
}
@Override
public void onEventException(Exception e,List<Object> data,"伺服器異常!");
client.sendEvent("exception","連線異常!")));
}
@Override
public void onPingException(Exception e,"PING 超時異常!");
client.sendEvent("exception","PING 超時異常!")));
}
@Override
public boolean exceptionCaught(ChannelHandlerContext ctx,Throwable e) {
return false;
}
});
// 類似於過濾器設定,此處不作處理
config.setAuthorizationListener(data -> {
// // 可以使用如下程式碼獲取使用者密碼資訊
// String appId = data.getSingleUrlParam("appId");
// String source = data.getSingleUrlParam("source");
// log.info("token {},client {}",appId,source);
return true;
});
return new SocketIOServer(config);
}
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
複製程式碼
Socket啟動類
@Log4j2
@Component
@Order(value=1)
public class ServerRunner implements CommandLineRunner {
private final SocketIOServer server;
@Autowired
public ServerRunner(SocketIOServer server) {
this.server = server;
}
@Override
public void run(String... args) throws Exception {
server.start();
log.info("socket.io啟動成功!");
}
}
複製程式碼
最終架構
實現過程
主動推送服務監聽作為KafKa消費者,資料生產者講加工好的資料推到KafKa中,消費者監聽到訊息廣播給客戶端;推送時在資料庫查詢使用者對應的個性化設定,僅推送客戶端選擇接受的訊息;
由於主動推送服務部署了多個節點,而多個節點分配在同一個KafKa消費組中,這樣會引起多個節點僅消費到全部訊息的一部分的問題;這裡使用Redis
的釋出/訂閱
的機制解決了這個問題:當各個節點消費到訊息之後,將訊息釋出之後,其它節點訂閱該Topic
將訊息傳送給各自節點上連線的客戶端,在這裡各個節點即是釋出者,又是訂閱者;
從資料的產生,到消費
使用Redisson的Topic實現分散式釋出/訂閱
Redisson為了方便Redis中的釋出/訂閱
機制的使用,將其封裝成Topic,並提供了程式碼級別的釋出/訂閱
操作,如此一來多個JVM程式連線到Redis(單機/叢集)後,便可以實現在一個JVM程式中釋出
的Topic
,在其他已經訂閱
了該主題的JVM程式中就能及時收到訊息。
在Netty-SocketIO整合了Redisson
之後,內部也使用了釋出/訂閱
機制
訊息的釋出
public void sendMessageToAllClient(String eventType,String message,String desc) {
Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
for(final SocketIOClient client : clients){
// Do Somthing
}
Packet packet = new Packet(PacketType.MESSAGE);
packet.setData(new BroadcastMessage(message,eventType,desc));
publishMessage(packet);
}
private void publishMessage(Packet packet) {
DispatchMessage dispatchMessage = new DispatchMessage("",packet,"");
pubSubStore.publish(PubSubType.DISPATCH,dispatchMessage);
BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();
}
複製程式碼
訊息的訂閱
@PostConstruct
public void init() {
pubSubStore.subscribe(PubSubType.DISPATCH,dispatchMessage -> {
BroadcastMessage messageData = dispatchMessage.getPacket().getData();
Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
for(final SocketIOClient client : clients){
// DO Somthing
},DispatchMessage.class);
}
複製程式碼