1. 程式人生 > >NIO、Netty(Netty基礎)

NIO、Netty(Netty基礎)

一、概述

Netty是一個Java的開源框架。提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。

Netty是一個NIO客戶端,服務端框架。允許快速簡單的開發網路應用程式。例如:服務端和客戶端之間的協議,它簡化了網路程式設計規範。

 

二、NIO開發的問題

1、NIO類庫和API複雜,使用麻煩。

2、需要具備Java多執行緒程式設計能力(涉及到Reactor模式)。

3、客戶端斷線重連、網路不穩定、半包讀寫、失敗快取、網路阻塞和異常碼流等問題處理難度非常大

4、存在部分BUG

 

NIO進行伺服器開發的步驟:

1、建立ServerSocketChannel,配置為非阻塞模式;

2、繫結監聽,配置TCP引數;

3、建立一個獨立的IO執行緒,用於輪詢多路複用器Selector;

4、建立Selector,將之前建立的ServerSocketChannel註冊到Selector上,監聽Accept事件;

5、啟動IO執行緒,在迴圈中執行Select.select()方法,輪詢就緒的Channel;

6、當輪詢到處於就緒狀態的Channel時,需要對其進行判斷,如果是OP_ACCEPT狀態,說明有新的客戶端接入,則呼叫ServerSocketChannel.accept()方法接受新的客戶端;

7、設定新接入的客戶端鏈路SocketChannel為非阻塞模式,配置TCP引數;

8、將SocketChannel註冊到Selector上,監聽READ事件;

9、如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的準備就緒的資料包需要讀取,則構造ByteBuffer物件,讀取資料包;

10、如果輪詢的Channel為OP_WRITE,則說明還有資料沒有傳送完成,需要繼續傳送。

/**
 * 服務端
 */
public class TimeServer {

	public static void main(String[] args) throws Exception {
		int port=8080; //服務端預設埠
		new TimeServer().bind(port);
	}
	
	public void bind(int port) throws Exception{
		//1用於服務端接受客戶端的連線
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用於進行SocketChannel的網路讀寫
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用於啟動NIO伺服器的輔助啟動類
			ServerBootstrap sb = new ServerBootstrap();
			//將兩個NIO執行緒組傳入輔助啟動類中
			sb.group(acceptorGroup, workerGroup)
				//設定建立的Channel為NioServerSocketChannel型別
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP引數
				.option(ChannelOption.SO_BACKLOG, 1024)
				//設定繫結IO事件的處理類
				.childHandler(new ChannelInitializer<SocketChannel>() {
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//繫結埠,同步等待成功(sync():同步阻塞方法,等待bind操作完成才繼續)
			//ChannelFuture主要用於非同步操作的通知回撥
			ChannelFuture cf = sb.bind(port).sync();
			System.out.println("服務端啟動在8080埠。");
			//等待服務端監聽埠關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放執行緒池資源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服務端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		//buf.readableBytes():獲取緩衝區中可讀的位元組數;
		//根據可讀位元組數建立陣列
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//將待發送的訊息放到傳送快取陣列中
		ctx.writeAndFlush(resp);
	}
}
/**
 * 客戶端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端預設埠
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客戶端NIO執行緒組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//發起非同步連線操作
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客戶端鏈路關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO執行緒組
			group.shutdownGracefully();
		}
	}
}
/**
 * 客戶端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

	@Override
	//向伺服器傳送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < 1; i++) {
			byte[] req = "QUERY TIME ORDER".getBytes();
			ByteBuf firstMessage = Unpooled.buffer(req.length);
			firstMessage.writeBytes(req);
			ctx.writeAndFlush(firstMessage);
		}
	}

	@Override
	//接收伺服器的響應
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		//buf.readableBytes():獲取緩衝區中可讀的位元組數;
		//根據可讀位元組數建立陣列
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "UTF-8");
		System.out.println("Now is : "+body);
	}

	@Override
	//異常處理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//釋放資源
		ctx.close();
	}
	
}

 

三、Netty的優點

1、API使用簡單,開發門檻低;

2、功能強大,預置了多種編解碼功能,支援多種主流協議;

3、定製功能強,可以通過ChannelHandler對通訊框架進行靈活的擴充套件;

4、效能高,通過與其他業界主流的NIO框架對比,Netty綜合性能最優;

5、成熟、穩定,Netty修復了已經發現的NIO所有BUG;

6、社群活躍;

7、經歷了很多商用專案的考驗。

 

四、粘包/拆包問題

TCP是一個“流”協議,所謂流,就是沒有界限的一串資料。可以想象為河流中的水,並沒有分界線。TCP底層並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包的劃分,所以在業務上認為,一個完整的包可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP粘包和拆包問題。

TCP粘包拆包問題示例圖:

假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,可能存在以下4種情況。

1、服務端分兩次讀取到了兩個獨立的資料包,分別是D1和D2,沒有粘包和拆包;

2、服務端一次接收到了兩個資料包,D1和D2粘合在一起,被稱為TCP粘包;

3、服務端分兩次讀取到了兩個資料包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩餘部分內容,這被稱為TCP拆包;

4、服務端分兩次讀取到了兩個資料包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩餘內容D1_1和D2包的完整內容;

如果此時伺服器TCP接收滑窗非常小,而資料包D1和D2比較大,很有可能發生第五種情況,既服務端分多次才能將D1和D2包接收完全,期間發生多次拆包;

問題的解決策略

由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案可歸納如下:

1、訊息定長,例如每個報文的大小為固定長度200位元組,如果不夠,空位補空格;

2、在包尾增加回車換行符進行分割,例如FTP協議;

3、將訊息分為訊息頭和訊息體,訊息頭中包含訊息總長度(或訊息體總長度)的欄位,通常設計思路為訊息頭的第一個欄位使用int32來表示訊息的總程度;

4、更復雜的應用層協議;

LineBasedFrameDecoder

為了解決TCP粘包/拆包導致的半包讀寫問題,Netty預設提供了多種編解碼器用於處理半包。

LinkeBasedFrameDecoder的工作原理是它一次遍歷ByteBuf中的可讀位元組,判斷看是否有“\n”、“\r\n”,如果有,就一次位置為結束位置,從可讀索引到結束位置區間的位元組就組成一行。它是以換行符為結束標誌的編解碼,支援攜帶結束符或者不攜帶結束符兩種解碼方式,同時支援配置單行的最大長度。如果連續讀取到最大長度後任然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀到的異常碼流。

/**
 * 服務端 
 */
public class TimeServer {

	public static void main(String[] args) throws Exception {
		int port=8080; //服務端預設埠
		new TimeServer().bind(port);
	}
	public void bind(int port) throws Exception{
		//Reactor執行緒組
		//1用於服務端接受客戶端的連線
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用於進行SocketChannel的網路讀寫
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用於啟動NIO伺服器的輔助啟動類
			ServerBootstrap sb = new ServerBootstrap();
			//將兩個NIO執行緒組傳入輔助啟動類中
			sb.group(acceptorGroup, workerGroup)
				//設定建立的Channel為NioServerSocketChannel型別
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP引數
				.option(ChannelOption.SO_BACKLOG, 1024)
				//設定繫結IO事件的處理類
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//繫結埠,同步等待成功(sync():同步阻塞方法)
			//ChannelFuture主要用於非同步操作的通知回撥
			ChannelFuture cf = sb.bind(port).sync();
				
			//等待服務端監聽埠關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放執行緒池資源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服務端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	private int counter;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//		ByteBuf buf = (ByteBuf) msg;
//		//buf.readableBytes():獲取緩衝區中可讀的位元組數;
//		//根據可讀位元組數建立陣列
//		byte[] req = new byte[buf.readableBytes()];
//		buf.readBytes(req);
//		String body = new String(req, "UTF-8");
		String body = (String) msg;
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		currentTime = currentTime + System.getProperty("line.separator");
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//將待發送的訊息放到傳送快取陣列中
		ctx.writeAndFlush(resp);
	}

}
/**
 * 客戶端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端預設埠
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客戶端NIO執行緒組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//發起非同步連線操作
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客戶端鏈路關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO執行緒組
			group.shutdownGracefully();
		}
	}
}
/**
 * 客戶端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {

	private int counter;
	private byte[] req;
	
	@Override
	//向伺服器傳送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf message=null;
		//模擬一百次請求,傳送重複內容
		for (int i = 0; i < 200; i++) {
			req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
			message=Unpooled.buffer(req.length);
			message.writeBytes(req);
			ctx.writeAndFlush(message);
		}
		
	}

	@Override
	//接收伺服器的響應
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//		ByteBuf buf = (ByteBuf) msg;
//		//buf.readableBytes():獲取緩衝區中可讀的位元組數;
//		//根據可讀位元組數建立陣列
//		byte[] req = new byte[buf.readableBytes()];
//		buf.readBytes(req);
//		String body = new String(req, "UTF-8");
		String body = (String) msg;
		System.out.println("Now is : "+body+". the counter is : "+ ++counter);
	}

	@Override
	//異常處理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//釋放資源
		ctx.close();
	}
	
}

DelimiterBasedFrameDecoder

實現自定義分隔符作為訊息的結束標誌,完成解碼。

/**
 * 服務端
 */
public class TimeServer {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端預設埠
		new TimeServer().bind(port);
	}

	public void bind(int port) throws Exception{
		//Reactor執行緒組
		//1用於服務端接受客戶端的連線
		EventLoopGroup acceptorGroup = new NioEventLoopGroup();
		//2用於進行SocketChannel的網路讀寫
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			//Netty用於啟動NIO伺服器的輔助啟動類
			ServerBootstrap sb = new ServerBootstrap();
			//將兩個NIO執行緒組傳入輔助啟動類中
			sb.group(acceptorGroup, workerGroup)
				//設定建立的Channel為NioServerSocketChannel型別
				.channel(NioServerSocketChannel.class)
				//配置NioServerSocketChannel的TCP引數
				.option(ChannelOption.SO_BACKLOG, 1024)
				//設定繫結IO事件的處理類
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
						arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
						arg0.pipeline().addLast(new StringDecoder());
						arg0.pipeline().addLast(new TimeServerHandler());
					}
				});
			//繫結埠,同步等待成功(sync():同步阻塞方法)
			//ChannelFuture主要用於非同步操作的通知回撥
			ChannelFuture cf = sb.bind(port).sync();
				
			//等待服務端監聽埠關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放執行緒池資源
			acceptorGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
}
/**
 * 服務端channel
 */
public class TimeServerHandler extends ChannelHandlerAdapter {

	private int counter;
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String body = (String) msg;
		System.out.println("The time server(Thread:"+Thread.currentThread()+") receive order : "+body+". the counter is : "+ ++counter);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		currentTime += "$_";
		
		ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
		//將待發送的訊息放到傳送快取陣列中
		ctx.writeAndFlush(resp);
	}
}
/**
 * 客戶端
 */
public class TimeClient {
	public static void main(String[] args) throws Exception {
		int port=8080; //服務端預設埠
		new TimeClient().connect(port, "127.0.0.1");
	}
	public void connect(int port, String host) throws Exception{
		//配置客戶端NIO執行緒組
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bs = new Bootstrap();
			bs.group(group)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					//建立NIOSocketChannel成功後,在進行初始化時,將它的ChannelHandler設定到ChannelPipeline中,用於處理網路IO事件
					protected void initChannel(SocketChannel arg0) throws Exception {
						//處理粘包/拆包問題
						ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
						arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
						arg0.pipeline().addLast(new StringDecoder());
						
						arg0.pipeline().addLast(new TimeClientHandler());
					}
				});
			//發起非同步連線操作
			ChannelFuture cf = bs.connect(host, port).sync();
			//等待客戶端鏈路關閉
			cf.channel().closeFuture().sync();
		} finally {
			//優雅退出,釋放NIO執行緒組
			group.shutdownGracefully();
		}
	}
}
/**
 * 客戶端channel
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
	
	private int counter;
	private byte[] req;
	
	@Override
	//向伺服器傳送指令
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		ByteBuf message=null;
		//模擬一百次請求,傳送重複內容
		for (int i = 0; i < 200; i++) {
			req = ("QUERY TIME ORDER"+"$_").getBytes();
			message=Unpooled.buffer(req.length);
			message.writeBytes(req);
			ctx.writeAndFlush(message);
		}
		
	}

	@Override
	//接收伺服器的響應
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		String body = (String) msg;
		System.out.println("Now is : "+body+". the counter is : "+ ++counter);
	}

	@Override
	//異常處理
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//釋放資源
		ctx.close();
	}
	
}

FixedLengthFrameDecoder

是固定長度解碼器,能夠按照指定的長度對訊息進行自動解碼,開發者不需要考慮TCP的粘包/拆包問題。

 

五、Netty的高效能

1、非同步非阻塞通訊

在IO程式設計過程中,當需要同時處理多個客戶端接入請求時,可以利用多執行緒或者IO多路複用技術進行處理。IO多路複用技術通過把多個IO的阻塞複用到同一個Selector的阻塞上,從而使得系統在單執行緒的情況下可以同時處理多個客戶端請求。與傳統的多執行緒/多程序模型相比,IO多路複用的最大優勢是系統開銷小,系統不需要建立新的額外程序或者執行緒,也不需要維護這些程序和執行緒的執行,降低了系統的維護工作量,節省了系統資源。

Netty的IO執行緒NioEventLoop由於聚合了多路複用器Selector,可以同時併發處理成百上千個客戶端SocketChannel。由於讀寫操作都是非阻塞的,這就可以充分提升IO執行緒的執行效率,避免由頻繁的IO阻塞導致的執行緒掛起。另外,由於Netty採用了非同步通訊模式,一個IO執行緒可以併發處理N個客戶端連線和讀寫操作,這從根本上解決了傳統同步阻塞IO中 一連線一執行緒模型,架構的效能、彈性伸縮能力和可靠性都得到了極大的提升。

 

2、高效的Reactor執行緒模型

常用的Reactor執行緒模型有三種,分別如下:

1.Reactor單執行緒模型;

2.Reactor多執行緒模型;

3.主從Reactor多執行緒模型;

 

Reactor單執行緒模型,指的是所有的IO操作都在同一個NIO執行緒上面完成,NIO執行緒職責如下:

1、作為NIO服務端,接收客戶端的TCP連線;

2、作為NIO客戶端,向服務端發起TCP連線;

3、讀取通訊對端的請求或者應答訊息;

4、向通訊對端傳送請求訊息或者應答訊息;

 

由於Reactor模式使用的是非同步非阻塞IO,所有的IO操作都不會導致阻塞,理論上一個執行緒可以獨立處理所有IO相關操作。從架構層面看,一個NIO執行緒確實可以完成其承擔的職責。例如,通過Acceptor接收客戶端的TCP連線請求訊息,鏈路建立成功之後,通過Dispatch將對應的ByteBuffer派發到指定的Handler上進行訊息編碼。使用者Handler可以通過NIO執行緒將訊息傳送給客戶端。

對於一些小容量應用場景,可以使用單執行緒模型,但是對於高負載、大併發的應用卻不合適,主要原因如下:

1、一個NIO執行緒同時處理成百上千的鏈路,效能上無法支撐。即便NIO執行緒的CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送;

2、當NIO執行緒負載過重後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往會進行重發,這更加重了NIO執行緒的負載,最終會導致大量訊息積壓和處理超時,NIO執行緒會成為系統的效能瓶頸;

3、可靠性問題。一旦NIO執行緒意外進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障。

為了解決這些問題,從而演進出了Reactor多執行緒模型。

 

Reactor多執行緒模型與單執行緒模型最大的區別就是有一組NIO執行緒處理IO操作,特點如下:

1、有一個專門的NIO執行緒——Acceptor執行緒用於監聽服務端,接收客戶端TCP連線請求;

2、網路IO操作——讀、寫等由一個NIO執行緒池負責,執行緒池可以採用標準的JDK執行緒池實現,它包含一個任務佇列和N個可用的執行緒,由這些NIO執行緒負責訊息的讀取、編碼、解碼和傳送;

3、1個NIO執行緒可以同時處理N條鏈路,但是1個鏈路只對應1個NIO執行緒,防止發生併發操作問題。

在絕大多數場景下,Reactor多執行緒模型都可以滿足效能需求;但是,在極特殊應用場景中,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端併發連線,或者服務端需要對客戶端的握手訊息進行安全認證,認證本身非常損耗效能。在這類場景下,單獨一個Acceptor執行緒可能會存在效能不足問題,為了解決效能問題,產生了第三種Reactor執行緒模型——主從Reactor多執行緒模型。

 

主從Reactor執行緒模型的特點是:服務端用於接收客戶端連線的不再是一個單獨的NIO執行緒,而是一個獨立的NIO執行緒池。Acceptor接收到客戶端TCP連線請求處理完成後(可能包含接入認證等),將新建立的SocketChannel註冊到IO執行緒池(subReactor執行緒池)的某個IO執行緒上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor執行緒池只用於客戶端的登入、握手和安全認證,一旦鏈路建立成功,就將鏈路註冊到後端subReactor執行緒池的IO執行緒上,由IO執行緒負責後續的IO操作。

利用主從NIO執行緒模型,可以解決1個服務端監聽執行緒無法有效處理所有客戶端連線的效能不足問題。Netty官方推薦使用該執行緒模型。它的工作流程總結如下:

1、從主執行緒池中隨機選擇一個Reactor執行緒作為Acceptor執行緒,用於繫結監聽埠,接收客戶端連線;

2、Acceptor執行緒接收客戶端連線請求之後,建立新的SocketChannel,將其註冊到主執行緒池的其他Reactor執行緒上,由其負責接入認證、IP黑白名單過濾、握手等操作;

3、然後也業務層的鏈路正式建立成功,將SocketChannel從主執行緒池的Reactor執行緒的多路複用器上摘除,重新註冊到Sub執行緒池的執行緒上,用於處理IO的讀寫操作。

 

3、無化的序列設計

在大多數場景下,並行多執行緒處理可以提升系統的併發效能。但是,如果對於共享資源的併發訪問處理不當,會帶來嚴重的鎖競爭,這最終會導致效能的下降。為了儘可能地避免鎖競爭帶來的效能損耗,可以通過序列化設計,既訊息的處理儘可能在同一個執行緒內完成,期間不進行執行緒切換,這樣就避免了多執行緒競爭和同步鎖。

為了儘可能提升效能,Netty採用了序列無鎖化設計,在IO執行緒內部進行序列操作,避免多執行緒競爭導致的效能下降。表面上看,序列化設計似乎CPU利用率不高,併發程度不夠。但是,通過調整NIO執行緒池的執行緒引數,可以同時啟動多個序列化的執行緒並行執行,這種區域性無鎖化的序列執行緒設計相比一個佇列——多個工作執行緒模型效能更優。

Netty序列化設計工作原理圖如下:

Netty的NioEventLoop讀取到訊息後,直接呼叫ChannelPipeline的fireChannelRead(Object msg),只要使用者不主動切換執行緒,一直會由NioEventLoop呼叫到使用者的Handler,期間不進行執行緒切換。這種序列化處理方式避免了多執行緒導致的鎖競爭,從效能角度看是最優的。

 

4、高效的併發程式設計

Netty高效併發程式設計主要體現

1、volatile的大量、正確使用;

2、CAS和原子類的廣泛使用;

3、執行緒安全容器的使用;

4、通過讀寫鎖提升併發效能。

 

5、高效能的序列化框架

    影響序列化效能的關鍵因素總結如下:

    1、序列化後的碼流大小(網路寬頻的佔用);

    2、序列化與反序列化的效能(CPU資源佔用);

    3、是否支援跨語言(異構系統的對接和開發語言切換)。

    Netty預設提供了對GoogleProtobuf的支援,通過擴充套件Netty的編解碼介面,使用者可以實現其他的高效能序列化框架

 

6、零拷貝

    Netty的“零拷貝”主要體現在三個方面:

    1)、Netty的接收和傳送ByteBuffer採用DIRECT BUFFERS,使用堆外直接記憶體進行Socket讀寫,不需要進行位元組緩衝區的二次拷貝。如果使用傳統的堆記憶體(HEAP BUFFERS)進行Socket讀寫,JVM會將堆記憶體Buffer拷貝一份到直接記憶體中,然後才寫入Socket中。相比於堆外直接記憶體,訊息在傳送過程中多了一次緩衝區的記憶體拷貝。

    2)、第二種“零拷貝 ”的實現CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統一封裝後的ByteBuf介面。

    3)、第三種“零拷貝”就是檔案傳輸,Netty檔案傳輸類DefaultFileRegion通過transferTo方法將檔案傳送到目標Channel中。很多作業系統直接將檔案緩衝區的內容傳送到目標Channel中,而不需要通過迴圈拷貝的方式,這是一種更加高效的傳輸方式,提升了傳輸效能,降低了CPU和記憶體佔用,實現了檔案傳輸的“零拷貝”。

        

7、記憶體池

    隨著JVM虛擬機器和JIT即時編譯技術的發展,物件的分配和回收是個非常輕量級的工作。但是對於緩衝區Buffer,情況卻稍有不同,特別是對於堆外直接記憶體的分配和回收,是一件耗時的操作。為了儘量重用緩衝區,Netty提供了基於記憶體池的緩衝區重用機制。

  

8、靈活的TCP引數配置能力

    Netty在啟動輔助類中可以靈活的配置TCP引數,滿足不同的使用者場景。合理設定TCP引數在某些場景下對於效能的提升可以起到的顯著的效果,總結一下對效能影響比較大的幾個配置項:

    1)、SO_RCVBUF和SO_SNDBUF:通常建議值為128KB或者256KB;

    2)、TCP_NODELAY:NAGLE演算法通過將緩衝區內的小封包自動相連,組成較大的封包,阻止大量小封包的傳送阻塞網路,從而提高網路應用效率。但是對於時延敏感的應用場景需要關閉該優化演算法;

    3)、軟中斷:如果Linux核心版本支援RPS(2.6.35以上版本),開啟RPS後可以實現軟中斷,提升網路吞吐量。RPS根據資料包的源地址,目的地址以及目的和源埠,計算出一個hash值,然後根據這個hash值來選擇軟中斷執行的CPU。從上層來看,也就是說將每個連線和CPU繫結,並通過這個hash值,來均衡軟中斷在多個CPU上,提升網路並行處理效能。