Netty(十四)——EventLoop之式
前邊講了ByteBuf、Channel、Unsafe、ChannelPipeline、ChannelHandler等核心的類。這篇來學習學習EventLoop(EventLoopGroup)——Netty的執行緒。Netty的執行緒模型是經過精心的設計,既提高了框架的併發效能,又能在很大程度上避免死鎖,區域性還是實現了無鎖化設計。非常值得學習的。
一,Reactor執行緒模型:Netty執行緒模型本質上也是經典的Reactor執行緒模型,看下前邊轉過的一篇文章《
型別 | 特點 | 不足 |
---|---|---|
Reactor單執行緒模型 | 1,指所有的I/O操作都是在同一個NIO執行緒上完成,包括:a,接收客戶端的TCP連線;b,向服務端發起TCP連線;c,讀取通訊對端的請求或應答訊息;d,向通訊對端傳送訊息或者應答訊息。 2,簡單、容易理解並實現 |
1,一個NIO執行緒同時處理成百上千的鏈路,效能無法支撐; 2,當NIO執行緒負載過重,處理速度變慢,導致客戶端超時,超時後重發,更加重NIO執行緒的負載,最終導致大量訊息積壓和處理超時,成為系統瓶頸; 3,可靠性問題:一旦NIO執行緒意外,或者進入死迴圈,會導致整個系統通訊模組不可用,造成節點故障。 |
Reactor多執行緒模型 | 1,有一個專門NIO執行緒——acceptor執行緒用於監聽服務端,接受客戶端的TCP連線; 2,網路IO操作——讀寫等由一個NIO執行緒池負責,包含一個任務佇列和N個可用執行緒,由這些NIO執行緒負責訊息的讀取、解碼、編碼和傳送等。 3,一個NIO執行緒可以同時處理N條鏈路,但是一個鏈路只對應一個NIO執行緒,防止發生併發操作問題。 |
1,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端連線,或者服務端對客戶端握手進行安全認證(耗費效能),可能會出現效能不足。 |
主從Reactor多執行緒模型 | 1,服務端用於接受客戶端連線的不再是一個單獨的NIO執行緒,而是一個獨立的NIO執行緒池; 2,sub reactor執行緒池用來負責SocketChannel的讀寫、編解碼等工作。 |
1,相對於上邊兩種更加複雜吧。 |
二,Netty執行緒模型:Netty可以通過配置不同的啟動引數,支援上邊的幾種執行緒模型的,看下原理圖和程式碼吧:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap b =new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//解碼
socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder());
//編碼
socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast(new SubReqServerHandler());
}
});
ChannelFuture f =b.bind(port).sync();
f.channel().closeFuture().sync();
類別 | 職責 |
---|---|
用於接收Client請求的執行緒池 | 1,接收Client的TCP連線,初始化Channel引數; 2,將鏈路狀態變更時間通知給ChannelPipeline; |
用於處理I/O操作的執行緒池 | 1,非同步讀取通訊對端的資料報,傳送讀時間到ChannelPipeline; 2,非同步傳送訊息到通訊對端,呼叫ChannelPipeline的訊息傳送介面; 3,執行系統呼叫Task; 4,執行定時任務Task,例如鏈路空閒狀態監測定時任務。 |
序列操作(無鎖化設計):上篇Netty(十二)——ChannelPipeline之觀、Netty(十三)——ChannelHandler之意 我們看到事件的處理是在ChannelPipeline中傳輸像職責鏈一樣,經過ChannelHandler時進行處理,這種啟動多個序列化的執行緒並行執行(避免鎖的競爭),比一個佇列一個工作執行緒效能更優。
三,Netty實踐建議:
1,建立兩個NioEventLoopGroup,用於邏輯隔離NIO Acceptor和NIO IO操作執行緒;
2,儘量不要在ChannelHandler中啟動使用者執行緒(解碼後用於將POJO訊息派發到後端業務執行緒的除外);
3,解碼要放在NIO執行緒呼叫的解碼Handler中進行,不要切換到使用者執行緒中完成訊息的解碼;
4,如果業務邏輯操作簡單,沒有複雜的業務邏輯計算,沒有可能導致執行緒被阻塞的磁碟操作、資料庫操作、網路操作等,可以直接在NIO執行緒上完成業務邏輯編排,不需要切換到使用者執行緒。
5,如果業務邏輯處理複雜,不要在NIO執行緒上完成,建議將解碼後的POJO訊息封裝成Task,派發到業務執行緒中執行,以保證NIO執行緒儘快被釋放,處理其他的IO操作。
四,NioEventLoop的原始碼分析:
1,先看下NioEventLoop的類關係圖:
2,看一下NioEventLoop的原始碼:
2.1,首先看下多路複用器Selector在NioEventLoop中的初始化。
/**
* 一,聚合的多路複用器selector
*/
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
/**
* 二,初始化NioEventLoop,selector = openSelector();
*/
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
super(parent, executor, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
/**
* 二-1,selector = openSelector();
*/
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
//如果沒有開啟selectedKeys優化開關,直接返回provider.openSelector()。
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
//如果開啟了,通過反射從selector中獲取selectedKeys和publicSelectedKeys,將其設定為可寫,通過反射的方式使用Netty構造的selectedKeys = selectedKeySet;替換JDK的。
try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Class<?> selectorImplClass =
Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
// Ensure the current selector implementation is what we can instrument.
if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
return selector;
}
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
selectedKeys = selectedKeySet;
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
} catch (Throwable t) {
selectedKeys = null;
logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
}
return selector;
}
2.2,分析run方法的實現,這個方法基本呼叫到NioEventLoop中的所有封裝方法,有的呼叫層級還比較深,不過耐心檢視,跟下去,大概流程就清楚了。(這裡我在註釋中用數字表示了層級跟蹤,例如4-1-1 為三級方法呼叫)
@Override
protected void run() {
//1-將wakeup還原false,並將之前的狀態儲存到oldWakeUp中
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
//2判斷當前訊息佇列中是否有訊息尚未處理,如果有,selectNow返回一次select操作。
if (hasTasks()) {
selectNow();
} else {
//3,輪詢,看是否有準備就緒的Channel
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//先後執行IO任務和非IO任務,兩類任務的執行時間比由變數ioRatio控制,預設是非IO任務允許執行和IO任務相同的時間
//Netty中控制IO執行比例佔比-分析,得到就緒狀態的SocketChannel
if (ioRatio == 100) {
//4
processSelectedKeys();
//5
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
//如果關閉,則進行優雅停機,呼叫closeAll,釋放資源
if (isShuttingDown()) {
//6,
closeAll();
if (confirmShutdown()) {
cleanupAndTerminate(true);
return;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
scheduleExecution();
}
/**
* [email protected] {@link Queue#isEmpty()}
*/
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
/**
* 2-2 selectNow
*/
void selectNow() throws IOException {
try {
selector.selectNow();
} finally {
// restore wakup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
/**
* 3-1 select
*/
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
//取當前系統的納秒時間
long currentTimeNanos = System.nanoTime();
//呼叫delayNanos計算定時任務的觸發時間
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
//死迴圈
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
//如果需要立即執行,或已經超時,則selector.selectNow();並退出當前迴圈
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
//將定時任務剩餘的超時時間作為引數進行select操作,沒完成一次select操作,對selectCnt+1
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//如果有下列情況,則進行退出迴圈
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
//thread interrupted也break退出
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
//如果死迴圈了,則進行重建Selector方式,讓系統恢復正常rebuildSelector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);
//3-1-1
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
}
// Harmless exception - log anyway
}
}
/**
* 3-1-1
* Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelector() {
//如果為其它執行緒發起,則為了避免多執行緒併發操作,將rebuildSelector()封裝成task放到訊息佇列中,由NioEventLoop負責呼叫。
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
//建立新的Selector
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.(將SocketChannel從舊的Selector移動到新的上)
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one 銷燬舊的selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
/**
* 4-1
*/
private void processSelectedKeys() {
//如果開啟selectedKeys優化功能走processSelectedKeysOptimized,否則走processSelectedKeysPlain
if (selectedKeys != null) {
//4-1-1
processSelectedKeysOptimized(selectedKeys.flip());
} else {
//4-1-2
processSelectedKeysPlain(selector.selectedKeys());
}
}
/**
*4-1-1
**/
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
for (;;) {
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
i++;
}
selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
/**
*4-1-2
**/
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
//迴圈遍歷selectedKeys,進行操作
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
//獲取單個的進行,並從迭代器中刪除
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
//如果為AbstractNioChannel型別,進行IO讀寫相關操作
if (a instanceof AbstractNioChannel) {
//4-1-2-1
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//taks型別
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
/**
*4-1-2-1
**/
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
//判斷是否可用,不可用直接關閉返回
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
//進行readyOps判斷並呼叫unsafe進行相應的操作,讀、寫
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
/**
* 5-1 執行定時任務
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method.
*
* @return {@code true} if and only if at least one task was run
*/
protected boolean runAllTasks() {
//5-1-1
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
return true;
}
}
}
/**
* 5-1-1將到時間的任務加入到taskqueue中供執行
*/
private void fetchFromScheduledTaskQueue() {
if (hasScheduledTasks()) {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
break;
}
taskQueue.add(scheduledTask);
}
}
}
/**
* 6-1 關閉所有鏈路,釋放執行緒、各種資源
*/
private void closeAll() {
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
Netty的執行緒模型,Reactor的設計非常牛逼的,直接決定了軟體的效能和併發處理能力。多學習,多思考,多反覆,多總結…… 繼續中……