NIO、Netty(Netty基礎)
一、概述
Netty是一個Java的開源框架。提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。
Netty是一個NIO客戶端,服務端框架。允許快速簡單的開發網路應用程式。例如:服務端和客戶端之間的協議,它簡化了網路程式設計規範。
二、NIO開發的問題
1、NIO類庫和API複雜,使用麻煩。
2、需要具備Java多執行緒程式設計能力(涉及到Reactor模式)。
3、客戶端斷線重連、網路不穩定、半包讀寫、失敗快取、網路阻塞和異常碼流等問題處理難度非常大
4、存在部分BUG
NIO進行伺服器開發的步驟:
1、建立ServerSocketChannel,配置為非阻塞模式;
2、繫結監聽,配置TCP引數;
3、建立一個獨立的IO執行緒,用於輪詢多路複用器Selector;
4、建立Selector,將之前建立的ServerSocketChannel註冊到Selector上,監聽Accept事件;
5、啟動IO執行緒,在迴圈中執行Select.select()方法,輪詢就緒的Channel;
6、當輪詢到處於就緒狀態的Channel時,需要對其進行判斷,如果是OP_ACCEPT狀態,說明有新的客戶端接入,則呼叫ServerSocketChannel.accept()方法接受新的客戶端;
7、設定新接入的客戶端鏈路SocketChannel為非阻塞模式,配置TCP引數;
8、將SocketChannel註冊到Selector上,監聽READ事件;
9、如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的準備就緒的資料包需要讀取,則構造ByteBuffer物件,讀取資料包;
10、如果輪詢的Channel為OP_WRITE,則說明還有資料沒有傳送完成,需要繼續傳送。
/** * 服務端 */ public class TimeServer { public static void main(String[] args) throws Exception { int port=8080; //服務端預設埠 new TimeServer().bind(port); } public void bind(int port) throws Exception{ //1用於服務端接受客戶端的連線 EventLoopGroup acceptorGroup = new NioEventLoopGroup(); //2用於進行SocketChannel的網路讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Netty用於啟動NIO伺服器的輔助啟動類 ServerBootstrap sb = new ServerBootstrap(); //將兩個NIO執行緒組傳入輔助啟動類中 sb.group(acceptorGroup, workerGroup) //設定建立的Channel為NioServerSocketChannel型別 .channel(NioServerSocketChannel.class) //配置NioServerSocketChannel的TCP引數 .option(ChannelOption.SO_BACKLOG, 1024) //設定繫結IO事件的處理類 .childHandler(new ChannelInitializer<SocketChannel>() { //建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件 @Override protected void initChannel(SocketChannel arg0) throws Exception { arg0.pipeline().addLast(new TimeServerHandler()); } }); //繫結埠,同步等待成功(sync():同步阻塞方法,等待bind操作完成才繼續) //ChannelFuture主要用於非同步操作的通知回撥 ChannelFuture cf = sb.bind(port).sync(); System.out.println("服務端啟動在8080埠。"); //等待服務端監聽埠關閉 cf.channel().closeFuture().sync(); } finally { //優雅退出,釋放執行緒池資源 acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/**
* 服務端channel
*/
public class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
//buf.readableBytes():獲取緩衝區中可讀的位元組數;
//根據可讀位元組數建立陣列
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//將待發送的訊息放到傳送快取陣列中
ctx.writeAndFlush(resp);
}
}
/**
* 客戶端
*/
public class TimeClient {
public static void main(String[] args) throws Exception {
int port=8080; //服務端預設埠
new TimeClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception{
//配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bs = new Bootstrap();
bs.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new TimeClientHandler());
}
});
//發起非同步連線操作
ChannelFuture cf = bs.connect(host, port).sync();
//等待客戶端鏈路關閉
cf.channel().closeFuture().sync();
} finally {
//優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
}
/**
* 客戶端channel
*/
public class TimeClientHandler extends ChannelHandlerAdapter {
@Override
//向伺服器傳送指令
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 1; i++) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuf firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}
}
@Override
//接收伺服器的響應
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
//buf.readableBytes():獲取緩衝區中可讀的位元組數;
//根據可讀位元組數建立陣列
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : "+body);
}
@Override
//異常處理
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//釋放資源
ctx.close();
}
}
三、Netty的優點
1、API使用簡單,開發門檻低;
2、功能強大,預置了多種編解碼功能,支援多種主流協議;
3、定製功能強,可以通過ChannelHandler對通訊框架進行靈活的擴充套件;
4、效能高,通過與其他業界主流的NIO框架對比,Netty綜合性能最優;
5、成熟、穩定,Netty修復了已經發現的NIO所有BUG;
6、社群活躍;
7、經歷了很多商用專案的考驗。
四、粘包/拆包問題
TCP是一個“流”協議,所謂流,就是沒有界限的一串資料。可以想象為河流中的水,並沒有分界線。TCP底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP粘包和拆包問題。
TCP粘包拆包問題示例圖:
假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,可能存在以下4種情況。
1、服務端分兩次讀取到了兩個獨立的資料包,分別是D1和D2,沒有粘包和拆包;
2、服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包;
3、服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩餘部分內容,這被稱為TCP拆包;
4、服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩餘內容D1_1和D2包的完整內容;
如果此時伺服器TCP接收滑窗非常小,而資料包D1和D2比較大,很有可能發生第五種情況,既服務端分多次才能將D1和D2包接收完全,期間發生多次拆包;
問題的解決策略
由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案可歸納如下:
1、訊息定長,例如每個報文的大小為固定長度200位元組,如果不夠,空位補空格;
2、在包尾增加回車換行符進行分割,例如FTP協議;
3、將訊息分為訊息頭和訊息體,訊息頭中包含訊息總長度(或訊息體總長度)的欄位,通常設計思路為訊息頭的第一個欄位使用int32來表示訊息的總程度;
4、更復雜的應用層協議;
LineBasedFrameDecoder
為了解決TCP粘包/拆包導致的半包讀寫問題,Netty預設提供了多種編解碼器用於處理半包。
LinkeBasedFrameDecoder的工作原理是它一次遍歷ByteBuf中的可讀位元組,判斷看是否有“\n”、“\r\n”,如果有,就一次位置為結束位置,從可讀索引到結束位置區間的位元組就組成一行。它是以換行符為結束標誌的編解碼,支援攜帶結束符或者不攜帶結束符兩種解碼方式,同時支援配置單行的最大長度。如果連續讀取到最大長度後任然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀到的異常碼流。
/**
* 服務端
*/
public class TimeServer {
public static void main(String[] args) throws Exception {
int port=8080; //服務端預設埠
new TimeServer().bind(port);
}
public void bind(int port) throws Exception{
//Reactor執行緒組
//1用於服務端接受客戶端的連線
EventLoopGroup acceptorGroup = new NioEventLoopGroup();
//2用於進行SocketChannel的網路讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//Netty用於啟動NIO伺服器的輔助啟動類
ServerBootstrap sb = new ServerBootstrap();
//將兩個NIO執行緒組傳入輔助啟動類中
sb.group(acceptorGroup, workerGroup)
//設定建立的Channel為NioServerSocketChannel型別
.channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的TCP引數
.option(ChannelOption.SO_BACKLOG, 1024)
//設定繫結IO事件的處理類
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//處理粘包/拆包問題
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
});
//繫結埠,同步等待成功(sync():同步阻塞方法)
//ChannelFuture主要用於非同步操作的通知回撥
ChannelFuture cf = sb.bind(port).sync();
//等待服務端監聽埠關閉
cf.channel().closeFuture().sync();
} finally {
//優雅退出,釋放執行緒池資源
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服務端channel
*/
public class TimeServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// //buf.readableBytes():獲取緩衝區中可讀的位元組數;
// //根據可讀位元組數建立陣列
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8");
String body = (String) msg;
System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//將待發送的訊息放到傳送快取陣列中
ctx.writeAndFlush(resp);
}
}
/**
* 客戶端
*/
public class TimeClient {
public static void main(String[] args) throws Exception {
int port=8080; //服務端預設埠
new TimeClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception{
//配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bs = new Bootstrap();
bs.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
protected void initChannel(SocketChannel arg0) throws Exception {
//處理粘包/拆包問題
arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeClientHandler());
}
});
//發起非同步連線操作
ChannelFuture cf = bs.connect(host, port).sync();
//等待客戶端鏈路關閉
cf.channel().closeFuture().sync();
} finally {
//優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
}
/**
* 客戶端channel
*/
public class TimeClientHandler extends ChannelHandlerAdapter {
private int counter;
private byte[] req;
@Override
//向伺服器傳送指令
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message=null;
//模擬一百次請求,傳送重複內容
for (int i = 0; i < 200; i++) {
req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
message=Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
//接收伺服器的響應
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buf = (ByteBuf) msg;
// //buf.readableBytes():獲取緩衝區中可讀的位元組數;
// //根據可讀位元組數建立陣列
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8");
String body = (String) msg;
System.out.println("Now is : "+body+". the counter is : "+ ++counter);
}
@Override
//異常處理
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//釋放資源
ctx.close();
}
}
DelimiterBasedFrameDecoder
實現自定義分隔符作為訊息的結束標誌,完成解碼。
/**
* 服務端
*/
public class TimeServer {
public static void main(String[] args) throws Exception {
int port=8080; //服務端預設埠
new TimeServer().bind(port);
}
public void bind(int port) throws Exception{
//Reactor執行緒組
//1用於服務端接受客戶端的連線
EventLoopGroup acceptorGroup = new NioEventLoopGroup();
//2用於進行SocketChannel的網路讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//Netty用於啟動NIO伺服器的輔助啟動類
ServerBootstrap sb = new ServerBootstrap();
//將兩個NIO執行緒組傳入輔助啟動類中
sb.group(acceptorGroup, workerGroup)
//設定建立的Channel為NioServerSocketChannel型別
.channel(NioServerSocketChannel.class)
//配置NioServerSocketChannel的TCP引數
.option(ChannelOption.SO_BACKLOG, 1024)
//設定繫結IO事件的處理類
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
//處理粘包/拆包問題
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
});
//繫結埠,同步等待成功(sync():同步阻塞方法)
//ChannelFuture主要用於非同步操作的通知回撥
ChannelFuture cf = sb.bind(port).sync();
//等待服務端監聽埠關閉
cf.channel().closeFuture().sync();
} finally {
//優雅退出,釋放執行緒池資源
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服務端channel
*/
public class TimeServerHandler extends ChannelHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime += "$_";
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//將待發送的訊息放到傳送快取陣列中
ctx.writeAndFlush(resp);
}
}
/**
* 客戶端
*/
public class TimeClient {
public static void main(String[] args) throws Exception {
int port=8080; //服務端預設埠
new TimeClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception{
//配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bs = new Bootstrap();
bs.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
protected void initChannel(SocketChannel arg0) throws Exception {
//處理粘包/拆包問題
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeClientHandler());
}
});
//發起非同步連線操作
ChannelFuture cf = bs.connect(host, port).sync();
//等待客戶端鏈路關閉
cf.channel().closeFuture().sync();
} finally {
//優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
}
/**
* 客戶端channel
*/
public class TimeClientHandler extends ChannelHandlerAdapter {
private int counter;
private byte[] req;
@Override
//向伺服器傳送指令
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message=null;
//模擬一百次請求,傳送重複內容
for (int i = 0; i < 200; i++) {
req = ("QUERY TIME ORDER"+"$_").getBytes();
message=Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
//接收伺服器的響應
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Now is : "+body+". the counter is : "+ ++counter);
}
@Override
//異常處理
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//釋放資源
ctx.close();
}
}
FixedLengthFrameDecoder
是固定長度解碼器,能夠按照指定的長度對訊息進行自動解碼,開發者不需要考慮TCP的粘包/拆包問題。
五、Netty的高效能
1、非同步非阻塞通訊
在IO程式設計過程中,當需要同時處理多個客戶端接入請求時,可以利用多執行緒或者IO多路複用技術進行處理。IO多路複用技術通過把多個IO的阻塞複用到同一個Selector的阻塞上,從而使得系統在單執行緒的情況下可以同時處理多個客戶端請求。與傳統的多執行緒/多程序模型相比,IO多路複用的最大優勢是系統開銷小,系統不需要建立新的額外程序或者執行緒,也不需要維護這些程序和執行緒的執行,降低了系統的維護工作量,節省了系統資源。
Netty的IO執行緒NioEventLoop由於聚合了多路複用器Selector,可以同時併發處理成百上千個客戶端SocketChannel。由於讀寫操作都是非阻塞的,這就可以充分提升IO執行緒的執行效率,避免由頻繁的IO阻塞導致的執行緒掛起。另外,由於Netty採用了非同步通訊模式,一個IO執行緒可以併發處理N個客戶端連線和讀寫操作,這從根本上解決了傳統同步阻塞IO中 一連線一執行緒模型,架構的效能、彈性伸縮能力和可靠性都得到了極大的提升。
2、高效的Reactor執行緒模型
常用的Reactor執行緒模型有三種,分別如下:
1.Reactor單執行緒模型;
2.Reactor多執行緒模型;
3.主從Reactor多執行緒模型;
Reactor單執行緒模型,指的是所有的IO操作都在同一個NIO執行緒上面完成,NIO執行緒職責如下:
1、作為NIO服務端,接收客戶端的TCP連線;
2、作為NIO客戶端,向服務端發起TCP連線;
3、讀取通訊對端的請求或者應答訊息;
4、向通訊對端傳送請求訊息或者應答訊息;
由於Reactor模式使用的是非同步非阻塞IO,所有的IO操作都不會導致阻塞,理論上一個執行緒可以獨立處理所有IO相關操作。從架構層面看,一個NIO執行緒確實可以完成其承擔的職責。例如,通過Acceptor接收客戶端的TCP連線請求訊息,鏈路建立成功之後,通過Dispatch將對應的ByteBuffer派發到指定的Handler上進行訊息編碼。使用者Handler可以通過NIO執行緒將訊息傳送給客戶端。
對於一些小容量應用場景,可以使用單執行緒模型,但是對於高負載、大併發的應用卻不合適,主要原因如下:
1、一個NIO執行緒同時處理成百上千的鏈路,效能上無法支撐。即便NIO執行緒的CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送;
2、當NIO執行緒負載過重後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往會進行重發,這更加重了NIO執行緒的負載,最終會導致大量訊息積壓和處理超時,NIO執行緒會成為系統的效能瓶頸;
3、可靠性問題。一旦NIO執行緒意外進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障。
為了解決這些問題,從而演進出了Reactor多執行緒模型。
Reactor多執行緒模型與單執行緒模型最大的區別就是有一組NIO執行緒處理IO操作,特點如下:
1、有一個專門的NIO執行緒——Acceptor執行緒用於監聽服務端,接收客戶端TCP連線請求;
2、網路IO操作——讀、寫等由一個NIO執行緒池負責,執行緒池可以採用標準的JDK執行緒池實現,它包含一個任務佇列和N個可用的執行緒,由這些NIO執行緒負責訊息的讀取、編碼、解碼和傳送;
3、1個NIO執行緒可以同時處理N條鏈路,但是1個鏈路只對應1個NIO執行緒,防止發生併發操作問題。
在絕大多數場景下,Reactor多執行緒模型都可以滿足效能需求;但是,在極特殊應用場景中,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端併發連線,或者服務端需要對客戶端的握手訊息進行安全認證,認證本身非常損耗效能。在這類場景下,單獨一個Acceptor執行緒可能會存在效能不足問題,為了解決效能問題,產生了第三種Reactor執行緒模型——主從Reactor多執行緒模型。
主從Reactor執行緒模型的特點是:服務端用於接收客戶端連線的不再是一個單獨的NIO執行緒,而是一個獨立的NIO執行緒池。Acceptor接收到客戶端TCP連線請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到IO執行緒池(subReactor執行緒池)的某個IO執行緒上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor執行緒池只用於客戶端的登入、握手和安全認證,一旦鏈路建立成功,就將鏈路註冊到後端subReactor執行緒池的IO執行緒上,由IO執行緒負責後續的IO操作。
利用主從NIO執行緒模型,可以解決1個服務端監聽執行緒無法有效處理所有客戶端連線的效能不足問題。Netty官方推薦使用該執行緒模型。它的工作流程總結如下:
1、從主執行緒池中隨機選擇一個Reactor執行緒作為Acceptor執行緒,用於繫結監聽埠,接收客戶端連線;
2、Acceptor執行緒接收客戶端連線請求之後,建立新的SocketChannel,將其註冊到主執行緒池的其他Reactor執行緒上,由其負責接入認證、IP黑白名單過濾、握手等操作;
3、然後也業務層的鏈路正式建立成功,將SocketChannel從主執行緒池的Reactor執行緒的多路複用器上摘除,重新註冊到Sub執行緒池的執行緒上,用於處理IO的讀寫操作。
3、無鎖化的序列設計
在大多數場景下,並行多執行緒處理可以提升系統的併發效能。但是,如果對於共享資源的併發訪問處理不當,會帶來嚴重的鎖競爭,這最終會導致效能的下降。為了儘可能地避免鎖競爭帶來的效能損耗,可以通過序列化設計,既訊息的處理儘可能在同一個執行緒內完成,期間不進行執行緒切換,這樣就避免了多執行緒競爭和同步鎖。
為了儘可能提升效能,Netty採用了序列無鎖化設計,在IO執行緒內部進行序列操作,避免多執行緒競爭導致的效能下降。表面上看,序列化設計似乎CPU利用率不高,併發程度不夠。但是,通過調整NIO執行緒池的執行緒引數,可以同時啟動多個序列化的執行緒並行執行,這種區域性無鎖化的序列執行緒設計相比一個佇列——多個工作執行緒模型效能更優。
Netty序列化設計工作原理圖如下:
Netty的NioEventLoop讀取到訊息後,直接呼叫ChannelPipeline的fireChannelRead(Object msg),只要使用者不主動切換執行緒,一直會由NioEventLoop呼叫到使用者的Handler,期間不進行執行緒切換。這種序列化處理方式避免了多執行緒導致的鎖競爭,從效能角度看是最優的。
4、高效的併發程式設計
Netty中高效併發程式設計主要體現:
1、volatile的大量、正確使用;
2、CAS和原子類的廣泛使用;
3、執行緒安全容器的使用;
4、通過讀寫鎖提升併發效能。
5、高效能的序列化框架
影響序列化效能的關鍵因素總結如下:
1、序列化後的碼流大小(網路寬頻的佔用);
2、序列化與反序列化的效能(CPU資源佔用);
3、是否支援跨語言(異構系統的對接和開發語言切換)。
Netty預設提供了對GoogleProtobuf的支援,通過擴充套件Netty的編解碼介面,使用者可以實現其他的高效能序列化框架
6、零拷貝
Netty的“零拷貝”主要體現在三個方面:
1)、Netty的接收和傳送ByteBuffer採用DIRECT BUFFERS,使用堆外直接記憶體進行Socket讀寫,不需要進行位元組緩衝區的二次拷貝。如果使用傳統的堆記憶體(HEAP BUFFERS)進行Socket讀寫,JVM會將堆記憶體Buffer拷貝一份到直接記憶體中,然後才寫入Socket中。相比於堆外直接記憶體,訊息在傳送過程中多了一次緩衝區的記憶體拷貝。
2)、第二種“零拷貝 ”的實現CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統一封裝後的ByteBuf介面。
3)、第三種“零拷貝”就是檔案傳輸,Netty檔案傳輸類DefaultFileRegion通過transferTo方法將檔案傳送到目標Channel中。很多作業系統直接將檔案緩衝區的內容傳送到目標Channel中,而不需要通過迴圈拷貝的方式,這是一種更加高效的傳輸方式,提升了傳輸效能,降低了CPU和記憶體佔用,實現了檔案傳輸的“零拷貝”。
7、記憶體池
隨著JVM虛擬機器和JIT即時編譯技術的發展,物件的分配和回收是個非常輕量級的工作。但是對於緩衝區Buffer,情況卻稍有不同,特別是對於堆外直接記憶體的分配和回收,是一件耗時的操作。為了儘量重用緩衝區,Netty提供了基於記憶體池的緩衝區重用機制。
8、靈活的TCP引數配置能力
Netty在啟動輔助類中可以靈活的配置TCP引數,滿足不同的使用者場景。合理設定TCP引數在某些場景下對於效能的提升可以起到的顯著的效果,總結一下對效能影響比較大的幾個配置項:
1)、SO_RCVBUF和SO_SNDBUF:通常建議值為128KB或者256KB;
2)、TCP_NODELAY:NAGLE演算法通過將緩衝區內的小封包自動相連,組成較大的封包,阻止大量小封包的傳送阻塞網路,從而提高網路應用效率。但是對於時延敏感的應用場景需要關閉該優化演算法;
3)、軟中斷:如果Linux核心版本支援RPS(2.6.35以上版本),開啟RPS後可以實現軟中斷,提升網路吞吐量。RPS根據資料包的源地址,目的地址以及目的和源埠,計算出一個hash值,然後根據這個hash值來選擇軟中斷執行的CPU。從上層來看,也就是說將每個連線和CPU繫結,並通過這個hash值,來均衡軟中斷在多個CPU上,提升網路並行處理效能。