1. 程式人生 > >springboot+netty+websocket

springboot+netty+websocket

前段時間碰到個專案,需求是使用者在第三方會議系統簽到後需要把使用者頭像實時傳送顯示到大屏上展示,因為簽到時間持續時間比較長,前端ajax輪詢的方式不是很理想,所以考慮使用websocket,就拿公司其他的專案來研究了一下,在此記錄下初識springboot + netty + websocket的過程,主要是Server端的實現過程。

  1. 在pom.xml中新增以下依賴

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.31.Final</version>
    </dependency>
  2. 新增啟動類實現CommandLineRunner介面並重寫run方法,啟動自執行,即啟動springboot時就啟動netty

    import com.stt.experiment.trynetty.server.NettyServer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    /**
     * 啟動服務
     */
    @Slf4j
    @Component
    public class ServerRunner implements CommandLineRunner {
    
        @Autowired
        private NettyServer nettyServer;
    
        @Override
        public void run(String... strings) throws Exception {
            Thread thread = new Thread(nettyServer);
            thread.start();
        }
    }
  3. netty服務

    import com.stt.experiment.trynetty.server.config.ServerConfig;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    public class NettyServer implements Runnable{
    
        private final ServerConfig serverConfig;
        private final ServerChannelInitializer serverChannelInitializer;
    
        @Autowired
        public NettyServer(ServerConfig serverConfig, ServerChannelInitializer serverChannelInitializer) {
            this.serverConfig = serverConfig;
            this.serverChannelInitializer = serverChannelInitializer;
        }
    
        @Override
        public void run() {
            String host = serverConfig.getHost();
            int port = serverConfig.getPort();
            // 服務端需要2個執行緒組,boss處理客戶端連結,work進行客戶端連線之後的處理
            // boss這個EventLoopGroup作為一個acceptor負責接收來自客戶端的請求,然後分發給worker這個EventLoopGroup來處理所有的事件event和channel的IO
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                // 伺服器配置
                // childHandler(serverChannelInitializer)
                // 該函式的主要作用是設定channelHandler來處理客戶端的請求的channel的IO。
                bootstrap.group(boss, worker)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(serverChannelInitializer);
                //繫結埠,開始接收進來的連線
                ChannelFuture f = bootstrap.bind(host, port).sync();
                log.info("===========啟動成功===========");
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }

    該類實現Runnable介面,實現run方法,在ServerRunner中用new Thread(Runnable target).start()方法來啟動。


@Autowired的使用:推薦對建構函式進行註釋 感謝博友的分享,這裡是這篇文章的原文連結

這是我之前不明白的一個點,為什麼要使用@Autowired對建構函式進行註釋,而不是直接注入bean。看了這篇分享,做了試驗之後發現:
直接使用@Autowired注入,發現IDE報了warning:Spring Team recommends “Always use constructor based dependency injection in your beans. Always use assertions for mandatory dependencies”.
這段程式碼原來的寫法是:

@Autowired
private ServerConfig serverConfig;
@Autowired
private ServerChannelInitializer serverChannelInitializer;

根據IDE的建議修改後的程式碼寫法是:

private final ServerConfig serverConfig;
private final ServerChannelInitializer serverChannelInitializer;

@Autowired
public NettyServer(ServerConfig serverConfig, ServerChannelInitializer serverChannelInitializer) {
    this.serverConfig = serverConfig;
    this.serverChannelInitializer = serverChannelInitializer;
}

@Autowired 可以對成員變數、方法以及建構函式進行註釋,@Autowired注入bean,相當於在配置檔案中配置bean,並且使用setter注入。而對建構函式進行註釋,就相當於是使用建構函式進行依賴注入。
那為什麼IDE會這麼建議處理呢,主要原因就是使用構造器注入的方法,可以明確成員變數的載入順序(PS:Java變數的初始化順序為:靜態變數或靜態語句塊–>例項變數或初始化語句塊–>構造方法–>@Autowired)。

來看一個例子:

@Autowired
 private User user;
 private String school;

 public UserAccountServiceImpl(){
     this.school = user.getSchool();
 }

比如以上這段程式碼,能執行成功嗎?答案是不能,因為Java類會先執行構造方法,然後再給註解了@Autowired 的user注入值,所以在執行構造方法的時候,就會報錯。


4.netty 相關配置檔案

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

/**
 * netty 相關配置檔案
 */
@Getter
@Setter
@Component
@Configuration
// 引用自定義配置檔案
@PropertySource("classpath:netty_config.properties")
public class ServerConfig {

    @Value("${netty.server.host}")
    private String host; // 服務啟動地址

    @Value("${netty.server.port}")
    private int port; // 服務啟動埠

    @Value("${netty.server.workerThreads}")
    private int workerThreads; // 服務執行緒池中的執行緒數

    @Value("${netty.server.maxContentLength}")
    private int maxContentLength; // 可接受的最大的訊息長度

    @Value("${netty.server.readTimeout}")
    private int readTimeout; // 連線讀取超時時間

    @Value("${netty.server.writeTimeout}")
    private int writeTimeout; // 寫操作超時時間

    @Value("${netty.server.webSocketPath}")
    private String webSocketPath; // websocket 介面地址
}

5.構建Handler處理流程

import com.stt.experiment.trynetty.server.config.ServerConfig;
import com.stt.experiment.trynetty.server.handler.WebSocketServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import io.netty.channel.socket.SocketChannel;

/**
 * 構建Handler處理流程
 */
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel>{

    @Autowired
    private ServerConfig serverConfig;
    @Autowired
    private WebSocketServerHandler webSocketServerHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 新增各種功能handler,依次執行
        ChannelPipeline pipeline = socketChannel.pipeline();
        // HttpServerCodec: 將請求和應答的訊息解碼為HTTP訊息
        pipeline.addLast(new HttpServerCodec());
        // HttpObjectAggregator: 將HTTP訊息的多個部分合成一條完整的HTTP訊息
        pipeline.addLast(new HttpObjectAggregator(serverConfig.getMaxContentLength()));
        // ChunckedWriteHandler: 處理大資料傳輸,支援非同步寫大資料流,不引起高記憶體消耗。
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new ReadTimeoutHandler(serverConfig.getReadTimeout()));
        pipeline.addLast(new WriteTimeoutHandler(serverConfig.getWriteTimeout()));
        pipeline.addLast(new WebSocketServerProtocolHandler(serverConfig.getWebSocketPath()));

        // 新增自定義預設處理器
        pipeline.addLast(webSocketServerHandler);
    }
}

可將ChannelPipeline視為ChannelHandler例項鏈,可攔截流經通道的入站和出站事件,當建立一個新的Channel時,都會分配一個新的ChannelPipeline,該關聯是永久的,該通道既不能附加另一個ChannelPipeline也不能分離當前的ChannelPipeline。 文章出處

6.預設訊息處理器

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 預設訊息處理器
 */
@Slf4j
@ChannelHandler.Sharable // 可以被新增至多個ChannelPipiline中
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame>{

    // 心跳資訊,可移到constants檔案
    private static final String HEART_BEAT = "heart_beat";

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame frame) throws Exception {
        String res = null;
        if (frame instanceof TextWebSocketFrame) {
            // 獲取請求訊息內容
            String requestContent = ((TextWebSocketFrame) frame).text();
            // if 心跳訊息
            if (HEART_BEAT.equalsIgnoreCase(requestContent)) {
                res = requestContent;
            } else {
                // else 其他請求則請求分發

            }
        } else {
            log.warn("unsupport dataType: {}", frame.getClass().getName());
        }
        if (null != res) {
            channelHandlerContext.writeAndFlush(new TextWebSocketFrame(res));
        }
    }
}

訊息型別:
BinaryWebSocketFrame 二進位制資料
TextWebSocketFrame 文字資料
ContinuationWebSocketFrame 超大文字或者二進位制資料

這裡只對心跳進行了處理,簡單的業務(即請求比較少的情況下),可以直接在這裡進行資料處理,如果業務比較複雜,可自行做請求分發,到各自的Action中處理相關的業務邏輯。