三、Netty高效能架構設計
@
目錄三、Netty高效能架構設計
1、Netty概述
本文原始碼:
1.1、原生NIO存在的問題
- NIO 的類庫和 API 繁雜,使用麻煩:需要熟練掌握
Selector
、ServerSocketChannel
、SocketChannel
、ByteBuffer
等。 - 需要具備其他的額外技能:要熟悉
Java
多執行緒程式設計,因為NIO
程式設計涉及到Reactor
模式,你必須對多執行緒和網路程式設計非常熟悉,才能編寫出高質量的 NIO 程式。 - 開發工作量和難度都非常大:例如客戶端面臨斷連重連、網路閃斷
- JDK NIO 的 Bug:例如臭名昭著的
Epoll Bug
,它會導致Selector
空輪詢,最終導致CPU
100%。直到 JDK 1.7 版本該問題仍舊存在,沒有被根本解決。
1.2、Netty優點
Netty
對JDK
自帶的NIO
的API
進行了封裝,解決了上述問題。
- 設計優雅:適用於各種傳輸型別的統一
API
阻塞和非阻塞Socket
;基於靈活且可擴充套件的事件模型,可以清晰地分離關注點;高度可定製的執行緒模型 - 單執行緒,一個或多個執行緒池. - 安全:完整的
SSL/TLS
和StartTLS
支援 - 高效能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的記憶體複製。
2、I/O執行緒模型
- 目前存在的執行緒模型主要有:
- 傳統阻塞I/O服務模型
- Reactor模式
- 根據
Reactor
的數量和處理資源池執行緒的數量不同,有如下3
種典型的實現- 單
Reactor
單執行緒 - 單
Reactor
多執行緒 - 主從
Reactor
多執行緒
- 單
Netty
執行緒模型主要基於主從Reactor多執行緒模型做了一定的改進,其中主從Reactor
多執行緒模型有多個Reactor
。
2.1、傳統阻塞I/O服務模型
圖解說明:黃色的框表示物件,藍色的框表示執行緒、白色的框表示方法(API)。之後的圖相同。
2.1.1、模型分析
模型特點:
- 採用阻塞
IO
模式獲取輸入的資料 - 每個連結都需要獨立的執行緒完成資料的輸入,業務處理、資料返回。
問題分析:
- 當併發數很大,就會建立大量的執行緒,佔用很大系統資源
- 連線建立後,如果當前執行緒暫時沒有資料可讀,該執行緒會阻塞在
read
操作,造成執行緒資源浪費。
2.1.2、模型實現程式碼示例
由於模型的邏輯主要集中在服務端,所以所有模型程式碼示例基本上都是服務端的示例
public static void main(String[] args) throws IOException {
//1、建立一個執行緒池
//2、如果有客戶端連線,就建立一個執行緒,與之通訊(單獨寫一個方法)
ExecutorService executorService = Executors.newCachedThreadPool();
//建立ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("伺服器啟動了");
while (true) {
//監聽,等待客戶端連線
final Socket socket = serverSocket.accept();
System.out.println("連線到一個客戶端");
//建立一個執行緒,與之通訊
executorService.execute(() -> {
//重寫Runnable方法,與客戶端進行通訊
handler(socket);
});
}
}
//編寫一個Handler方法,和客戶端通訊。主要進行資料的讀取和業務處理。
public static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
//通過socket獲取輸入流
InputStream inputStream = socket.getInputStream();
//迴圈的讀取客戶端傳送的資料
while (true){
int read = inputStream.read(bytes);
if (read != -1){
System.out.println(new String(bytes, 0, read));//輸出客戶端傳送的資料
} else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("關閉和client的連線");
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.2、Reactor模型概述
針對傳統阻塞I/O服務模型的2個缺點,解決方案如下:
- 基於
I/O
複用模型:多個連線共用一個阻塞物件,應用程式只需要在一個阻塞物件等待,無需阻塞等待所有連線。當某個連線有新的資料可以處理時,作業系統通知應用程式,執行緒從阻塞狀態返回,開始進行業務處理。Reactor
對應的叫法: 1. 反應器模式 2. 分發者模式(Dispatcher
) 3. 通知者模式(notifier
) - 基於執行緒池複用執行緒資源:不必再為每個連線建立執行緒,將連線完成後的業務處理任務分配給執行緒進行處理,一個執行緒可以處理多個連線的業務。
I/O複用結合線程池,就是Reactor模式基本設計思想,如圖所示:
Reactor
模式,通過一個或多個輸入同時傳遞給服務處理器的模式(基於事件驅動)- 伺服器端程式處理傳入的多個請求,並將它們同步分派到響應的處理執行緒,因此
Reactor
模式也叫Dispatcher
模式。 Reactor
模式使用IO
複用監聽事件,收到事件後,分發的某個執行緒(程序),這點就是網路服務高併發處理的關鍵。
Reactor模式中的核心組成部分:
在一個單獨的執行緒中執行,負責監聽和分發事件,分發給適當的處理程式來對
事件作出反應。
-
我的理解是將
Reactor
理解成一個Selector
,它可以對建立新的連線,也可以將產生的讀寫事件交換給Handler
進行處理 -
Handlers
:處理程式執行I/O
事件要完成的實際事件,類似於客戶想要與之交談的公司中的實際官員。Reactor
通過排程適當的處理程式來響應I/O
事件,處理程式執行非阻塞操作。
2.3、單Reactor單執行緒模式
類似於nio中的Selector(選擇器) 根據事件驅動 處理不同的連線請求
方案說明:
Select
是前面I/O
複用模型介紹的標準網路程式設計 API,可以實現應用程式通過一個阻塞物件監聽多路連線請求Reactor
物件通過Select
監控客戶端請求事件,收到事件後通過Dispatch
進行分發- 如果是建立連線請求事件,則由
Acceptor
通過Accept
處理連線請求,然後建立一個Handler
物件處理連線完成後的後續業務處理 - 如果不是建立連線事件,則
Reactor
會分發呼叫連線對應的 Handler 來響應 Handler
會完成Read
→業務處理→Send
的完整業務流程
結合例項:伺服器端用一個執行緒通過多路複用搞定所有的 IO
操作(包括連線,讀、寫等),編碼簡單,清晰明瞭,但是如果客戶端連線數量較多,將無法支撐,前面的 NIO
案例就屬於這種模型。
2.2.1、模型分析
- 優點:模型簡單,沒有多執行緒、程序通訊、競爭的問題,全部都在一個執行緒中完成
- 缺點:效能問題,只有一個執行緒,無法完全發揮多核
CPU
的效能。Handler
在處理某個連線上的業務時,整個程序無法處理其他連線事件,很容易導致效能瓶頸 - 缺點:可靠性問題,執行緒意外終止,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障
- 使用場景:客戶端的數量有限,業務處理非常快速,比如
Redis
在業務處理的時間複雜度O(1)
的情況
2.2.2、模型實現程式碼示例
這裡面我為了簡便,我將Reactor和Acceptor和Handler三個物件搞成了方法。
public class SReactorSThread {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private int PORT = 6666;
public SReactorSThread() {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
//對客戶端進行監聽
public void listen() {
try {
while (true) {
int count = selector.select();
//表示有客戶端產生事件
if (count > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();//取出產生事件的Channel
Iterator<SelectionKey> iterator = selectionKeys.iterator();//準備對其進行遍歷
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//將key交給dispatch去處理
dispatch(key);
iterator.remove();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//dispatch
private void dispatch(SelectionKey key) {
if (key.isAcceptable()){
accept(key);
}else {
handler(key);
}
}
//建立新的連線
private void accept(SelectionKey key) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
//對請求進行處理,接收訊息---業務處理---返回訊息
private void handler(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(3);
StringBuilder msg = new StringBuilder();
while (channel.read(buffer) > 0) {
msg.append(new String(buffer.array()));
buffer.clear();
}
System.out.println("接收到訊息:" + msg.toString());
//傳送訊息
String ok = "OK";
buffer.put(ok.getBytes());
//這個flip非常重要哦,是將position置0,limit置於position的位置,以便下面程式碼進行寫入操作能夠正確寫入buffer中的所有資料
buffer.flip();
channel.write(buffer);
buffer.clear();
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + "離線了");
//取消該通道的註冊並關閉通道,這裡非常重要,沒有這一步的話當客戶端斷開連線就會不斷丟擲IOException
//是因為,select會一直產生該事件。
key.cancel();
channel.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
/********呼叫**************/
public static void main(String[] args) {
SReactorSThread sReactorSThread = new SReactorSThread();
sReactorSThread.listen();
}
這裡有更牛逼更完整的
Reactor
單執行緒模型的程式碼案例https://www.cnblogs.com/hama1993/p/10611229.html
https://www.cnblogs.com/eason-ou/p/11910484.html
2.4、單Reactor多執行緒模型
方案說明:
Reactor
物件通過select
監控客戶端請求事件, 收到事件後,通過dispatch
進行分發- 如果建立連線請求, 則右
Acceptor
通過accept 處理連線請求, 然後建立一個Handler
物件處理完成連線後的各種事件 - 如果不是連線請求,則由
reactor
分發呼叫連線對應的handler
來處理 - handler 只負責響應事件,不做具體的業務處理, 通過
read
讀取資料後,會分發給後面的worker
執行緒池的某個執行緒處理業務 worker
執行緒池會分配獨立執行緒完成真正的業務,並將結果返回給handler
handler
收到響應後,通過send
將結果返回給client
2.4.1、模型分析
- 優點:可以充分的利用多核
cpu
的處理能力 - 缺點:多執行緒資料共享和訪問比較複雜,
reactor
處理所有的事件的監聽和響應,在單執行緒執行, 在高併發場景容易出現效能瓶頸.
2.4.2、模型實現程式碼示例
參考 https://www.cnblogs.com/eason-ou/p/11910523.html
也就是每次讀寫操作利用執行緒池開個執行緒來進行讀寫操作 主執行緒繼續監聽事件
不知道為什麼要
- 在寫完成之後切換為讀事件
- 在讀事件之後切換為寫事件
//讀完後,將通道註冊為寫
Selector selector = selectionKey.selector();
SelectionKey sk = sc.register(selector, SelectionKey.OP_WRITE);上述問題 的原因在於sun.nio.ch.SelectionKeyImpl#nioInterestOps(int ops)方法
- 可以看到直接覆蓋了之前設定的interestOps,也就是說我們在客戶端完成連線,設定SelectionKey.OP_READ事件,其READ實已經把之前的事件全部覆蓋了。
問題又來了
- 為什麼不在連線事件觸發 並且連線成功的時候註冊
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
直接註冊兩個事件呢? -> SelectionKey.OP_READ | SelectionKey.OP_WRITE最終解決問題:
- 參考連線: selector 為什麼無限觸發就緒事件
2.5、主從Reactor多執行緒
方案說明:
Reactor
主執行緒MainReactor
物件就只註冊一個用於監聽連線請求的ServerSocketChannel
,通過select
監聽連線事件, 收到事件後,通過Acceptor
處理連線事件- 當
Acceptor
處理連線事件後,MainReactor
通過accept
獲取新的連線,並將連線註冊到SubReactor
subreactor
將連線加入到連線佇列進行監聽,並建立handler
進行各種事件處理- 當有新事件發生時,
subreactor
就會呼叫對應的handler
處理 handler
通過read
讀取資料,分發給後面的worker
執行緒處理worker
執行緒池分配獨立的worker
執行緒進行業務處理,並返回結果handler
收到響應的結果後,再通過send
將結果返回給client
Reactor
主執行緒可以對應多個Reactor
子執行緒, 即MainRecator
可以關聯多個SubReactor
2.5.1、模型分析
- 優點:父執行緒與子執行緒的資料互動簡單職責明確,父執行緒只需要接收新連線,子執行緒完成後續的業務處理。
- 優點:父執行緒與子執行緒的資料互動簡單,
Reactor
主執行緒只需要把新連線傳給子執行緒,子執行緒無需返回資料 - 缺點:程式設計複雜度較高
- 結合例項:這種模型在許多專案中廣泛使用,包括
Nginx
主從Reactor
多程序模型,Memcached
主從多執行緒,Netty
主從多執行緒模型的支援
2.5.2、模型實現程式碼示例
還是去看別人大佬寫的吧。https://www.cnblogs.com/eason-ou/p/11912010.html
3種模式用生活案例來理解
- 1)單Reactor單執行緒,前臺接待員和服務員是同一個人,全程為顧客服
- 2)單Reactor多執行緒,1個前臺接待員,多個服務員,接待員只負責接待
- 3)主從Reactor多執行緒,多個前臺接待員,多個服務生
3、Netty模型
3.1、主從Reactor進階
Netty
主要是基於主從Reactor
多執行緒模式做了一定的改進,其中主從Reactor
都有單一的一個變成了多個。下面是簡單的改進圖。
- 如圖所示,增加了
BossGroup
來維護多個主Reactor
,主Reactor
還是隻關注連線的Accept
;增加了WorkGroup
來維護多個從Reactor
,從Reactor
將接收到的請求交給Handler
進行處理。 - 在主
Reactor
中接收到Accept
事件,獲取到對應的SocketChannel
,Netty
會將它進一步封裝成NIOSocketChannel
物件,這個封裝後的物件還包含了該Channel
對應的SelectionKey
、通訊地址等詳細資訊 Netty
會將裝個封裝後的Channel
物件註冊到WorkerGroup
中的從Reactor
中。- 當
WorkerGroup
中的從Reactor
監聽到事件後,就會將之交給與此Reactor
對應的Handler
進行處理。
3.2、再進階版
Netty
將Selector
以及Selector
相關的事件及任務封裝了NioEventLoop
,這樣BossGroup
就可以通過管理NioEventLoop
去管理各個Selector
。- 同時,
Netty
模型中主要存在兩個大的執行緒池組BossGroup
和WorkerGroup
,用於管理主Reactor
執行緒和從Reactor
執行緒。
3.3、Netty模型
-
Netty
抽象出兩組執行緒池,BossGroup
專門負責接收客戶端的連線,WorkerGroup
專門負責網路的讀寫 -
BossGroup
和WorkerGroup
型別的本質都是NioEventLoopGroup
型別。 -
NioEventLoopGroup
相當於一個執行緒管理器(類似於ExecutorServevice
),它下面維護很多個NioEventLoop
執行緒。(我認為圖中的NioEventGroup
的地方應該改成NioEventLoop
,可能我的理解有點差錯吧)- 在初始化這兩個
Group
執行緒組時,預設會在每個Group
中生成CPU*2
個NioEventLoop
執行緒 - 當
n
個連線來了,Group
預設會按照連線請求的順序分別將這些連線分給各個NioEventLoop
去處理。 - 同時
Group
還負責管理EventLoop
的生命週期。
- 在初始化這兩個
-
NioEventLoop
表示一個不斷迴圈的執行處理任務的執行緒- 它維護了一個執行緒和任務佇列。
- 每個
NioEventLoop
都包含一個Selector
,用於監聽繫結在它上面的socket
通訊。 - 每個
NioEventLoop
相當於Selector
,負責處理多個Channel
上的事件 - 每增加一個請求連線,
NioEventLoopGroup
就將這個請求依次分發給它下面的NioEventLoop
處理。
-
每個
Boss NioEventLoop
迴圈執行的步驟有3步:- 輪詢
accept
事件 - 處理
accept
事件,與client
建立連線,生成NioSocketChannel
,並將其註冊到某個Worker NioEventLoop
的selector
上。 - 處理任務佇列到任務,即
runAllTasks
- 輪詢
-
每個
Worker NioEventLoop
迴圈執行的步驟:- 輪詢
read
,write
事件 - 處理
I/O
事件,即read
,write
事件,在對應的NioSocketChannel
中進行處理 - 處理任務佇列的任務,即
runAllTasks
- 輪詢
-
每個
Worker NioEventLoop
處理業務時,會使用pipeline
(管道),pipeline
中維護了一個ChannelHandlerContext
連結串列,而ChannelHandlerContext
則儲存了Channel
相關的所有上下文資訊,同時關聯一個ChannelHandler
物件。如圖所示,Channel
和pipeline
一一對應,ChannelHandler和ChannelHandlerContext
一一對應。 -
ChannelHandler
是一個介面,負責處理或攔截I/O
操作,並將其轉發到Pipeline
中的下一個處理Handler
進行處理。I/O Request via Channel or ChannelHandlerContext | +---------------------------------------------------+---------------+ | ChannelPipeline | | | \|/ | | +---------------------+ +-----------+----------+ | | | Inbound Handler N | | Outbound Handler 1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler N-1 | | Outbound Handler 2 | | | +----------+----------+ +-----------+----------+ | | /|\ . | | . . | | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| | [ method call] [method call] | | . . | | . \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 2 | | Outbound Handler M-1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 1 | | Outbound Handler M | | | +----------+----------+ +-----------+----------+ | | /|\ | | +---------------+-----------------------------------+---------------+ | \|/ +---------------+-----------------------------------+---------------+ | | | | | [ Socket.read() ] [ Socket.write() ] | | | | Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+
3.4、程式碼示例
服務端程式碼:
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//建立BossGroup 和 WorkerGroup
//1、建立兩個執行緒組,bossGroup 和 workerGroup
//2、bossGroup 只是處理連線請求,真正的和客戶端業務處理,會交給 workerGroup 完成
//3、兩個都是無限迴圈
//4、bossGroup 和 workerGroup 含有的子執行緒(NioEventLoop)個數為實際 cpu 核數 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup worderGroup = new NioEventLoopGroup();
try {
//建立伺服器端的啟動物件,配置引數
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈式程式設計來進行設定,配置
bootstrap.group(bossGroup, worderGroup) //設定兩個執行緒組
.channel(NioServerSocketChannel.class) //使用 NioServerSocketChannel 作為伺服器的通道實現
.option(ChannelOption.SO_BACKLOG, 128) //設定執行緒佇列得到連線個數
.childOption(ChannelOption.SO_KEEPALIVE, true) //設定保持活動連線狀態
.childHandler(new ChannelInitializer<SocketChannel>() { //為accept channel的pipeline預新增的handler
//給 pipeline 新增處理器,每當有連線accept時,就會執行到此處。
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}); //給我們的 workerGroup 的 EventLoop 對應的管道設定處理器
System.out.println("........伺服器 is ready...");
//繫結一個埠並且同步,生成了一個ChannelFuture 物件
//啟動伺服器(並繫結埠)
ChannelFuture future = bootstrap.bind(6668).sync();
//對關閉通道進行監聽
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
worderGroup.shutdownGracefully();
}
}
}
服務端Handler處理:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
*讀取客戶端傳送過來的訊息
* @param ctx 上下文物件,含有 管道pipeline,通道channel,地址
* @param msg 就是客戶端傳送的資料,預設Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("伺服器讀取執行緒:" + Thread.currentThread().getName());
System.out.println("server ctx = " + ctx);
//看看Channel和Pipeline的關係
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本質是個雙向連結串列,出棧入棧
//將msg轉成一個ByteBuf,比NIO的ByteBuffer效能更高
ByteBuf buf = (ByteBuf)msg;
System.out.println("客戶端傳送的訊息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
}
//資料讀取完畢
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//它是 write + flush,將資料寫入到快取buffer,並將buffer中的資料flush進通道
//一般講,我們對這個傳送的資料進行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~", CharsetUtil.UTF_8));
}
//處理異常,一般是關閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客戶端:
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客戶端需要一個事件迴圈組
EventLoopGroup group = new NioEventLoopGroup();
try {
//建立客戶端啟動物件
//注意:客戶端使用的不是 ServerBootStrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設定相關引數
bootstrap.group(group) //設定執行緒組
.channel(NioSocketChannel.class) //設定客戶端通道的實現類(使用反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
}
});
System.out.println("客戶端 OK...");
//啟動客戶端去連線伺服器端
//關於 channelFuture 涉及到 netty 的非同步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//給關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客戶端Handler處理:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 當通道就緒就會觸發
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client: " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
}
/**
* 當通道有讀取事件時,會觸發
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
System.out.println("伺服器回覆的訊息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("伺服器的地址:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3.5、任務佇列
任務佇列由NioEventLoop
維護並不斷執行。當我們就收到請求之後,在當前channel
對應的pipeline
中的各個Handler
裡面進行業務處理和請求過濾。當某些業務需要耗費大量時間的時候,我們可以將任務提交到由NioEventLoop
維護的taskQueue
或scheduleTaskQueue
中,讓當前的NioEventLoop
執行緒在空閒時間去執行這些任務。下面將介紹提交任務的3種方式
使用者程式自定義的普通任務:
該方式會將任務提交到
taskQueue
佇列中。提交到該佇列中的任務會按照提交順序依次執行 。
例如: 提交兩個任務到佇列中 第一個任務消耗10秒 第二個任務消耗20秒
- 先執行第一個任務用時10秒 -->0-10秒第一個任務
- 第一個任務完成後才執行第二個 -->10秒-30秒第二個任務
- 總共耗時30秒
channelHandlerContext.channel().eventLoop().execute(new Runnable(){
@Override
public void run() {
//...
}
});
使用者自定義的定時任務:
該方式會將任務提交到
scheduleTaskQueue
定時任務佇列中。該佇列是底層是優先佇列PriorityQueue
實現的,固該佇列中的任務會按照時間的先後順序定時執行。
channelHandlerContext.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
}
}, 60, TimeUnit.SECONDS);
為其他EventLoop執行緒對應的Channel新增任務
可以在ChannelInitializer
中,將剛建立的各個Channel
以及對應的標識加入到統一的集合中去,然後可以根據標識獲取Channel
以及其對應的NioEventLoop
,然後就課程呼叫execute()
或者schedule()
方法。
3.6、非同步模型
基本介紹
- 非同步的概念和同步相對。當一個非同步過程呼叫發出後,呼叫者不能立刻得到結果。實際處理這個呼叫的元件在完成後,通過狀態、通知和回撥來通知呼叫者。
Netty
中的I/O
操作是非同步的,包括Bind
、Write
、Connect
等操作會簡單的返回一個ChannelFuture
。- 呼叫者並不能立刻獲得結果,而是通過
Future-Listener
機制,使用者可以方便的主動獲取或者通過通知機制獲得IO
操作結果 Netty
的非同步模型是建立在future
和callback
的之上的。callback
就是回撥。重點說Future
,它的核心思想是:假設一個方法fun
,計算過程可能非常耗時,等待fun
返回顯然不合適。那麼可以在呼叫fun
的時候,立馬返回一個Future
,後續可以通過Future
去監控方法fun
的處理過程(即 :Future-Listener
機制)
Future說明
- 表示非同步的執行結果, 可以通過它提供的方法來檢測執行是否完成,比如檢索計算等等.
ChannelFuture
是一個介面 :public interface ChannelFuture extends Future
。我們可以新增監聽器,當監聽的事件發生時,就會通知到監聽器
工作原理示意圖
- 在使用
Netty
進行程式設計時,攔截操作和轉換出入站資料只需要您提供callback
或利用future
即可。這使得鏈式操作簡單、高效, 並有利於編寫可重用的、通用的程式碼。 Netty
框架的目標就是讓你的業務邏輯從網路基礎應用編碼中分離出來、解脫出來
Future-Listener機制
-
當
Future
物件剛剛建立時,處於非完成狀態,呼叫者可以通過返回的ChannelFuture
來獲取操作執行的狀態,註冊監聽函式來執行完成後的操作。 -
常用方法如下:
方法名稱 方法作用 isDone() 判斷當前操作是否完成 isSuccess() 判斷已完成的當前操作是否成功 getCause() 獲取已完成當前操作失敗的原因 isCancelled() 判斷已完成的當前操作是否被取消 addListener() 註冊監聽器,當前操作(Future)已完成,將會通知指定的監聽器
舉例說明
繫結埠操作時非同步操作,當繫結操作處理完,將會呼叫相應的監聽器處理邏輯。
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 埠["+ port + "]繫結成功!");
} else{
System.err.println("埠["+ port + "]繫結失敗!");
}
});
3.7、快速入門例項-HTTP HelloWorld
瀏覽器訪問Netty
伺服器後,返回HelloWorld
- 每個瀏覽器的pipeline和Handler是獨立的不是共享的
- 同一個瀏覽器關閉後 再開啟 也是獨立的不是共享的
- HTTP協議用完就斷掉 ,TCP協議要保持連線
- 課上演示重新整理時 也會是新的pipeline和Handler ,自己測試發現沒有更換 不知原因
啟動
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
定義ChannelInitializer
用於給Channel
對應的pipeline
新增handler
。該ChannelInitializer
中的程式碼在SocketChannel
被建立時都會執行
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//向管道加入處理器
//得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
//加入一個 netty 提供的 httpServerCodec:codec => [coder - decoder]
//1、HttpServerCodec 是 netty 提供的處理http的編解碼器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
//2、增加自定義的Handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
}
}
自定義Handler
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 讀取客戶端資料。
*
* @param channelHandlerContext
* @param httpObject 客戶端和伺服器端互相通訊的資料被封裝成 HttpObject
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
//判斷 msg 是不是 HTTPRequest 請求
if (httpObject instanceof HttpRequest) {
System.out.println("msg 型別 = " + httpObject.getClass());
System.out.println("客戶端地址:" + channelHandlerContext.channel().remoteAddress());
//獲取
HttpRequest request = (HttpRequest) httpObject;
//獲取uri,進行路徑過濾
URI uri = new URI(request.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("請求了 favicon.ico,不做響應");
}
//回覆資訊給瀏覽器[http協議]
ByteBuf content = Unpooled.copiedBuffer("HelloWorld", CharsetUtil.UTF_8);
//構造一個http的響應,即HTTPResponse
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//將構建好的 response 返回
channelHandlerContext.writeAndFlush(response);
}
}
}
原文連線:https://blog.csdn.net/qq_35751014/article/details/104443715
本文在原文的基礎上新增一下自己的理解和筆記