1. 程式人生 > 程式設計 >基於Netty-SocketIO的主動推送服務

基於Netty-SocketIO的主動推送服務

背景

前端時間,公司開發了一款主動服務的機器人的程式,講產生的訊息通過服務端主動推送到客戶端(H5、IOS、Android),支援使用者的個性化開關設定,使用者可自由選擇接受的訊息型別;同時支援使用者主動提問;在此記錄下整個部署以及實現的大致思路;

同時感謝我的Leader給予的幫助。

部署

Nginx配置

  • 為了保持長連線有效,配置HTTP版本1.1;
  • 配置UpgradeConnection響應頭資訊;

完整配置如下:

location / {
    proxy_pass http://nodes;

    # enable WebSockets
    proxy_http_version
1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; } 複製程式碼

Socket配置

Socket配置類

public class WebSocketConfig {

    private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    @Value("${wss.server.host}")
    private String host;

    @Value("${wss.server.port}"
) private Integer port; @Value("${redis.passwd}") private String redisPasswd; @Value("${redis.address}") private String redisAddress; @Bean public PubSubStore pubSubStore() { return socketIOServer().getConfiguration().getStoreFactory().pubSubStore(); } @Bean
public SocketIOServer socketIOServer() { Config redissonConfig = new Config(); // 高版本需求 redis:// 字首 redissonConfig.useSingleServer().setPassword("xxx").setAddress("redis://xxx:xx").setDatabase(); RedissonClient redisson = Redisson.create(redissonConfig); RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson); Configuration config = new Configuration(); config.setHostname(host); config.setPort(port); config.setOrigin(origin); config.setHttpCompression(false); config.setWebsocketCompression(false); config.setStoreFactory(redisStoreFactory); // 注意如果開放跨域設定,需要設定為null而不是"*" config.setOrigin(null); // 協議升級超時時間(毫秒),預設10000。HTTP握手升級為ws協議超時時間 config.setUpgradeTimeout(10000); // Ping訊息間隔(毫秒),預設25000。客戶端向伺服器傳送一條心跳訊息間隔 config.setPingInterval(25000); // Ping訊息超時時間(毫秒),預設60000,這個時間間隔內沒有接收到心跳訊息就會傳送超時事件 config.setPingTimeout(60000); /** 異常監聽事件,必須覆寫全部方法 */ config.setExceptionListener(new ExceptionListener(){ @Override public void onConnectException(Exception e,SocketIOClient client) { ResponseMessage error = ResponseMessage.error(-1,"連線異常!"); client.sendEvent("exception",JSON.toJSON(new Response<String>(error,"連線異常!"))); } @Override public void onDisconnectException(Exception e,"斷開異常!"); client.sendEvent("exception","連線異常!"))); } @Override public void onEventException(Exception e,List<Object> data,"伺服器異常!"); client.sendEvent("exception","連線異常!"))); } @Override public void onPingException(Exception e,"PING 超時異常!"); client.sendEvent("exception","PING 超時異常!"))); } @Override public boolean exceptionCaught(ChannelHandlerContext ctx,Throwable e) { return false; } }); // 類似於過濾器設定,此處不作處理 config.setAuthorizationListener(data -> { // // 可以使用如下程式碼獲取使用者密碼資訊 // String appId = data.getSingleUrlParam("appId"); // String source = data.getSingleUrlParam("source"); // log.info("token {},client {}",appId,source); return true; }); return new SocketIOServer(config); } @Bean public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { return new SpringAnnotationScanner(socketServer); } } 複製程式碼

Socket啟動類

@Log4j2
@Component
@Order(value=1)
public class ServerRunner implements CommandLineRunner {

    private final SocketIOServer server;


    @Autowired
    public ServerRunner(SocketIOServer server) {
        this.server = server;
    }

    @Override
    public void run(String... args) throws Exception {
        server.start();
        log.info("socket.io啟動成功!");
    }
}
複製程式碼

最終架構

實現過程

主動推送服務監聽作為KafKa消費者,資料生產者講加工好的資料推到KafKa中,消費者監聽到訊息廣播給客戶端;推送時在資料庫查詢使用者對應的個性化設定,僅推送客戶端選擇接受的訊息;

由於主動推送服務部署了多個節點,而多個節點分配在同一個KafKa消費組中,這樣會引起多個節點僅消費到全部訊息的一部分的問題;這裡使用Redis釋出/訂閱的機制解決了這個問題:當各個節點消費到訊息之後,將訊息釋出之後,其它節點訂閱該Topic將訊息傳送給各自節點上連線的客戶端,在這裡各個節點即是釋出者,又是訂閱者;

從資料的產生,到消費

使用Redisson的Topic實現分散式釋出/訂閱

Redisson為了方便Redis中的釋出/訂閱機制的使用,將其封裝成Topic,並提供了程式碼級別的釋出/訂閱操作,如此一來多個JVM程式連線到Redis(單機/叢集)後,便可以實現在一個JVM程式中釋出Topic,在其他已經訂閱了該主題的JVM程式中就能及時收到訊息。

在Netty-SocketIO整合了Redisson之後,內部也使用了釋出/訂閱機制

訊息的釋出

public void sendMessageToAllClient(String eventType,String message,String desc) {
    Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
    for(final SocketIOClient client : clients){
      // Do Somthing
    }

    Packet packet = new Packet(PacketType.MESSAGE);
    packet.setData(new BroadcastMessage(message,eventType,desc));
    publishMessage(packet);
}

private void publishMessage(Packet packet) {
    DispatchMessage dispatchMessage = new DispatchMessage("",packet,"");
    pubSubStore.publish(PubSubType.DISPATCH,dispatchMessage);
    BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();

}
複製程式碼
訊息的訂閱
@PostConstruct
public void init() {
  pubSubStore.subscribe(PubSubType.DISPATCH,dispatchMessage -> {
      BroadcastMessage messageData = dispatchMessage.getPacket().getData();
    
      Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();

      for(final SocketIOClient client : clients){
        // DO Somthing
      },DispatchMessage.class);
}
複製程式碼