Netty入門基礎筆記
阿新 • • 發佈:2018-12-13
一、Netty 入門基礎
1.1、基礎概念
Netty
是一個 NIO
client-server(客戶端伺服器)框架,使用Netty可以快速開發網路應用,例如伺服器和客戶端協議。Netty提供了一種新的方式來開發網路應用程式,這種新的方式使得它很容易使用和有很強的擴充套件性。Netty
的內部實現是很複雜的,但是Netty提供了簡單易用的api從網路處理程式碼中解耦業務邏輯。Netty是完全基於NIO實現的,所以整個Netty都是非同步的。
網路應用程式通常需要有較高的可擴充套件性,無論是Netty還是其他的基於JavaNIO的框架,都會提供可擴充套件性的解決方案。Netty中 一個關鍵組成部分是它的非同步特性
Netty
是最流行的NIO
框架, 他的健壯性、功能、效能、可定製性和可擴充套件性在同類框架都是首屈一指的。 它已經得到成百,上千的商業/商用專案驗證,如Hadoop
的RPC
框架Avro
、 強大的RocketMQ
、還有主流的分散式通訊框架Dubbox
等等
1.2 架構組成
二、入門案例
2.1 實現步驟
Netty實現通訊的步驟:
- 建立兩個的NIO執行緒組,-一個專門用於網路事件處理(接受客戶端的連線),另一-個則進行網路通訊讀寫。
- 建立一個
ServerBootstrap
物件, 配置Netty
的一系列引數, 例如接受傳出資料的快取大小等等。 - 建立一個實際處理資料的類
Channellnitializer,
- 繫結埠,執行同步阻塞方法等待伺服器端啟動即可。
2.2 入門案例
- maven依賴
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</ version>
</dependency>
- 服務端程式碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void main(String[] args) throws Exception {
//1 建立線兩個程組
//一個是用於處理伺服器端接收客戶端連線的
//一個是進行網路通訊的(網路讀寫的)
EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();
//2 建立輔助工具類,用於伺服器通道的一系列配置
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup) //繫結倆個執行緒組
.channel(NioServerSocketChannel.class) //指定NIO的模式
.option(ChannelOption.SO_BACKLOG, 1024) //設定tcp緩衝區
.option(ChannelOption.SO_SNDBUF, 32*1024) //設定傳送緩衝大小
.option(ChannelOption.SO_RCVBUF, 32*1024) //這是接收緩衝大小
.option(ChannelOption.SO_KEEPALIVE, true) //保持連線
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//3 在這裡配置具體資料接收方法的處理
sc.pipeline().addLast(new ServerHandler());
}
});
//4 進行繫結
ChannelFuture cf1 = b.bind(8765).sync();
//5 等待關閉
cf1.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();
}
}
- 服務端的handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server channel active... ");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
String response = "進行返回給客戶端的響應:" + body ;
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
//.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("讀完了");
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
throws Exception {
ctx.close();
}
}
- 客戶端程式碼
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//傳送訊息
Thread.sleep(1000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
Thread.sleep(2000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
cf1.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
- 客戶端handler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Client :" + body );
String response = "收到伺服器端的返回資訊:" + body;
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
三、TCP粘包、拆包問題
3.1 概念引入
TCP是一個“流”協議,所謂流就是沒有界限的。大家可以想象下如果河裡的水就好比資料,他們是連成一片的, 沒有分界線,TCP底層並不瞭解上層的業務資料具體的含義,它會根據TCP緩衝區的實際情況進行包的劃分,也就是說,在業務上, 我們一個完整的包可能會被TCP分成多個包進行傳送, 也可能把多個小包封裝成一個大的資料包傳送出去,這就是所謂的TCP粘包、拆包問題。
3.2 問題原因與解決方案:
分析TCP粘包、拆包問題的產生原因:
- 應用程式write寫入的位元組大小大於套介面傳送緩衝區的大小
- 進行MSS大小的TCP分段
- 乙太網幀的payload大於MTU進行IP分片
粘包拆包問題的解決方案,常有三種方案:
- 訊息定長,例如每個報文的大小固定為200個位元組,如果不夠,空位補空格;
- 在包尾部增加特殊字元進行分割,例如加回車等
- 講訊息分為訊息頭和訊息體,在訊息頭中包含表示訊息總長度的欄位,然後進行業務邏輯的處理
3.3 Netty解決方法
- 分隔符類
DelimiterBasedFrameDecoder
(自定義分隔符) FixedLengthFrameDecoder
(定長)
3.3.1 分隔符方式
- 在服務端,修改輔助類
//伺服器輔助類
ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_SNDBUF, 32*1024)
.option(ChannelOption.SO_RCVBUF, 32*1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//設定特殊分隔符
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//設定字串形式的解碼
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ServerHandler());
}
});
- client端同時修改分隔符
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
3.3.2 長連線和短連線
長短連線問題,通過新增監聽器,去監聽服務端訊息是否寫完
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "utf-8");
System.out.println("Server :" + body );
String response = "進行返回給客戶端的響應:" + body ;
//當確認服務端確認寫完,則斷開連線
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
.addListener(ChannelFutureListener.CLOSE);
}
3.3.3 服務端多次write,一次flush
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//傳送訊息
Thread.sleep(1000);
//寫的快取區
cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
//需要flush。否則不會寫出
cf1.channel().flush();
cf1.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
- 控制檯輸出情況:
Client :進行返回給客戶端的響應:hello nettyhello nettyhello netty
3.3.4 停頓傳送
public class Client {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
//傳送訊息
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
//停頓,當一個間隔,則會發生一次輸出。
Thread.sleep(2000);
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
cf1.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
- 控制檯輸出情況
Client :進行返回給客戶端的響應:777666
Client :進行返回給客戶端的響應:888
3.3.5 繫結多個 埠
- 服務端
public class Serv