05 netty的基本元件概述和EventLoop(執行緒池)的介紹
1 Netty的背景
Netty:非同步的事件驅動的網路應用框架,該框架用於快速開發可維護的高效能協議伺服器和客戶端。
- 非同步:這裡的非同步不是非同步IO,而是指總體流程上的非同步,處理流程由多個執行緒協同完成
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Netty的特點:
-
Netty實際上可以看成Java在網路程式設計方面的標準框架,很多基於Java的專案如果涉及到網路通訊,那麼該專案底層很有可能使用netty框架進行網路通訊功能的實現。
-
Netty也可以看成是多Java的NIO的進一步封裝,我們可以採用Java的NIO實現網路通訊,但採用Netty的話,我們的開發更為迅速。
Netty在NIO上的改進 |
---|
不需要自己構建網路協議 |
提供解決 TCP 傳輸問題(粘包、半包)的工具 |
解決epoll 空輪詢導致 CPU 100% |
對 NIO的API 進行增強,更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer |
概述:事件迴圈機制(訊息排程)是一種等待或分派事件(訊息)的程式設計模型。該機制在工作時向外部的“事件提供者”發出請求,然後呼叫相關的事件處理程式。
In computer science, the event loop is a programming construct or design pattern that waits for and dispatches events or messages in a program. The event loop works by making a request to some internal or external "event provider" (that generally blocks the request until an event has arrived), then calls the relevant event handler ("dispatches the event"). The event loop is also sometimes referred to as the message dispatcher, message loop, message pump, or run loop.) dispatcher [dɪsˈpætʃɚ] 發報機; 收發; 排程;
2 基本的客戶/服務端程式實現
服務端程式
package netty_basic;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
// Netty的API實現服務端程式
public class HelloServer {
public static void main(String[] args) {
/*
ServerBootstrap:啟動器,負責組裝Netty元件,啟動伺服器
group-NioEventLoopGroup:組中包括BossEventLoop,WorkEventLoop,EventLoop用於處理事件,
在Java的NIO的API中,EventLoop包括selector和thread物件
channel-NioServerSocketChannel.class:伺服器Socket Channel的實現方式選擇,這裡選擇的NIO方式
childHandler:bossEventLoop負責處理連線事件,workerEventLoop(child)負責處理讀寫事件
childHandler通過新增處理程式實現worker的工作流程,
*/
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
/*
為服務端讀寫通道新增具體的處理程式(accept事件發生,與客戶端建立連線後才執行):
-StringDecoder:用於解碼,將bytebuffer中內容轉換為字串
-ChannelInboundHandlerAdapter:自定義的handler,這裡實現的邏輯:當觸發讀事件時,列印內容
*/
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
System.out.println(msg);
}
});
}
})
.bind(8080);
}
}
客戶端程式
package netty_basic;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.net.Socket;
//Netty的API實現客戶端程式
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 新增客戶端通道的處理程式,對String進行編碼,這裡邏輯與服務端程式要進行對照
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost",8080))
.sync() // 阻塞方法,只有與服務端建立連線後才會往下執行
.channel()
.writeAndFlush("hello world"); // 寫入資料,會呼叫通道的處理程式(handler)
}
}
執行結果(先執行服務端,在執行客戶端)
hello world // 服務端列印hello world
元件關係梳理
上面程式中涉及了channel、EventLoop、Handler等基本的元件,其含義理解如下:
元件 | 直觀含義 | 備註 |
---|---|---|
channel | 資料讀寫通道 | |
handler | 資料的處理程式(工序),多個handler形成pipeline(流水線) | handler分為Inbound和Outbound兩個類別,其中Inbound表示入站,Outbound表示出戰,對於網路資料流,其資料的讀入和寫出所採用的處理程式是分開的 |
eventLoop | 處理資料的“工人”,與channel進行繫結 |
直覺上的理解:主機與其他主機進行網路通訊的時候會建立多個數據通道(channel),每個資料通道都需要某個
“工人”進行管理(建立/讀/寫)。eventLoop就是承擔“通道管理者”這個角色。每個eventLoop可以管理多個“資料通道”,並且每個channel與evenLoop進行終生繫結。
3 基本元件-EventLoop
基本概念
EventLoop:本質是單執行緒執行器(維護1個 Selector),用於處理 Channel 上源源不斷的 io 事件。
package io.netty.channel;
import io.netty.util.concurrent.OrderedEventExecutor;
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
Netty實現了EventLoop介面,該介面繼承了OrderedEventExecutor, EventLoopGroup這兩個介面。從上圖可以看到有兩條線:
- 一條線是繼承自 juc工具包中的ScheduledExecutorService, 因此包含了執行緒池中所有的方法
- 另一條線是繼承自 netty的 OrderedEventExecutor,用於管理例項物件
- 提供了 boolean inEventLoop(Thread thread) 方法判斷一個執行緒是否屬於此 EventLoop
- 提供了 parent 方法來看看自己屬於哪個 EventLoopGroup
EventLoopGroup :一組 EventLoop,Channel 一般會呼叫 EventLoopGroup 的 register 方法來繫結其中一個 EventLoop
-
Channel 的 io 事件由組中繫結的 EventLoop 進行處理(保證執行緒安全)
-
繼承自 netty 自己的 EventExecutorGroup
- 實現了 Iterable 介面提供遍歷 EventLoop 的能力
- 另有 next 方法獲取集合中下一個 EventLoop
EventLoopGroup介面的實現和關係圖如下所示:
package io.netty.channel;
import io.netty.util.concurrent.EventExecutorGroup;
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
ChannelFuture register(Channel var1);
ChannelFuture register(ChannelPromise var1);
/** @deprecated */
@Deprecated
ChannelFuture register(Channel var1, ChannelPromise var2);
}
程式碼實踐
目標:測試EventLoopGropu中的EventLoop是迴圈訪問的,並且每個eventLoop是個單執行緒執行緒池,能夠提交普通任務和定時任務。
EventLoopGroup的預設執行緒數目設定
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
- 沒有在配置檔案指定的話,通常設為CPU核心數*2
package netty_basic;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
// 定義具有兩個EventLoop的物件組
EventLoopGroup group = new NioEventLoopGroup(2); // 本質上是執行緒池,可以提交定時/普通任務,IO事件
for(int i = 0;i < 4;++i) System.out.println(group.next()); // 訪問eventLoop物件
// 提交普通任務給group中的1個eventLoop執行緒池
group.next().execute(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("common task for eventLoop execute");
});
log.debug("main thread");
// 提交定時任務給group中的1個eventLoop的執行緒池,定時任務可以用於連線的保活
group.next().scheduleAtFixedRate(()->{
log.debug("timed task");
},0,1, TimeUnit.SECONDS);
}
}
執行結果
io.netty.channel.nio.NioEventLoop@6b71769e
io.netty.channel.nio.NioEventLoop@2752f6e2
io.netty.channel.nio.NioEventLoop@6b71769e
io.netty.channel.nio.NioEventLoop@2752f6e2
14:20:34.158 [main] DEBUG netty_basic.TestEventLoop - main thread
14:20:34.161 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:35.164 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:35.164 [nioEventLoopGroup-2-1] DEBUG netty_basic.TestEventLoop - common task for eventLoop execute
14:20:36.169 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:37.174 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:38.164 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:39.168 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:40.173 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
14:20:41.163 [nioEventLoopGroup-2-2] DEBUG netty_basic.TestEventLoop - timed task
...
- 輸出中對EventLoop的遍歷本質上是迴圈遍歷即輪詢。
- 執行緒池中的定時任務能夠實現1s中執行一次
使用例項2
目標:測試每個客戶端是與服務端的1個eventLoop繫結的
注意點:上圖中IDEA除錯時斷點暫停執行緒預設暫停所有執行緒,多執行緒程式進行測試時由於提交任務的執行緒和執行任務的執行緒並非同一執行緒則將其設為暫停當前執行緒
注意點: 設定同一段客戶端程式執行多次(allow parallel run)模擬多個客戶端訪問伺服器
服務端程式
package netty_basic.eventLoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
// 注意:客戶端與伺服器的字符集要保持一致,不然會出現亂碼
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg){
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8)); //將資訊轉化為字串
}
});
}
})
.bind(8080);
}
}
客戶端程式
package netty_basic.eventLoop;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
//Netty的API實現客戶端程式
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 新增客戶端通道的處理程式,對String進行編碼,這裡邏輯與服務端程式要進行對照
nioSocketChannel.pipeline().addLast(new StringEncoder(StandardCharsets.UTF_8));
}
})
.connect(new InetSocketAddress("localhost",8080))
.sync() // 阻塞方法,只有與服務端建立連線後才會往下執行
.channel();
System.out.println(channel);
System.out.println("");
}
}
將客戶端程式碼執行兩個模擬兩個客戶端並向服務端傳送資訊,此時服務端輸出如下:
15:01:02.494 [nioEventLoopGroup-2-3] DEBUG netty_basic.eventLoop.EventLoopServer - hello
15:05:07.445 [nioEventLoopGroup-2-3] DEBUG netty_basic.eventLoop.EventLoopServer - hello1
15:05:22.116 [nioEventLoopGroup-2-3] DEBUG netty_basic.eventLoop.EventLoopServer - hello1
15:05:26.735 [nioEventLoopGroup-2-3] DEBUG netty_basic.eventLoop.EventLoopServer - hello2
15:05:53.103 [nioEventLoopGroup-2-4] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello1
15:05:56.714 [nioEventLoopGroup-2-4] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello2
15:05:59.820 [nioEventLoopGroup-2-4] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello2
15:06:00.508 [nioEventLoopGroup-2-4] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello2
15:06:01.948 [nioEventLoopGroup-2-4] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello2
15:06:02.132 [nioEventLoopGroup-2-4] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello2
從日誌資訊可以看出,客戶端1與nioEventLoopGroup-2-3繫結,客戶端2與nioEventLoopGroup-2-4繫結
- Channel 呼叫 EventLoopGroup 的 register 方法來繫結其中一個 EventLoop
總結
對於EventLoopGroup:
- 每個channel與組中的1個EventLoop進行繫結
- 多個channel建立時,輪流與組中eventLoop進行繫結,實現1個執行緒(EventLoop)管理多個Channel(如上圖中chanenl1和3繫結EventLoop1,而channel2繫結EventLoop2,如果再有新的channel則可以與EventLoop1繫結)
職責劃分思想(服務端)
劃分1:服務端程式可以採用2個EventLoopGroup進行職責劃分
動機:將accept事件與IO事件分開處理。
/**
* Set the EventLoopGroupfor the parent (acceptor) and the child (client). These
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
ObjectUtil.checkNotNull(childGroup, "childGroup");
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
程式碼修改
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2)) //
- 定義兩個EventLoopGroup進行分工即boss和worker其中boss只負責ServerSocketChannel(服務端)上的accept事件,而worker負責socketChannel(客戶端)上的讀寫事件
劃分2:除了boss和worker所對應的EventLoopGroup,定義其他EventLoopGroup去進行執行時間較長的操作
動機:服務端處理某個channel的讀寫事件處理時,有時業務操作時間比較長,會造成該單執行緒的EventLoop所管理的其他channel變的讀寫事件得不到及時處理。這個時候需要將耗時比較長的操作單獨分出來交給其他執行緒處理。具體的做法就是採用多個EventGroup對handler的內容進行拆分。
程式碼實現例項:
package netty_basic.eventLoop;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
// 注意:客戶端與伺服器的字符集要保持一致,不然會出現亂碼
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
/*
定義兩個EventLoopGroup進行分工即boss和worker
其中boss只負責ServerSocketChannel(服務端)上的accept事件
而worker負責socketChannel上的讀寫事件
*/
// 建立獨立的EventLoop用於執行處理程式中執行時間較長的操作
EventLoopGroup anotherGroup = new DefaultEventLoop(); // DefaultEventLoop只能處理普通任務和定時任務
new ServerBootstrap()
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg); // 將訊息傳遞給下一個handler!!!!
}
});
// 採用額外的anotherGroup進行處理
ch.pipeline().addLast(anotherGroup,"handler2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
模擬兩個客戶端訪問服務端的輸出:
16:11:40.917 [nioEventLoopGroup-4-2] DEBUG netty_basic.eventLoop.EventLoopServer - client1:hello
16:11:40.918 [defaultEventLoop-1-1] DEBUG netty_basic.eventLoop.EventLoopServer - client1:hello
16:12:01.469 [nioEventLoopGroup-4-1] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello
16:12:01.476 [defaultEventLoop-1-1] DEBUG netty_basic.eventLoop.EventLoopServer - client2:hello
從輸出可以看出,每個客戶端傳送的訊息經過兩個執行緒(EventLoop)處理。客戶端1中是nioEventLoopGroup-4-2和defaultEventLoop-1-1進行處理。而客戶端2中是nioEventLoopGroup-4-1和defaultEventLoop-1-1進行處理
EventLoop的切換程式碼:
ctx.fireChannelRead(msg); // 將訊息傳遞給下一個handler!!!!
原始的pipeline中多個handler都是由同一EventLoop進行處理,而上述程式碼實現資訊傳遞給其他EventLoop進行處理。
原始碼分析:EventLoop切換原理
本質:多個單執行緒執行緒池協同完成任務,實現任務拆分後分工完成。
呼叫鏈:
ctx.fireChannelRead(msg); // 將訊息傳遞給下一個handler!!!!
------------------------------------------------------------------------
介面ChannelHandlerContext定義的方法:
@Override
ChannelHandlerContext fireChannelRead(Object msg);
-------------------------------------------------------------------------
抽象類AbstractChannelHandlerContext(實現ChannelHandlerContext介面)的方法:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
/*
next.executor():獲取下一個handler的EventLoop
*/
EventExecutor executor = next.executor();
// 判斷下一個handler的EventLoop是否和當前執行緒是同一執行緒
if (executor.inEventLoop()) {
next.invokeChannelRead(m); // 與當前執行緒是同一執行緒,則直接呼叫
} else {
// 不是同一個執行緒,則將呼叫的程式碼封裝為任務物件(實現Runable的物件)讓下一個handler的EventLoop執行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
上述程式碼中invokeChannelRead中展示了EventLoop切換的核心邏輯,核心邏輯是判斷當前執行緒是否是下一個handler的EventLoop的執行緒,是的話就執行,不是則讓下一個handler的EventLoop進行execute(提交任務給該執行緒池)
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}