1. 程式人生 > 程式設計 >通過入門demo簡單瞭解netty使用方法

通過入門demo簡單瞭解netty使用方法

這篇文章主要介紹了通過入門demo簡單瞭解netty使用方法,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

前言

最近做一個專案:

大概需求: 多個溫度感測器不斷向java服務傳送溫度資料,該感測器採用socket傳送資料;該資料以$符號開頭和結尾,最後將處理的資料存入資料庫;

我想到的處理方式:採用netty來接收和處理資料,然後用mybatis將處理後的資料存入資料庫;

我在這之前從來沒使用過netty,在網上倒是看到不少關於netty的文章,如今就趁著這個專案寫一下我所學到的東西和遇到的問題,又是怎麼去解決的;

接下來的幾篇文章都是圍繞著這個專案來寫的;本篇主要寫netty的入門demo;

正文

程式碼部分

新建一個maven專案

首先在pom.xml中匯入:

 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>5.0.0.Alpha1</version>
    </dependency>

服務端

1. DiscardServer類,netty的服務端

public class DiscardServer {
  public void run(int port) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    System.out.println("準備執行埠:" + port);
    try {
      ServerBootstrap b = new ServerBootstrap();
      b = b.group(bossGroup,workerGroup)
          .channel(NioServerSocketChannel.class)
          .option(ChannelOption.SO_BACKLOG,128)
          .childHandler(new ChildChannelHandler());
      //繫結埠,同步等待成功
      ChannelFuture f = b.bind(port).sync();
      //等待服務監聽埠關閉
      f.channel().closeFuture().sync();
    } finally {
      //退出,釋放執行緒資源
      workerGroup.shutdownGracefully();
      bossGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    new DiscardServer().run(8080);
  }
}

2. ChildChannelHandler類:

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

  protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline().addLast(new DiscardServerHandler());
  }
}

3. DiscardServerHandler類

在這裡是繼承的ChannelHandlerAdapter類,當然還可以繼承其他的類,例如SimpleChannelInboundHandler,ChannelInboundHandlerAdapter都可以

public class DiscardServerHandler extends ChannelHandlerAdapter {
  @Override
  public void channelRead(ChannelHandlerContext ctx,Object msg) {

    try {
      ByteBuf in = (ByteBuf) msg;
      System.out.println("傳輸內容是");
      System.out.println(in.toString(CharsetUtil.UTF_8));
      ByteBuf resp= Unpooled.copiedBuffer("收到資訊$".getBytes());
      ctx.writeAndFlush(resp);
    } finally {
      ReferenceCountUtil.release(msg);
    }
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
    // 出現異常就關閉
    cause.printStackTrace();
    ctx.close();
  }
}

啟動netty服務;

好了,到這裡就能開始接收資料了;

客服端

1.TimeClient類

public class TimeClient {
  public void connect(int port,String host)throws Exception{
    //配置客戶端
    System.out.println(port+"--"+host);
    EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
    try {
      Bootstrap b=new Bootstrap();
      b.group(eventLoopGroup).channel(NioSocketChannel.class)
          .option(ChannelOption.TCP_NODELAY,true)
          .handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
              socketChannel.pipeline().addLast(new TimeClientHandler());
            }
          });
      //繫結埠,同步等待成功
      ChannelFuture f = b.connect(host,port).sync();
      //等待服務監聽埠關閉
      f.channel().closeFuture().sync();
    }finally {
      //優雅退出,釋放執行緒資源
      eventLoopGroup.shutdownGracefully();
    }
  }
  public static void main(String[] args) throws Exception {
    new TimeClient().connect(8090,"localhost");
  }
}

2.TimeClientHandler 類

public class TimeClientHandler extends ChannelHandlerAdapter {
  private byte[] req;
  public TimeClientHandler(){
    req="$tmb00035ET3318/08/22 11:5704026.75,027.31,20.00,20.00$".getBytes();
  }
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ByteBuf message=null;
    for(int i=0;i<100;i++){
      message=Unpooled.buffer(req.length);
      message.writeBytes(req);
      ctx.writeAndFlush(message);
    }
  }
  @Override
  public void channelRead(ChannelHandlerContext ctx,Object msg) {
    try {
      ByteBuf in = (ByteBuf) msg;
      System.out.println(in.toString(CharsetUtil.UTF_8));
    } finally {
      ReferenceCountUtil.release(msg);
    }
  }
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
    // 出現異常就關閉
    cause.printStackTrace();
    ctx.close();
  }
}

在channelActive類中向服務端傳送100次訊息

先啟動服務端,再啟動客戶端;

測試結果一:

服務端:

傳輸內容是
$tmb00035ET3318/08/22 11:5704026.75,20.00$$tmb00035ET3318/08/22 11:5704026.75,20.00$$tmb00035ET3318/08/22 11:5704026.7
傳輸內容是
5,20.00$$tmb00035ET3318/08/22 

客戶端:

8080--localhost
收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊收到資訊

由於內容太多,就不都貼出來了j,直接寫結果吧:

客戶端傳送100次資料,但是服務端只收到了28次,然後服務端向客戶端返回28次資料,客戶端卻只收到一次;

可以發現服務端接收的資料不是完整接收的,這裡出現了拆包,粘包的問題

這裡就不討論拆包,粘包了,百度一大堆,相信你也能看明白;

解決粘包,拆包的問題

解決拆包粘包的方法有很多:

  • 訊息定長,固定每個訊息的固定長度
  • 在訊息末尾使用換行符對訊息進行分割,或者使用其他特殊字元來對訊息進行分割;
  • 將訊息分為訊息頭和訊息體,訊息頭中包含標識訊息總長度;
  • 更復雜的,或者其他的協議。

由於我負責的這個專案戶端傳送是由$開始和結束的資料,返回的資料我也設定的$結束,所以我選擇了第二種方法;

只需要在服務端的DiscardServerHandler中和客戶端的ChannelInitializer中新增幾行相同的程式碼就行了;

服務端:

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

  protected void initChannel(SocketChannel socketChannel) throws Exception {
    ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
    socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
    socketChannel.pipeline().addLast(new DiscardServerHandler());
  }
}

客戶端:

在如下的位置新增如下的程式碼:

 .handler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
              ByteBuf byteBuf= Unpooled.copiedBuffer("$".getBytes());
              socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
              socketChannel.pipeline().addLast(new TimeClientHandler());
            }
          });

測試結果

這裡我就不傳送100次資料了,值傳送10次:

服務端:

傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,20.00
傳輸內容是
tmb00035ET3318/08/22 11:5704026.75,20.00

客戶端:

收到資訊
收到資訊
收到資訊
收到資訊
收到資訊
收到資訊
收到資訊
收到資訊
收到資訊
收到資訊

解決我所遇到的問題了;

總結

  • 本來我只需要寫服務端的程式碼的,但是為了更好的演示,所以我寫了客戶端
  • 本篇文章主要就是使用netty傳送和接收資料,還有就是拆包和粘包的問題,當然,netty還可以做其他很多的事情;
  • netty針對對拆包粘包的問題有很多種解決辦法:例如可以用LineBasedFrameDecoder和StringDecoder組合將資訊已換行符來進行拆分;也可以用我上邊的解決方法來解決以特殊字元結束的資訊;
  • 在解決拆包粘包資訊的時候,注意資訊是否符合定義的規則,不然會處理不了資料:例如我上邊的例子,如果服務端在返回資訊是不以$符結尾的話,客戶端是打印不出來資訊的,因為客戶端會認為服務端還沒有傳送完資訊,會一直等待,而且打印不出資料;
  • 這篇文章只是我入門netty的一個小demo,對我還是很有幫助的,當然也希望對閱讀者有那麼一點點幫助;
  • 有什麼不對的地方還請指正,建議也是多多益善;
  • 原始碼地址

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。