1. 程式人生 > >springboot整合SocketIO實時通訊

springboot整合SocketIO實時通訊

概述

基於 socket.io 來說,採用 node 實現更加合適,本文使用兩個後端的Java開源框架實現。

業務需求是將之前通過輪詢方式調動RESTFul API改成使用WebSocket長連線方式,實現要伺服器實時的推送訊息,另外還要實時監控POS機的線上狀態等。

引入依賴

<!-- netty-socketio-->
<dependency>
    <groupId>com.corundumstudio.socketio</groupId>
    <artifactId>netty-socketio</artifactId>
    <version>1.7.13</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-resolver</artifactId>
    <version>4.1.15.Final</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport</artifactId>
    <version>4.1.15.Final</version>
</dependency>
<dependency>
    <groupId>io.socket</groupId>
    <artifactId>socket.io-client</artifactId>
    <version>1.0.0</version>
</dependency>

先來服務端程式爽一把,話不多說,先上程式碼:

伺服器程式碼

public class NamespaceSocketServer {
    private static final Logger logger = LoggerFactory.getLogger(NamespaceSocketServer.class);

    public static void main(String[] args) {
        /*
         * 建立Socket,並設定監聽埠
         */
        Configuration config = new Configuration();
        //設定主機名
        config.setHostname("localhost");
        //設定監聽埠
        config.setPort(9092);
        // 協議升級超時時間(毫秒),預設10秒。HTTP握手升級為ws協議超時時間
        config.setUpgradeTimeout(10000);
        // Ping訊息超時時間(毫秒),預設60秒,這個時間間隔內沒有接收到心跳訊息就會發送超時事件
        config.setPingTimeout(180000);
        // Ping訊息間隔(毫秒),預設25秒。客戶端向伺服器傳送一條心跳訊息間隔
        config.setPingInterval(60000);
        // 連線認證,這裡使用token更合適
        config.setAuthorizationListener(new AuthorizationListener() {
            @Override
            public boolean isAuthorized(HandshakeData data) {
                // String token = data.getSingleUrlParam("token");
                // String username = JWTUtil.getSocketUsername(token);
                // return JWTUtil.verifySocket(token, "secret");
                return true;
            }
        });

        final SocketIOServer server = new SocketIOServer(config);

        /*
         * 新增連線監聽事件,監聽是否與客戶端連線到伺服器
         */
        server.addConnectListener(new ConnectListener() {
            @Override
            public void onConnect(SocketIOClient client) {
                // 判斷是否有客戶端連線
                if (client != null) {
                    logger.info("連線成功。clientId=" + client.getSessionId().toString());
                    client.joinRoom(client.getHandshakeData().getSingleUrlParam("appid"));
                } else {
                    logger.error("並沒有人連線上。。。");
                }
            }
        });

        /*
         * 新增監聽事件,監聽客戶端的事件
         * 1.第一個引數eventName需要與客戶端的事件要一致
         * 2.第二個引數eventClase是傳輸的資料型別
         * 3.第三個引數listener是用於接收客戶端傳的資料,資料型別需要與eventClass一致
         */
        server.addEventListener("login", LoginRequest.class, new DataListener<LoginRequest>() {
            @Override
            public void onData(SocketIOClient client, LoginRequest data, AckRequest ackRequest) {
                logger.info("接收到客戶端login訊息:code = " + data.getCode() + ",body = " + data.getBody());
                // check is ack requested by client, but it's not required check
                if (ackRequest.isAckRequested()) {
                    // send ack response with data to client
                    ackRequest.sendAckData("已成功收到客戶端登入請求", "yeah");
                }
                // 向客戶端傳送訊息
                List<String> list = new ArrayList<>();
                list.add("登入成功,sessionId=" + client.getSessionId());
                // 第一個引數必須與eventName一致,第二個引數data必須與eventClass一致
                String room = client.getHandshakeData().getSingleUrlParam("appid");
                server.getRoomOperations(room).sendEvent("login", list.toString());
            }
        });
        //啟動服務
        server.start();
    }
}

老規矩,先上程式碼爽爽

Android客戶端

public class SocketClient {
    private static Socket socket;
    private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);

    public static void main(String[] args) throws URISyntaxException {
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;     // 重連嘗試次數
        options.reconnectionDelay = 1000;     // 失敗重連的時間間隔(ms)
        options.timeout = 20000;              // 連線超時時間(ms)
        options.forceNew = true;
        options.query = "username=test1&password=test1&appid=com.xncoding.apay2";
        socket = IO.socket("http://localhost:9092/", options);
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                // 客戶端一旦連線成功,開始發起登入請求
                LoginRequest message = new LoginRequest(12, "這是客戶端訊息體");
                socket.emit("login", JsonConverter.objectToJSONObject(message), (Ack) args1 -> {
                    logger.info("回執訊息=" + Arrays.stream(args1).map(Object::toString).collect(Collectors.joining(",")));
                });
            }
        }).on("login", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("接受到伺服器房間廣播的登入訊息:" + Arrays.toString(args));
            }
        }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("Socket.EVENT_CONNECT_ERROR");
                socket.disconnect();
            }
        }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("Socket.EVENT_CONNECT_TIMEOUT");
                socket.disconnect();
            }
        }).on(Socket.EVENT_PING, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("Socket.EVENT_PING");
            }
        }).on(Socket.EVENT_PONG, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("Socket.EVENT_PONG");
            }
        }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("-----------接受到訊息啦--------" + Arrays.toString(args));
            }
        }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                logger.info("客戶端斷開連線啦。。。");
                socket.disconnect();
            }
        });
        socket.connect();
    }
}

關於心跳機制

根據 Socket.IO文件 解釋, 客戶端會定期傳送心跳包,並觸發一個ping事件和一個pong事件,如下:

  • ping Fired when a ping packet is written out to the server.
  • pong Fired when a pong is received from the server. Parameters:
    • Number number of ms elapsed since ping packet (i.e.: latency)

這裡最重要的兩個伺服器引數如下:

  1. pingTimeout (Number): how many ms without a pong packet to consider the connection closed (60000)
  2. pingInterval (Number): how many ms before sending a new ping packet (25000).

也就是說握手協議的時候,客戶端從伺服器拿到這兩個引數,一個是ping訊息的傳送間隔時間,一個是從伺服器返回pong訊息的超時時間, 客戶端會在超時後斷開連線。心跳包傳送方向是客戶端向伺服器端傳送,以維持線上狀態。

關於斷線和超時

關閉瀏覽器、直接關閉客戶端程式、kill程序、主動執行disconnect方法都會導致立刻產生斷線事件。 而客戶端把網路斷開,伺服器端在 pingTimeout ms後產生斷線事件、客戶端在 pingTimeout ms後也產生斷線事件。

實際上,超時後會產生一個斷線事件,叫”disconnect”。客戶端和伺服器端都可以對這個事件作出應答,釋放連線。

客戶端程式碼:

.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
    @Override
    public void call(Object... args) {
        logger.info("客戶端斷開連線啦。。。");
        socket.disconnect();
    }
});

下面是客戶端日誌:連上伺服器後,斷開網路。超過了心跳超時時間後,產生斷線事件。如果客戶端不主動斷開連線的話,會自動重連, 這時候發現連線不上,又產生連線錯誤事件,然後重試2次,都失敗後自動斷開連線了。

1
2
3
4
5
SocketClient - 回執訊息=伺服器已成功收到客戶端登入請求,yeah
SocketClient - Socket.EVENT_PING
SocketClient - Socket.EVENT_PONG
SocketClient - 客戶端斷開連線啦。。。
SocketClient - Socket.EVENT_CONNECT_ERROR

伺服器端程式碼:

server.addDisconnectListener(new DisconnectListener() {
    @Override
    public void onDisconnect(SocketIOClient client) {
        System.out.println("伺服器收到斷線通知... sessionId=" + client.getSessionId());
    }
});

伺服器邏輯是,如果在心跳超時後,就直接斷開這個連線,並且產生一個斷開連線事件。

伺服器通過netty處理心跳包ping/pong的日誌如下:

1
2
3
WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1
WebSocket08FrameDecoder - Decoding WebSocket Frame length=1
WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=1

瀏覽器客戶端演示

對於netty-socketio有一個demo工程,裡面通過一個網頁的聊天小程式演示了各種使用方法。

SpringBoot整合

最後重點講一下如何在SpringBoot中整合。

修改配置

首先maven依賴之前已經講過了,先修改下application.yml配置檔案來配置下幾個引數,比如主機、埠、心跳時間等等。

1
2
3
4
###################  自定義專案配置 ###################
xncoding:
  socket-hostname: localhost
  socket-port: 9096

新增Bean配置

然後增加一個SocketServer的Bean配置:

@Configuration
public class NettySocketConfig {

    @Resource
    private MyProperties myProperties;

    @Resource
    private ApiService apiService;
    @Resource
    private ManagerInfoService managerInfoService;

    private static final Logger logger = LoggerFactory.getLogger(NettySocketConfig.class);

    @Bean
    public SocketIOServer socketIOServer() {
        /*
         * 建立Socket,並設定監聽埠
         */
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        // 設定主機名,預設是0.0.0.0
        // config.setHostname("localhost");
        // 設定監聽埠
        config.setPort(9096);
        // 協議升級超時時間(毫秒),預設10000。HTTP握手升級為ws協議超時時間
        config.setUpgradeTimeout(10000);
        // Ping訊息間隔(毫秒),預設25000。客戶端向伺服器傳送一條心跳訊息間隔
        config.setPingInterval(60000);
        // Ping訊息超時時間(毫秒),預設60000,這個時間間隔內沒有接收到心跳訊息就會發送超時事件
        config.setPingTimeout(180000);
        // 這個版本0.9.0不能處理好namespace和query引數的問題。所以為了做認證必須使用全域性預設名稱空間
        config.setAuthorizationListener(new AuthorizationListener() {
            @Override
            public boolean isAuthorized(HandshakeData data) {
                // 可以使用如下程式碼獲取使用者密碼資訊
                String username = data.getSingleUrlParam("username");
                String password = data.getSingleUrlParam("password");
                logger.info("連線引數:username=" + username + ",password=" + password);
                ManagerInfo managerInfo = managerInfoService.findByUsername(username);
                // MD5鹽
                String salt = managerInfo.getSalt();
                String encodedPassword = ShiroKit.md5(password, username + salt);
                // 如果認證不通過會返回一個Socket.EVENT_CONNECT_ERROR事件
                return encodedPassword.equals(managerInfo.getPassword());
            }
        });

        final SocketIOServer server = new SocketIOServer(config);

        return server;
    }

    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}

後面還有個SpringAnnotationScanner的定義不能忘記。注意,我在AuthorizationListener裡面通過呼叫service做了使用者名稱和密碼的認證。通過註解方式可以注入service, 執行相應的連線授權動作。

新增訊息結構類

預先定義好客戶端和伺服器端直接傳遞的訊息型別,使用簡單的JavaBean即可,比如

public class ReportParam {
    /**
     * IMEI碼
     */
    private String imei;
    /**
     * 位置
     */
    private String location;

    public String getImei() {
        return imei;
    }

    public void setImei(String imei) {
        this.imei = imei;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }
}

這裡才是最核心的介面處理類,所有介面處理邏輯都應該寫在這裡面,我只舉了一個例子,就是POS上傳位置介面:

新增訊息處理類

/**
 * 訊息事件處理器
 *
 * @version 1.0
 * @since 2018/1015
 */
@Component
public class MessageEventHandler {

    private final SocketIOServer server;
    private final ApiService apiService;

    private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class);

    @Autowired
    public MessageEventHandler(SocketIOServer server, ApiService apiService) {
        this.server = server;
        this.apiService = apiService;
    }

    //新增connect事件,當客戶端發起連線時呼叫
    @OnConnect
    public void onConnect(SocketIOClient client) {
        if (client != null) {
            String imei = client.getHandshakeData().getSingleUrlParam("imei");
            String applicationId = client.getHandshakeData().getSingleUrlParam("appid");
            logger.info("連線成功, applicationId=" + applicationId + ", imei=" + imei +
                    ", sessionId=" + client.getSessionId().toString() );
            client.joinRoom(applicationId);
            // 更新POS監控狀態為線上
            ReportParam param = new ReportParam();
            param.setImei(imei);
            apiService.updateJustState(param, client.getSessionId().toString(), 1);
        } else {
            logger.error("客戶端為空");
        }
    }

    //新增@OnDisconnect事件,客戶端斷開連線時呼叫,重新整理客戶端資訊
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String imei = client.getHandshakeData().getSingleUrlParam("imei");
        logger.info("客戶端斷開連線, imei=" + imei + ", sessionId=" + client.getSessionId().toString());
        // 更新POS監控狀態為離線
        ReportParam param = new ReportParam();
        param.setImei(imei);
        apiService.updateJustState(param, "", 2);
        client.disconnect();
    }

    // 訊息接收入口
    @OnEvent(value = Socket.EVENT_MESSAGE)
    public void onEvent(SocketIOClient client, AckRequest ackRequest, Object data) {
        logger.info("接收到客戶端訊息");
        if (ackRequest.isAckRequested()) {
            // send ack response with data to client
            ackRequest.sendAckData("伺服器回答Socket.EVENT_MESSAGE", "好的");
        }
    }

    // 廣播訊息接收入口
    @OnEvent(value = "broadcast")
    public void onBroadcast(SocketIOClient client, AckRequest ackRequest, Object data) {
        logger.info("接收到廣播訊息");
        // 房間廣播訊息
        String room = client.getHandshakeData().getSingleUrlParam("appid");
        server.getRoomOperations(room).sendEvent("broadcast", "廣播啦啦啦啦");
    }

    /**
     * 報告地址介面
     * @param client 客戶端
     * @param ackRequest 回執訊息
     * @param param 報告地址引數
     */
    @OnEvent(value = "doReport")
    public void onDoReport(SocketIOClient client, AckRequest ackRequest, ReportParam param) {
        logger.info("報告地址介面 start....");
        BaseResponse result = postReport(param);
        ackRequest.sendAckData(result);
    }

    /*----------------------------------------下面是私有方法-------------------------------------*/
    private BaseResponse postReport(ReportParam param) {
        BaseResponse result = new BaseResponse();
        int r = apiService.report(param);
        if (r > 0) {
            result.setSuccess(true);
            result.setMsg("報告地址成功");
        } else {
            result.setSuccess(false);
            result.setMsg("該POS機還沒有入網,報告地址失敗。");
        }
        return result;
    }
}

還有一個步驟就是新增啟動器,在SpringBoot啟動之後立馬執行:

新增ServerRunner

/**
 * SpringBoot啟動之後執行
 *
 *
 * @version 1.0
 * @since 2018/10/15
 */
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {
    private final SocketIOServer server;
    private static final Logger logger = LoggerFactory.getLogger(ServerRunner.class);

    @Autowired
    public ServerRunner(SocketIOServer server) {
        this.server = server;
    }

    @Override
    public void run(String... args) throws Exception {
        logger.info("ServerRunner 開始啟動啦...");
        server.start();
    }
}

要實現通過域名並走標準80或443埠的話,最好使用nginx做反向代理,跟正常的http反向代理基本一致, 不過websocket需要增加一個upgrade的配置。

nginx反向代理

下面我以一個實際使用例子來說明如何配置nginx的https訪問websocket,並且開啟301自動http跳轉https。

首先要有一個域名,比如test.enzhico.net,然後申請letsencrypt的免費證書,這個過程我不講了

配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

server {
      server_name test.enzhico.net;
      location / {
          proxy_pass http://localhost:9096;
          proxy_read_timeout 300s;
          proxy_set_header Host $host;
          proxy_set_header X-Real-IP $remote_addr;
          proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection $connection_upgrade;
      }
      #root /opt/www/test.enzhico.net;
      #index index.html index.htm;
      error_page 404 /404.html;
          location = /40x.html {
      }

      error_page 500 502 503 504 /50x.html;
          location = /50x.html {
      }

    listen 443 ssl; # managed by Certbot
    ssl_certificate /etc/letsencrypt/live/test.enzhico.net/fullchain.pem; # managed by Certbot
    ssl_certificate_key /etc/letsencrypt/live/test.enzhico.net/privkey.pem; # managed by Certbot
    include /etc/letsencrypt/options-ssl-nginx.conf; # managed by Certbot
    ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot

}

server {
    listen 80;
    server_name test.enzhico.net;
    return 301 https://$host$request_uri; # managed by Certbot
}

注意這其中和普通HTTP代理的關鍵不同是:

1
2
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;

參考文章

GitHub原始碼