1. 程式人生 > >Netty框架的簡單使用,實現socket通訊

Netty框架的簡單使用,實現socket通訊

個人部落格:haichenyi.com。感謝關注

  題外話,很多人都把JDK1.4提供的NIO稱之為非同步非阻塞I/O;其實,並不然,從嚴格意義上面講,它只能稱為非阻塞I/O。在JDK1.7提供的NIO 2.0,新增了非同步的套接字通道Channel,它才是真正的非同步非阻塞I/O。下表是不同I/O模型的對比:

表1-1 幾種I/O模型和同能對比

同步阻塞I/O(BIO) 偽非同步I/O 非阻塞I/O(NIO) 非同步I/O(AIO)
客戶端個數:I/O執行緒 1:1 M:N(M可以大於N) M:1(1個I/O執行緒處理多個客戶端連線)
I/O型別(阻塞) 阻塞I/O 阻塞I/O 非阻塞I/O
I/O型別(同步) 同步I/O 同步I/O 同步I/O(I/O多路複用)
API使用難度 簡單 簡單 非常複雜
除錯難度 簡單 簡單 複雜
可靠性 非常差
吞吐量

簡介

  Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。

  也就是說,Netty 是一個基於NIO的客戶、伺服器端程式設計框架,使用Netty 可以確保你快速和簡單的開發出一個網路應用,例如實現了某種協議的客戶,

服務端應用。Netty相當簡化和流線化了網路應用的程式設計開發過程,例如,TCP和UDP的socket服務開發。

  “快速”和“簡單”並不用產生維護性或效能上的問題。Netty 是一個吸收了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各種二進位制,文字協議,並經過相當精心設計的專案,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的效能,穩定性和伸縮性。——百度百科

優點

  1. API使用簡單,開發門檻低

  2. 功能強大,預置了多種編解碼功能,支援多種主流協議

  3. 效能高,通過與業界其他主流NIO框架相比,netty的綜合性能最高

  4. 成熟,穩定,Netty已經修復了已經發現的所有的JDK NIO BUG,業務開發人員不用再為NIO的bug而煩惱

  5. 社群活躍,版本迭代週期短,發現bug可以及時被修復,同時有更多的新功能加入

  6. 經歷了大規模的商界業務考驗,只能得到了驗證

粘包、拆包

概念

  TCP是一個流協議,所謂的流,就是沒有界限的一串資料。可以考慮河裡的流水,他們並沒有界限。tcp底層並不瞭解業務層資料的具體含義,他會根據tcp緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被tcp拆分成多個包進行傳送,也可能把多個小包封裝成一個大資料一起傳送,這就是所謂的tcp粘包,拆包問題

產生原因

  1. 應用程式write寫入的位元組大小大於套介面傳送緩衝區的大小

  2. 進行MSS大小的tcp分段

  3. 乙太網幀的payload大於MTU進行IP分片

解決辦法

  1. 訊息定長,每個報文大小固定長度,不夠的補0

  2. 包尾增加回車換行符進行分割。例如:FTP協議

  3. 將訊息分為訊息頭和訊息體。訊息頭中包含訊息的總長度欄位

  4. 更復雜的應用層協議

Netty框架的解決辦法

  LineBasedFrameDecoder和StringDecoder兩個類

LineBasedFrameDecoder

  LineBasedFrameDecoder的工作原理是依次遍歷ByteBuf中的可讀位元組,判斷看是否有”\r”或者”\r\n”,如果有就以此為結束位置,從可讀索引位置到結束區間的位元組就組成了一行。他是以換行符為結束標誌的解碼器,支援攜帶結束符和不帶結束符兩種解碼方式。同時支援配置單行的最大長度。如果連續讀取到最大長度後仍然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀取的異常流

StringDecoder

  StringDecoder的功能就非常簡單,就是將接收到的物件換成字串,然後繼續呼叫後面的handler,LineBasedFrameDecoder+StringDecoder組合就是按換行符切換文字解碼器,他被設計用來支援TCP粘包和拆包。Netty支援其他其他符號的解碼器(DelimiterBasedFrameDecode)

  說了這麼多,程式碼來了,就是用Netty實現的心跳。對於懶癌晚期,已經風裝好,可以直接拿過去用,註釋也寫的很清楚

import android.util.Log;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * @author 海晨憶
 * @date 2018/2/6
 * @desc
 */
public class SocketTcp {
  private static SocketTcp socketTcp = new SocketTcp();
  private Channel channel = null;
  private EventLoopGroup group;
  private int port;
  private String host;

  public static SocketTcp getInstance() {
    return socketTcp;
  }

  public SocketTcp setPort(int port) {
    this.port = port;
    return this;
  }

  public SocketTcp setHost(String host) {
    this.host = host;
    return this;
  }

  public void connect() {
    if (channel != null) return;
    if (group == null) {
      //NIO執行緒組
      group = new NioEventLoopGroup();
    }
    try {//配置Bootstrap
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(group)
          .channel(NioSocketChannel.class)
          .option(ChannelOption.TCP_NODELAY, true)
          .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
              //以換行符為結束標記
              ChannelPipeline pipeline = ch.pipeline();
              pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
              pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
              pipeline.addLast(new StringDecoder());
              pipeline.addLast(new MyHeartSocket());

              //以"$_"作為分隔符
              /*ChannelPipeline pipeline = ch.pipeline();
              pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
              String s = "$_";
              ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
              pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf));
              pipeline.addLast(new StringDecoder());
              pipeline.addLast(new MyHeartSocket());*/

            }
          });
      //發起非同步連線操作
      ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
      channel = channelFuture.channel();
      //等待服務端監聽埠關閉
      channel.closeFuture().sync();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      disConnect();
    }
  }

  /**
   * 斷開tcp連線.
   */
  private void disConnect() {
    if (null != group) {
      group.shutdownGracefully();
    }
    group = null;
    channel = null;
  }

  public void sendMessage(String msg) {//連線成功後,通過Channel提供的介面進行IO操作
    try {
      if (channel != null && channel.isOpen()) {
        channel.writeAndFlush(msg).sync();
        Log.d("wz", "send succeed " + msg);
      } else {
        reConnect();
        throw new Exception("channel is null | closed");
      }
    } catch (Exception e) {
      reConnect();
      e.printStackTrace();
    }
  }

  /**
   * 重連.
   */
  private void reConnect() {
    new Thread(this::connect);
  }
}
package com.example.zwang.myapplication.socket;

import android.os.SystemClock;
import android.util.Log;

import java.util.concurrent.TimeUnit;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;


public class MyHeartSocket extends SimpleChannelInboundHandler<Object> {
  private ChannelHandlerContext ctx;
  private boolean isConnect = false;

  @Override
  protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    Log.v("WZ", "連線正常messageReceived");
    ByteBuf msg1 = (ByteBuf) msg;
    byte[] bytes = new byte[msg1.readableBytes()];
    msg1.readBytes(bytes);
    String s = new String(bytes, "UTF-8");
    Log.v("WZ", "接收到的訊息:" + s);
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    Log.v("WZ", "連線正常channelActive");
    isConnect = true;
    if (this.ctx == null) {
      synchronized (MyHeartSocket.class) {
        if (this.ctx == null) {
          this.ctx = ctx;
          MyAppHeart();
        }
      }
    }
  }

  private void MyAppHeart() {
    new Thread(() -> {
      while (ctx != null && isConnect) {
        String data = "123";
        byte[] bytes = data.getBytes();
        if (isConnect) {
          ctx.writeAndFlush(Unpooled.buffer(bytes.length).writeBytes(bytes));
          SystemClock.sleep(3000);
        }
      }
    }).start();
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    EventLoop loop = ctx.channel().eventLoop();
    loop.schedule(() -> SocketTcp.getInstance().connect(), 5, TimeUnit.SECONDS);
    super.channelInactive(ctx);
    Log.v("WZ", "重新連線socket伺服器");
    isConnect = false;
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    super.userEventTriggered(ctx, evt);
    Log.v("WZ", "傳送資料包");
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    super.exceptionCaught(ctx, cause);
    Log.v("WZ", "連接出現異常");
    this.ctx.close();
    this.ctx = null;
  }
}

結束