Netty框架的簡單使用,實現socket通訊
個人部落格:haichenyi.com。感謝關注
題外話,很多人都把JDK1.4提供的NIO稱之為非同步非阻塞I/O;其實,並不然,從嚴格意義上面講,它只能稱為非阻塞I/O。在JDK1.7提供的NIO 2.0,新增了非同步的套接字通道Channel,它才是真正的非同步非阻塞I/O。下表是不同I/O模型的對比:
表1-1 幾種I/O模型和同能對比
同步阻塞I/O(BIO) | 偽非同步I/O | 非阻塞I/O(NIO) | 非同步I/O(AIO) |
---|---|---|---|
客戶端個數:I/O執行緒 | 1:1 | M:N(M可以大於N) | M:1(1個I/O執行緒處理多個客戶端連線) |
I/O型別(阻塞) | 阻塞I/O | 阻塞I/O | 非阻塞I/O |
I/O型別(同步) | 同步I/O | 同步I/O | 同步I/O(I/O多路複用) |
API使用難度 | 簡單 | 簡單 | 非常複雜 |
除錯難度 | 簡單 | 簡單 | 複雜 |
可靠性 | 非常差 | 差 | 高 |
吞吐量 | 低 | 中 | 高 |
簡介
Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。
也就是說,Netty 是一個基於NIO的客戶、伺服器端程式設計框架,使用Netty 可以確保你快速和簡單的開發出一個網路應用,例如實現了某種協議的客戶,
“快速”和“簡單”並不用產生維護性或效能上的問題。Netty 是一個吸收了多種協議的實現經驗,這些協議包括FTP,SMTP,HTTP,各種二進位制,文字協議,並經過相當精心設計的專案,最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的效能,穩定性和伸縮性。——百度百科
優點
API使用簡單,開發門檻低
功能強大,預置了多種編解碼功能,支援多種主流協議
效能高,通過與業界其他主流NIO框架相比,netty的綜合性能最高
成熟,穩定,Netty已經修復了已經發現的所有的JDK NIO BUG,業務開發人員不用再為NIO的bug而煩惱
社群活躍,版本迭代週期短,發現bug可以及時被修復,同時有更多的新功能加入
經歷了大規模的商界業務考驗,只能得到了驗證
粘包、拆包
概念
TCP是一個流協議,所謂的流,就是沒有界限的一串資料。可以考慮河裡的流水,他們並沒有界限。tcp底層並不瞭解業務層資料的具體含義,他會根據tcp緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被tcp拆分成多個包進行傳送,也可能把多個小包封裝成一個大資料一起傳送,這就是所謂的tcp粘包,拆包問題
產生原因
應用程式write寫入的位元組大小大於套介面傳送緩衝區的大小
進行MSS大小的tcp分段
乙太網幀的payload大於MTU進行IP分片
解決辦法
訊息定長,每個報文大小固定長度,不夠的補0
包尾增加回車換行符進行分割。例如:FTP協議
將訊息分為訊息頭和訊息體。訊息頭中包含訊息的總長度欄位
更復雜的應用層協議
Netty框架的解決辦法
LineBasedFrameDecoder和StringDecoder兩個類
LineBasedFrameDecoder
LineBasedFrameDecoder的工作原理是依次遍歷ByteBuf中的可讀位元組,判斷看是否有”\r”或者”\r\n”,如果有就以此為結束位置,從可讀索引位置到結束區間的位元組就組成了一行。他是以換行符為結束標誌的解碼器,支援攜帶結束符和不帶結束符兩種解碼方式。同時支援配置單行的最大長度。如果連續讀取到最大長度後仍然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀取的異常流
StringDecoder
StringDecoder的功能就非常簡單,就是將接收到的物件換成字串,然後繼續呼叫後面的handler,LineBasedFrameDecoder+StringDecoder組合就是按換行符切換文字解碼器,他被設計用來支援TCP粘包和拆包。Netty支援其他其他符號的解碼器(DelimiterBasedFrameDecode)
說了這麼多,程式碼來了,就是用Netty實現的心跳。對於懶癌晚期,已經風裝好,可以直接拿過去用,註釋也寫的很清楚
import android.util.Log;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
* @author 海晨憶
* @date 2018/2/6
* @desc
*/
public class SocketTcp {
private static SocketTcp socketTcp = new SocketTcp();
private Channel channel = null;
private EventLoopGroup group;
private int port;
private String host;
public static SocketTcp getInstance() {
return socketTcp;
}
public SocketTcp setPort(int port) {
this.port = port;
return this;
}
public SocketTcp setHost(String host) {
this.host = host;
return this;
}
public void connect() {
if (channel != null) return;
if (group == null) {
//NIO執行緒組
group = new NioEventLoopGroup();
}
try {//配置Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//以換行符為結束標記
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new MyHeartSocket());
//以"$_"作為分隔符
/*ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
String s = "$_";
ByteBuf byteBuf = Unpooled.copiedBuffer(s.getBytes());
pipeline.addLast(new DelimiterBasedFrameDecoder(Integer.MAX_VALUE,byteBuf));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new MyHeartSocket());*/
}
});
//發起非同步連線操作
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
channel = channelFuture.channel();
//等待服務端監聽埠關閉
channel.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
disConnect();
}
}
/**
* 斷開tcp連線.
*/
private void disConnect() {
if (null != group) {
group.shutdownGracefully();
}
group = null;
channel = null;
}
public void sendMessage(String msg) {//連線成功後,通過Channel提供的介面進行IO操作
try {
if (channel != null && channel.isOpen()) {
channel.writeAndFlush(msg).sync();
Log.d("wz", "send succeed " + msg);
} else {
reConnect();
throw new Exception("channel is null | closed");
}
} catch (Exception e) {
reConnect();
e.printStackTrace();
}
}
/**
* 重連.
*/
private void reConnect() {
new Thread(this::connect);
}
}
package com.example.zwang.myapplication.socket;
import android.os.SystemClock;
import android.util.Log;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyHeartSocket extends SimpleChannelInboundHandler<Object> {
private ChannelHandlerContext ctx;
private boolean isConnect = false;
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
Log.v("WZ", "連線正常messageReceived");
ByteBuf msg1 = (ByteBuf) msg;
byte[] bytes = new byte[msg1.readableBytes()];
msg1.readBytes(bytes);
String s = new String(bytes, "UTF-8");
Log.v("WZ", "接收到的訊息:" + s);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
Log.v("WZ", "連線正常channelActive");
isConnect = true;
if (this.ctx == null) {
synchronized (MyHeartSocket.class) {
if (this.ctx == null) {
this.ctx = ctx;
MyAppHeart();
}
}
}
}
private void MyAppHeart() {
new Thread(() -> {
while (ctx != null && isConnect) {
String data = "123";
byte[] bytes = data.getBytes();
if (isConnect) {
ctx.writeAndFlush(Unpooled.buffer(bytes.length).writeBytes(bytes));
SystemClock.sleep(3000);
}
}
}).start();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
EventLoop loop = ctx.channel().eventLoop();
loop.schedule(() -> SocketTcp.getInstance().connect(), 5, TimeUnit.SECONDS);
super.channelInactive(ctx);
Log.v("WZ", "重新連線socket伺服器");
isConnect = false;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
Log.v("WZ", "傳送資料包");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Log.v("WZ", "連接出現異常");
this.ctx.close();
this.ctx = null;
}
}
結束