1. 程式人生 > >netty+spring(整合實戰)

netty+spring(整合實戰)

Netty Spring (整合實戰)

瘋狂創客圈 死磕Netty 系列之 11 【 部落格園 總入口

主要介紹的是SpringBoot整合Netty。在使用Netty之前,建議先了解Netty的基本原理,請參閱瘋狂創客圈。

這裡僅僅是使用Netty的第一步,這裡介紹一個最簡單的Demo——EchoServer,也就是回寫伺服器。就是無論客戶端發啥字串到伺服器端,伺服器端接收字串後直接回寫到客戶端。

本篇內容綱要

  • 環境要求
  • Spring +netty 伺服器端
  • Spring +netty 客戶端
  • Spring讀取配置檔案中的屬性值
原始碼 下載連結:

點選下載

環境要求

  • JDK::1.8
  • Netty::4.0或以上(不包括5)
<java.version>1.8</java.version>
<springboot>1.5.9.RELEASE</springboot>
<netty.version>4.0.33.Final</netty.version>

Spring +netty 伺服器端

回寫伺服器 Echo Server 程式主要由兩部分組成:

  • ServerBootstrap:
    伺服器啟動引導器。負責配置伺服器端基本資訊,並且完成伺服器的啟動

  • EchoServerHandler:
    回寫的業務邏輯處理器

ServerBootstrap

首先是編寫服務端的啟動類,程式碼中相應的註釋在寫得很詳細。
主要的步驟如下:

  1. 建立一個ServerBootstrap例項
  2. 建立一個EventLoopGroup來處理各種事件,如處理連結請求,傳送接收資料等。
  3. 設定本地監聽埠 InetSocketAddress( port)
  4. 設定 childHandler 來設定通道初始化類。並且在通道初始化時,加入回寫的業務邏輯處理器EchoServerHandler到伺服器通道的pipeline中 。childHandler 在通道初始化時,會被執行一次。
  5. 所有準備好之後呼叫ServerBootstrap.bind() 方法繫結 Server

不過需要注意的是,在不使用Spring的環境中,是通過main方法直接啟動服務端,因此是直接new一個處理器echoServerHandler 物件。而在和Spring 整合之後,我們需要將 echoServerHandler 處理器交給springBoot去管理。

ServerBootstrap 程式碼如下:
@Service("EchoServer")
public class EchoServer
{
    // 伺服器埠
    @Value("${server.port}")
    private int port;
    // 通過nio方式來接收連線和處理連線
    private static EventLoopGroup boss = new NioEventLoopGroup();
    private static EventLoopGroup work = new NioEventLoopGroup();

    // 啟動引導器
    private static ServerBootstrap b = new ServerBootstrap();
    @Autowired
    private EchoServerHandler echoServerHandler;

    public void run()
    {
        try
        {
            b.group(boss, work);
            // 設定nio型別的channel
            b.channel(NioServerSocketChannel.class);
            // 設定監聽埠
            b.localAddress(new InetSocketAddress(port));
            // 設定通道初始化
            b.childHandler(new ChannelInitializer<SocketChannel>()
            {
                //有連線到達時會建立一個channel
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    // pipeline管理channel中的Handler
                    // 在channel佇列中新增一個handler來處理業務
                    ch.pipeline().addLast("echoServerHandler",echoServerHandler);
                }
            });
            // 配置完成,開始繫結server
            // 通過呼叫sync同步方法阻塞直到繫結成功

            ChannelFuture f = b.bind().sync();
            System.out.println(EchoServer.class.getName() +
                    " started and listen on " + f.channel().localAddress());

            // 監聽伺服器關閉事件
            // 應用程式會一直等待,直到channel關閉
            f.channel().closeFuture().sync();
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            // 關閉EventLoopGroup,釋放掉所有資源包括建立的執行緒
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }

    }
}

業務邏輯ServerHandler:

要想處理接收到的資料,我們必須繼承ChannelInboundHandlerAdapter介面,重寫裡面的channelRead方法,每當有資料到達,此方法就會被呼叫(一般是Byte型別陣列),我們就在這裡寫我們的業務邏輯:


@Service("echoServerHandler")
public class EchoServerHandler extends ChannelInboundHandlerAdapter
{

    /**
     * 建立連線時,傳送一條訊息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        System.out.println("連線的客戶端地址:" + ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)
    {
        try
        {
            System.out.println("server received data :" + msg);
            ctx.write(msg);//寫回資料,

        } finally
        {
            ReferenceCountUtil.release(msg);
        }
    }

    public void channelReadComplete(ChannelHandlerContext ctx)
    {
        //flush掉所有寫回的資料
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE); //當flush完成後關閉channel
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    {
        //捕捉異常資訊
        cause.printStackTrace();
        //出現異常時關閉channel
        ctx.close();
    }
}

關於異常處理:

我們在上面程式中也重寫了exceptionCaught方法,這裡就是對當異常出現時的處理。

Spring +netty 客戶端

EchoClient 扮演如下角色:

  • 連線到Server
  • 向Server寫資料,
  • 等待Server返回資料

回寫客戶端程式EchoClient 主要由兩部分組成:

  • Bootstrap:
    客戶端啟動引導器。負責配置客戶端基本資訊,並且完成客戶端的啟動
  • EchoClientHandler :
    客戶端業務邏輯處理器
EchoClient Bootstrap的過程:

和Server端類似,只不過Client端要同時指定連線主機的IP和Port。

  1. 建立一個Bootstrap例項
  2. 建立一個EventLoopGroup 來處理各種事件,如處理連結請求,傳送接收資料等。
  3. 定義需要連線到的遠端伺服器的InetSocketAddress,包含了IP+埠
  4. 設定 childHandler 來設定通道初始化類。並且在通道初始化時,加入客戶端的業務邏輯處理器echoClientHandler 到伺服器通道的pipeline中 。當連線完成之後,childHandler 會被執行一次 。
  5. 所有準備好之後呼叫 ServerBootstrap.connect() 方法連線Server
EchoClient Bootstrap的程式碼:
@Service("EchoClient")
public class EchoClient
{
    // 伺服器ip地址
    @Value("${server.ip}")
    private String host;
    // 伺服器埠
    @Value("${server.port}")
    private int port;

    // 通過nio方式來接收連線和處理連線
    private EventLoopGroup group = new NioEventLoopGroup();

    @Autowired
    private EchoClientHandler echoClientHandler;

    /**
     * 唯一標記
     */
    private boolean initFalg = true;

    /**
     * 客戶端的是Bootstrap,服務端的則是 ServerBootstrap。
     * 都是AbstractBootstrap的子類。
     **/
    public void run()
    {
        doConnect(new Bootstrap(), group);
    }

    /**
     * 重連
     */
    public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup)
    {
        ChannelFuture f = null;
        try
        {
            if (bootstrap != null)
            {
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.remoteAddress(host, port);

                // 設定通道初始化
                bootstrap.handler(
                        new ChannelInitializer<SocketChannel>()
                        {
                            public void initChannel(SocketChannel ch) throws Exception
                            {
                                ch.pipeline().addLast(echoClientHandler);
                            }
                        }
                );
                f = bootstrap.connect().addListener((ChannelFuture futureListener) ->
                {
                    final EventLoop eventLoop = futureListener.channel().eventLoop();
                    if (!futureListener.isSuccess())
                    {
                        System.out.println("與服務端斷開連線!在10s之後準備嘗試重連!");
                        eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                    }
                });
                if (initFalg)
                {
                    System.out.println("EchoClient客戶端連線成功!");
                    initFalg = false;
                }
                // 阻塞
                f.channel().closeFuture().sync();
            }
        } catch (Exception e)
        {
            System.out.println("客戶端連線失敗!" + e.getMessage());
        }

    }
}
EchoClientHandler 客戶端業務邏輯處理器

要想處理接收到的資料,我們必須繼承ChannelInboundHandlerAdapter基類,重寫裡面的channelRead方法,每當有資料到達,此方法就會被呼叫(一般是Byte型別陣列),我們就在這裡寫我們的業務邏輯:


@Service("echoClientHandler")
public class EchoClientHandler extends ChannelInboundHandlerAdapter
{
    /**
     * 此方法會在連線到伺服器後被呼叫
     */
    public void channelActive(ChannelHandlerContext ctx)
    {
        ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
    }

    /**
     * 業務邏輯處理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
    {
        // 如果不是protobuf型別的資料
        if (!(msg instanceof ByteBuf))
        {
            System.out.println("未知資料!" + msg);
            return;
        }
        try
        {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("Client received: " +
                    ByteBufUtil.hexDump(in.readBytes(in.readableBytes())));
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            ReferenceCountUtil.release(msg);
        }
    }
    /**
     * 捕捉到異常
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    {
        cause.printStackTrace();
        ctx.close();
    }
}

除了繼承ChannelInboundHandlerAdapter基類,我們的業務Handler還可以繼承 SimpleChannelInboundHandler 基類。

那麼這兩個有什麼區別呢?

  • SimpleChannelInboundHandler在接收到資料後會自動release掉資料佔用的Bytebuffer資源(自動呼叫Bytebuffer.release())。如果在channelRead方法返回前還沒有寫完資料,也就是當不能讓它自動release時,就不能繼承 SimpleChannelInboundHandler 基類。而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動呼叫ReferenceCountUtil.release()等方法進行釋放。

  • SimpleChannelInboundHandler還有一個好處,可以在泛型引數中,可以直接指定好傳輸的資料格式。所以繼承該類,在處理資料時,不需要判斷資料格式。而繼承ChannelInboundHandlerAdapter則需要進行資料格式的判斷和轉換。

  • 推薦在服務端去繼承ChannelInboundHandlerAdapter,建議手動進行釋放,防止資料未處理完就自動釋放了。

Spring 讀取配置檔案中的屬性值

在Netty 的程式中,一般需要用到伺服器ip和埠,最好的方式是放在配置檔案中,方便修改。

Spring Boot 預設的配置檔名稱為 application.properties,SpringApplication將從以下位置載入此檔案:

  • 當前目錄下的/config子目錄,
  • 當前目錄
  • 一個classpath下的/config包
  • classpath 根路徑(root)

一般情況下,工程在編譯之後,application.properties 放在classpath 根路徑下。

配置檔案 application.properties

#埠號
server.port=8081
#伺服器IP
server.ip=127.0.0.1

注意:檔名字不能錯哦,是application.properties

關聯配置項到類屬性

在類域屬性上通過@Value("${配置項}")指定關聯屬性,Spring Application會自動載入。

public class EchoServer
{
    // 伺服器埠
    @Value("${server.port}")
   private int port;
   //...
}

啟動配置項自動掃描

使用 @Configuration、@EnableAutoConfiguration 啟動配置項的自動掃描。

//自動載入配置資訊
@Configuration
@EnableAutoConfiguration
//使包路徑下帶有@Value的註解自動注入
//使包路徑下帶有@Autowired的類可以自動注入
@ComponentScan("com.crazymakercircle.nettydemo.server")
@SpringBootApplication
public class ServerApp {

     // ........
}

瘋狂創客圈 實戰計劃
  • Netty 億級流量 高併發 IM後臺 開源專案實戰
  • Netty 原始碼、原理、JAVA NIO 原理
  • Java 面試題 一網打盡
  • 瘋狂創客圈 【 部落格園 總入口 】