Netty 工作原理解析
執行緒模型介紹
不同的執行緒模式,對程式的效能有很大影響,為了搞清 Netty 執行緒模式,下面來系統的講解下各個執行緒模式, 最後看看 Netty 執行緒模型有什麼優越性。
目前存在的執行緒模型有:
- 傳統阻塞 I/O 服務模型
- Reactor 模式
根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現:
- 單 Reactor 單執行緒
- 單 Reactor 多執行緒
- 主從 Reactor 多執行緒
Netty 執行緒模式:Netty 主要基於主從 Reactor 多執行緒模型做了一定的改進,其中主從 Reactor 多執行緒模型有多個 Reactor。
傳統阻塞 I/O 服務模型
工作原理示意圖
模型特點
- 採用阻塞 IO 模式獲取輸入的資料
- 每個連線都需要獨立的執行緒完成資料的輸入,業務處理, 資料返回
問題分析
- 當併發數很大,就會建立大量的執行緒,佔用很大的系統資源
- 連線建立後,如果當前執行緒暫時沒有資料可讀,該執行緒會阻塞在 read 操作,造成執行緒資源浪費
Reactor 模式
Reactor: 反應器模式,也被稱為分發者模式(Dispatcher)或通知者模式(notifier)。
針對傳統阻塞 I/O 服務模型的 2 個缺點,解決方案如下:
- 基於 I/O 複用模型:多個連線共用一個阻塞物件,應用程式只需要在一個阻塞物件等待,無需阻塞等待所有連線。當某個連線有新的資料可以處理時,作業系統通知應用程式,執行緒從阻塞狀態返回,開始進行業務處理。
- 基於執行緒池複用執行緒資源:不必再為每個連線建立執行緒,將連線完成後的業務處理任務分配給執行緒進行處理,一個執行緒可以處理多個連線的業務。
Reactor 模式設計思想
Reactor 模式基本設計思想是I/O 複用結合線程池,如下圖所示:
- Reactor 模式,通過一個或多個輸入同時傳遞給服務處理器(基於事件驅動) 。
- 伺服器端程式處理傳入的多個請求,並將它們同步分派到相應的處理執行緒,因此 Reactor 模式也叫 Dispatcher 模式。
- Reactor 模式使用 IO 複用監聽事件,收到事件後,分發給某個執行緒(程序), 這點就是網路伺服器高併發處理關鍵。
Reactor 模式核心組成
- Reactor:Reactor 在一個單獨的執行緒中執行,負責監聽和分發事件,分發給適當的處理程式來對 IO 事件做出反應。
- Handlers:處理程式執行 I/O 事件要完成的實際事件。Reactor 通過排程適當的處理程式來響應 I/O 事件,處理程式執行非阻塞操作。
Reactor 模式分類
根據 Reactor 的數量和處理資源池執行緒的數量不同,有 3 種典型的實現:
- 單 Reactor 單執行緒
- 單 Reactor 多執行緒
- 主從 Reactor 多執行緒
單Reactor 單執行緒模式
工作原理示意圖
工作流程說明
- Select 是前面 I/O 複用模型介紹的標準網路程式設計API,可以實現應用程式通過一個阻塞物件監聽多路連線請求。
- Reactor 物件通過 Select 監控客戶端請求事件,收到事件後通過 Dispatch 進行分發。
- 如果是建立連線請求事件,則由 Acceptor 通過 Accept 處理連線請求,然後建立一個 Handler 物件處理完成連線後的各種事件
- 如果不是建立連線事件,則 Reactor 會分發呼叫連線對應的 Handler 來響應。
- Handler 會完成 Read -> 業務處理 -> Send 的完整業務流程。
優缺點分析
- 優點:模型簡單,沒有多執行緒、程序通訊、競爭的問題,全部都在一個執行緒中完成。
- 缺點:
- 效能問題,只有一個執行緒,無法完全發揮多核 CPU 的效能。Handler 在處理某個連線上的業務時,整個程序無法處理其他連線事件,很容易導致效能瓶頸。
- 可靠性問題,執行緒意外終止,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障。
使用場景:客戶端的數量有限,業務處理非常快速,比如 Redis 在業務處理的時間複雜度 O(1) 的情況。
單Reactor 多執行緒模式
工作原理示意圖
工作流程說明
- Reactor 物件通過 Select 監控客戶端請求事件,收到事件後,通過 dispatch 進行分發。
- 如果建立連線請求, 則由 Acceptor 通過 accept 處理連線請求,然後建立一個 Handler 物件處理完成連線後的各種事件。
- 如果不是連線請求,則由 Reactor 分發呼叫連線對應的 Handler 來處理。
- Handler 只負責響應事件,不做具體的業務處理,通過 read 讀取資料後,會分發給後面的 Worker 執行緒池的某個執行緒處理業務。
- Worker 執行緒池會分配獨立執行緒完成真正的業務,並將結果返回給 Handler。
- Handler 收到響應後,通過 send 將結果返回給 client。
優缺點分析
- 優點:可以充分的利用多核 CPU 的處理能力。
- 缺點:多執行緒資料共享和訪問比較複雜,Reactor 處理所有的事件的監聽和響應,在單執行緒執行,在高併發場景容易出現效能瓶頸。
主從Reactor 模式
針對單 Reactor 多執行緒模型中,Reactor 在單執行緒中執行,高併發場景下容易成為效能瓶頸,可以讓 Reactor 在 多執行緒中執行。
工作原理示意圖
工作流程說明
- Reactor 主執行緒 MainReactor 物件通過 select 監聽連線事件,收到事件後,通過Acceptor 處理連線事件。
- 當 Acceptor 處理連線事件後,MainReactor 將連線分配給 SubReactor。
- SubReactor 將連線加入到連線佇列進行監聽,並建立 handler 進行各種事件處理。
- 當有新事件發生時,SubReactor 就會呼叫對應的 handler 處理。
- handler 通過 read 讀取資料,分發給後面的 worker 執行緒處理。
- worker 執行緒池分配獨立的 worker 執行緒進行業務處理,並返回結果。
- handler 收到響應的結果後,再通過 send 將結果返回給 client。
- Reactor 主執行緒可以對應多個 Reactor 子執行緒, 即 MainRecator 可以關聯多個SubReactor。
優缺點分析
- 優點:
- 父執行緒與子執行緒的資料互動簡單職責明確,父執行緒只需要接收新連線,子執行緒完成後續的業務處理。
- 父執行緒與子執行緒的資料互動簡單,Reactor 主執行緒只需要把新連線傳給子執行緒,子執行緒無需返回資料。
- 缺點:程式設計複雜度較高
結合例項:這種模型在許多專案中廣泛使用,包括 Nginx 主從 Reactor 多程序模型,Memcached 主從多執行緒, Netty 主從多執行緒模型的支援。
Reactor 模式優點和缺點
優點
- 響應快,不必為單個同步時間所阻塞,雖然 Reactor 本身依然是同步的。
- 可以最大程度的避免複雜的多執行緒及同步問題,並且避免了多執行緒/程序的切換開銷。
- 擴充套件性好,可以方便的通過增加 Reactor 例項個數來充分利用 CPU 資源。
- 複用性好,Reactor 模型本身與具體事件處理邏輯無關,具有很高的複用性。
缺點
- 相比傳統的簡單模型,Reactor增加了一定的複雜性,因而有一定的門檻,並且不易於除錯。
- Reactor模式需要底層的Synchronous Event Demultiplexer支援,比如Java中的Selector 支援,作業系統的select系統呼叫支援,如果要自己實現Synchronous Event Demultiplexer 可能不會有那麼高效。
- Reactor模式在IO 讀寫資料時還是在同一個執行緒中實現的,即使使用多個Reactor 機制的情況下,那些共享一個Reactor 的Channel 如果出現一個長時間的資料讀寫,會影響這個Reactor 中其他Channel 的響應時間,比如在大檔案傳輸時,IO 操作就會影響其他Client 的響應時間,因而對這種操作,使用傳統的Thread-Per-Connection 或許是一個更好的選擇,或者此時使用改進版的Reactor 模式如Proactor 模式。
Netty 模型
Netty 主要基於主從 Reactors 多執行緒模型做了一定的改進,其中主從 Reactor 多執行緒模型有多個 Reactor。
工作原理示意圖
工作流程說明
- Netty 抽象出兩組執行緒池 BossGroup 專門負責接收客戶端的連線,WorkerGroup 專門負責網路的讀寫。
- BossGroup 和 WorkerGroup 型別都是 NioEventLoopGroup。
- NioEventLoopGroup 相當於一個事件迴圈組,這個組中含有多個事件迴圈,每一個事件迴圈是 NioEventLoop。
- NioEventLoop 表示一個不斷迴圈的執行處理任務的執行緒,每個 NioEventLoop 都有一個 selector ,用於監聽繫結在其上的 socket 的網路通訊。其內部採用序列化設計,從訊息的讀取 -> 解碼 -> 處理 -> 編碼 -> 傳送,始終由 IO 執行緒 NioEventLoop 負責。
- NioEventLoopGroup 可以有多個執行緒, 即可以含有多個 NioEventLoop。
- 每個 Boss NioEventLoop 迴圈執行的步驟:
- 輪詢 accept 事件。
- 處理 accept 事件,與 client 建立連線,生成 NioScocketChannel ,並將其註冊到某個 Worker NIOEventLoop 上 的 selector。
- 處理任務佇列的任務,即 runAllTasks。
- 每個 Worker NIOEventLoop 迴圈執行的步驟:
- 輪詢 read,write 事件。
- 處理 I/O 事件, 即read ,write 事件,在對應 NioScocketChannel 處理業務。
- 處理任務佇列的任務,即 runAllTasks。
- 每個 Worker NIOEventLoop 處理業務時,會使用pipeline(管道),pipeline 中包含了 channel , 即通過pipeline 可以獲取到對應通道, 管道中維護了很多的handler 處理器用來處理 channel 中的資料。
案例: TCP 服務
- 服務端
public class NettyServer {
public static void main(String[] args) {
// 建立兩個執行緒組 bossGroup 和 workerGroup
// bossGroup 只是處理連線請求, 真正的和客戶端業務處理, 會交給workerGroup 完成
// bossGroup 和 workerGroup 含有的子執行緒(NioEventLoop)的個數預設是實際cpu 核心數 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 設定兩個執行緒組
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作為伺服器的通道實現
.option(ChannelOption.SO_BACKLOG, 128) // 設定執行緒佇列得到連線個數
.childOption(ChannelOption.SO_KEEPALIVE, true) // 設定保持活動的連線狀態
.childHandler(new ChannelInitializer<SocketChannel>() { // 建立一個通道測試物件(匿名物件)
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 給pipeline 設定處理器
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 給workGroup 的EventLoop 對應的管道設定處理器
System.out.println("伺服器已準備就緒...");
try {
// 啟動伺服器, 繫結埠並設定同步
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
// 對關閉通道監聽
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 自定義handler處理器
*/
class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 讀取資料實際(這裡可以讀取客戶端傳送的訊息)
* @param ctx 上下文物件, 含有管道pipeline, 通道channel, 地址
* @param msg 客戶端傳送的資料
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("伺服器讀取執行緒: " + Thread.currentThread().getName());
System.out.println("server ctx = " + ctx);
Channel channel = ctx.channel();
// ChannelPipeline pipeline = ctx.pipeline(); // 本質是一個雙向連結串列
// 將msg 轉成一個ByteBuf
// ByteBuf是Netty提供的, 不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到客戶端訊息: " + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址: " + channel.remoteAddress());
}
/**
* 資料讀取完畢
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// writeAndFlush 是write + flush
// 將資料寫入到快取, 並重新整理
ctx.writeAndFlush(Unpooled.copiedBuffer("bye ~", CharsetUtil.UTF_8));
}
}
- 客戶端
public class NettyClient {
public static void main(String[] args) {
// 客戶端需要一個事件迴圈組
EventLoopGroup group = new NioEventLoopGroup();
// 建立客戶端啟動物件
// 注意客戶端使用的不是ServerBootstrap 而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
// 設定相關引數
bootstrap.group(group) //設定執行緒組
.channel(NioSocketChannel.class) // 設定客戶端通道的實現類(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
}
});
System.out.println("客戶端準備就緒...");
try {
// 啟動客戶端去連線伺服器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
// 監聽關閉通道
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 當通道就緒就會觸發該方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx = " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~", CharsetUtil.UTF_8));
}
/**
* 當通道有讀取事件時觸發
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("伺服器訊息: " + buf.toString(CharsetUtil.UTF_8));
System.out.println("伺服器地址: "+ ctx.channel().remoteAddress());
}
/**
* 異常事件
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
任務佇列 Task
使用場景
- 使用者程式自定義的普通任務
- 使用者自定義定時任務
- 非當前 Reactor 執行緒呼叫 Channel 的各種方法
程式碼演示
class NettyServerTaskHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 自定義普通任務, 該任務是提交到taskQueue中
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~ task", CharsetUtil.UTF_8));
System.out.println("channel hash =" + ctx.channel().hashCode());
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 注意此處第二個普通任務在任務一基礎上睡眠5s, 10s後輸出
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~ task2", CharsetUtil.UTF_8));
System.out.println("channel hash =" + ctx.channel().hashCode());
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 自定義定時任務, 該任務是提交到scheduledTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello ~ timed task", CharsetUtil.UTF_8));
System.out.println("channel hash =" + ctx.channel().hashCode());
} catch (Exception e) {
e.printStackTrace();
}
}
}, 5, TimeUnit.SECONDS);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("bye ~", CharsetUtil.UTF_8));
}
}
非同步模型
基本介紹
非同步的概念和同步相對,當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。實際處理這個呼叫的元件在完成後,通過狀態、通知和回撥來通知呼叫者。
Netty 中的 I/O 操作是非同步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture,呼叫者並不能立刻獲得結果,而是通過 Future - Listener 機制,使用者可以方便的主動獲取或者通過通知機制獲得 IO 操作結果。Netty 的非同步模型是建立在 future 和 callback 的基礎上。
Future 表示非同步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等。
工作原理示意圖
說明:
- 在使用 Netty 程式設計時,攔截操作和轉換出入站資料只需要提供 callback 或利用 future 即可。這使得鏈式操作簡單、高效,並有利於編寫可重用的、通用的程式碼。
- Netty 框架的目標就是讓你的業務邏輯從網路基礎應用編碼中分離出來、解脫出來。
Future-Listener 機制
- 當 Future 物件剛剛建立時,處於非完成狀態,呼叫者可以通過返回的 ChannelFuture 來獲取操作執行的狀態,註冊監聽函式來執行完成後的操作。
- 常見有如下操作:
- 通過 isDone 方法來判斷當前操作是否完成
- 通過 isSuccess 方法來判斷已完成的當前操作是否成功
- 通過 getCause 方法來獲取已完成的當前操作失敗的原因
- 通過 isCancelled 方法來判斷已完成的當前操作是否被取消
- 通過 addListener 方法來註冊監聽器,當操作已完成(isDone 方法返回完成),將會通知指定的監聽器;如果 Future 物件已完成,則通知指定的監聽器
程式碼示例
繫結埠是非同步操作,當繫結操作處理完,將會呼叫相應的監聽器處理邏輯
// 啟動伺服器, 繫結埠並設定同步
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
// 給ChannelFuture註冊監聽器, 監控關心的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("監聽埠 8080 成功");
} else {
System.out.println("監聽埠 8080 失敗");
}
}
});
案例:HTTP 服務
public class HttpServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new CustomHttpServerInitializer());
try {
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
class CustomHttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 得到管道
ChannelPipeline pipeline = ch.pipeline();
// 加入一個netty提供的http編解碼器
pipeline.addLast("httpServerCodec", new HttpServerCodec());
// 增加一個自定義handler
pipeline.addLast(new CustomHttpServer());
}
}
class CustomHttpServer extends SimpleChannelInboundHandler<HttpObject> {
/**
* 讀取客戶端資料
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
System.out.println("pipeline hash = " + ctx.pipeline().hashCode() + "handler hash = " + this.hashCode());
System.out.println("客戶端地址: " + ctx.channel().remoteAddress());
// 請求資訊
HttpRequest httpRequest = (HttpRequest) msg;
// 獲取URI, 過濾指定資源
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
return;
}
// 回覆資訊給瀏覽器
ByteBuf content = Unpooled.copiedBuffer("hello ~", CharsetUtil.UTF_8);
// 構造http響應, 即httpResponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
ctx.writeAndFlush(response);
}
}
}