Why Netty Is Reliable (Part 2)
Handling Selector Empty Polling (Spinning)
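In NIO, a Selector is polled to find out whether any IO events are ready. According to the JDK NIO API documentation, Selector.select blocks until at least one channel is selected (an IO event arrives), wakeup is invoked, the calling thread is interrupted, or the timeout expires. On Linux, however, this sometimes goes wrong: in certain scenarios select returns immediately even though the timeout has not expired and no IO event has arrived. This is the well-known epoll bug. It is a serious bug: the thread calling select falls into an infinite loop that drives CPU usage to 100% and badly hurts system reliability. As of this writing, the JDK still has not completely fixed it.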
Netty, however, works around the problem effectively, and practice has shown that it copes with the epoll bug. Its approach is as follows:
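Netty counts how many times select returns without doing any work (empty spins) and compares that count against a threshold. The threshold defaults to 512 and can be overridden from the application by setting the system property io.netty.selectorAutoRebuildThreshold. Once the count reaches the threshold, Netty builds a new Selector, moves every Channel registered on the old Selector to the new one, closes the old Selector, and uses the new Selector from then on. For the full implementation, see the select and rebuildSelector methods of NioEventLoop: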
for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
        // Selected something,
        // waken up by user, or
        // the task queue has a pending task.
        break;
    }
    if (selectedKeys == 0 && Thread.interrupted()) {
        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
        // As this is most likely a bug in the handler of the user or it's client library we will
        // also log it.
        //
        // See https://github.com/netty/netty/issues/2426
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                    "Thread.currentThread().interrupt() was called. Use " +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        selectCnt = 1;
        break;
    }
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn(
                "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                selectCnt);
        rebuildSelector();
        selector = this.selector;
        // Select again to populate selectedKeys.
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = System.nanoTime();
}
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector();
            }
        });
        return;
    }
    final Selector oldSelector = selector;
    final Selector newSelector;
    if (oldSelector == null) {
        return;
    }
    try {
        newSelector = openSelector();
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }
    // Register all channels to the new Selector.
    int nChannels = 0;
    for (;;) {
        try {
            for (SelectionKey key: oldSelector.keys()) {
                Object a = key.attachment();
                try {
                    if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                        continue;
                    }
                    int interestOps = key.interestOps();
                    key.cancel();
                    key.channel().register(newSelector, interestOps, a);
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
        } catch (ConcurrentModificationException e) {
            // Probably due to concurrent modification of the key set.
            continue;
        }
        break;
    }
    selector = newSelector;
    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }
    logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
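The sketch below shows the same count-and-rebuild idea using only java.nio, outside of Netty. SelectorSpinGuard, recordSelect and rebuild are hypothetical names and are not Netty API.

The sketch makes a few assumptions for brevity:
- The excerpt above increments selectCnt on every select() call. This sketch counts only the returns that come back empty before the timeout has elapsed.
- A channel that turns out to be closed during migration is simply skipped.
- Like Netty, it must only be called from the thread that owns the Selector.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical helper (not part of Netty): detects a spinning Selector and rebuilds it.
// Not thread-safe: call it only from the thread that owns the Selector.
final class SelectorSpinGuard {
    private final int threshold;
    private int prematureReturns;

    SelectorSpinGuard(int threshold) {
        this.threshold = threshold;
    }

    // Reuses the property name from the article for illustration; 512 matches its stated default.
    static SelectorSpinGuard fromSystemProperty() {
        return new SelectorSpinGuard(Integer.getInteger("io.netty.selectorAutoRebuildThreshold", 512));
    }

    // Call after every select(timeout). Returns true once select() has come back early
    // with no ready keys `threshold` times in a row, i.e. the Selector should be rebuilt.
    boolean recordSelect(int selectedKeys, long elapsedNanos, long timeoutNanos) {
        if (selectedKeys > 0 || elapsedNanos >= timeoutNanos) {
            prematureReturns = 0;   // real work or a genuine timeout: not a spin
            return false;
        }
        prematureReturns++;
        if (threshold > 0 && prematureReturns >= threshold) {
            prematureReturns = 0;
            return true;
        }
        return false;
    }

    // Moves every valid registration (interest ops + attachment) to a fresh Selector,
    // then closes the old one. The caller must switch to the returned Selector.
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid()) {
                continue;
            }
            SelectableChannel channel = key.channel();
            int interestOps = key.interestOps();   // must be read before cancel()
            Object attachment = key.attachment();
            key.cancel();
            try {
                channel.register(newSelector, interestOps, attachment);
            } catch (ClosedChannelException e) {
                // Channel was closed meanwhile; nothing left to migrate for it.
            }
        }
        oldSelector.close();
        return newSelector;
    }
}
```

To use it in a select loop:
1. Time each selector.select(timeoutMillis) call with System.nanoTime().
2. Pass the result to recordSelect.
3. When recordSelect returns true, replace the selector with rebuild(selector) before selecting again.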
Keeping the Thread from Dying Unexpectedly
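The thread is the core of the multiplexer: every IO event is executed on it. If an exception makes the thread "run away" (its run method returns), the whole multiplexer may become unusable. Every connection registered on it then becomes unusable too, and large numbers of business requests fail. A Netty thread handles both IO events and non-IO tasks, so it must correctly handle IO exceptions as well as exceptions thrown by business code. Mishandling either kind can kill the thread. Netty's answer is to catch every Throwable (that is, all Exceptions and Errors) in the run method. It only logs a warning and otherwise does nothing, then sleeps for 1 s and continues the loop. The 1 s sleep keeps the loop from immediately hitting the same exception again and spinning in a tight failure loop. The implementation is in the run method of NioEventLoop: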
@Override
protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            ...
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
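The same protection can be reproduced in a plain worker thread. The sketch below defines a hypothetical SurvivingLoop, which is not a Netty class. It runs queued tasks, catches every Throwable a task throws, logs it, pauses and keeps going.

It differs from the Netty code above in two ways, both assumptions of the sketch:
- The pause is a constructor parameter, so tests can use something shorter than 1 s.
- An interrupt during the pause is treated as a stop request instead of being ignored.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, stripped-down task loop (not Netty's NioEventLoop). It mirrors the
// catch-Throwable-then-pause pattern so one bad task cannot end the thread.
final class SurvivingLoop implements Runnable {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final long pauseMillis;          // the Netty code above uses 1000 ms
    private volatile boolean shutdown;

    SurvivingLoop(long pauseMillis) {
        this.pauseMillis = pauseMillis;
    }

    void submit(Runnable task) {
        tasks.add(task);
    }

    void shutdown() {
        shutdown = true;
    }

    @Override
    public void run() {
        while (!shutdown) {
            Runnable task;
            try {
                task = tasks.poll(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // treat interruption as a stop request
                return;
            }
            if (task == null) {
                continue;                             // nothing queued; re-check shutdown flag
            }
            try {
                task.run();
            } catch (Throwable t) {
                // Exceptions AND Errors from user code land here instead of ending run().
                System.err.println("Unexpected exception in the loop: " + t);
                try {
                    Thread.sleep(pauseMillis);       // avoid a tight fail/retry spin
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```

Catching Throwable rather than Exception is what keeps an Error thrown by business code from ending run(). The pause only limits the CPU cost when the same failure repeats.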
Memory Protection
ByteBuf Memory Leak Protection
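To improve memory utilization, Netty provides memory pools and object pools, and its leak protection mainly targets the memory pools. Netty requires memory taken from a pool to be released explicitly once it is no longer needed. Otherwise a buffer keeps a reference it no longer needs, never returns to the pool, and leaks. To help with this, Netty provides SimpleChannelInboundHandler, which releases messages automatically. Users can simply extend it, because its channelRead method calls the release method in a finally block. Netty's leak monitoring is implemented in the ResourceLeakDetector class. The channelRead method of SimpleChannelInboundHandler looks like this: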
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    boolean release = true;
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            channelRead0(ctx, imsg);
        } else {
            release = false;
            ctx.fireChannelRead(msg);
        }
    } finally {
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}
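The effect of that finally block is easiest to see with a toy pool. The sketch below is a hypothetical model; TinyPool, PooledBuf and AutoReleasingHandler are not Netty classes.
- A buffer goes back to the pool only when its reference count drops to zero.
- The handler releases the message in finally, so the buffer is returned even if the handler throws.
- With autoRelease turned off and no explicit release, the buffer never comes back. That is the leak described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a pool of reference-counted buffers (NOT Netty's allocator or ByteBuf).
final class TinyPool {
    private final Deque<PooledBuf> free = new ArrayDeque<>();

    TinyPool(int size) {
        for (int i = 0; i < size; i++) {
            free.push(new PooledBuf(this));
        }
    }

    PooledBuf acquire() {
        PooledBuf buf = free.pop();   // throws NoSuchElementException when the pool is exhausted
        buf.refCnt = 1;
        return buf;
    }

    int available() {
        return free.size();
    }

    void giveBack(PooledBuf buf) {
        free.push(buf);
    }
}

final class PooledBuf {
    private final TinyPool pool;
    int refCnt;

    PooledBuf(TinyPool pool) {
        this.pool = pool;
    }

    void release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("already released");
        }
        if (--refCnt == 0) {
            pool.giveBack(this);      // last reference gone: memory returns to the pool
        }
    }
}

// Mirrors the try/finally shape of SimpleChannelInboundHandler.channelRead quoted above.
abstract class AutoReleasingHandler {
    private final boolean autoRelease;

    AutoReleasingHandler(boolean autoRelease) {
        this.autoRelease = autoRelease;
    }

    final void channelRead(Object msg) {
        try {
            if (msg instanceof PooledBuf) {
                handle((PooledBuf) msg);
            }
            // Netty would pass other message types on with ctx.fireChannelRead(msg) and not release them.
        } finally {
            if (autoRelease && msg instanceof PooledBuf) {
                ((PooledBuf) msg).release();
            }
        }
    }

    protected abstract void handle(PooledBuf buf);
}
```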
ByteBuf Memory Overflow Protection
Oversized malicious traffic could exhaust memory and bring the server down, so buffers need an upper size limit. Netty enforces one as follows:
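- When allocating memory, an upper bound on the buffer's capacity is specified (io.netty.buffer.ByteBufAllocator.buffer(int, int), whose arguments are the initial and maximum capacity).
- When a write needs more room than the buffer currently has, the maximum capacity is checked before the buffer is expanded. If the expanded capacity would exceed the limit, the expansion is refused (handled in io.netty.buffer.ByteBuf.ensureWritable(int)).
- When decoding, the message length is checked. If it exceeds the maximum, a decoding exception is thrown and no memory is allocated for the frame (handled in io.netty.handler.codec.DelimiterBasedFrameDecoder.decode(ChannelHandlerContext, ByteBuf), whose fail method throws TooLongFrameException).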
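The three limits can be sketched in plain Java. BoundedBuffer, LineFrameDecoder and FrameTooLongException are hypothetical stand-ins, not Netty's ByteBuf, DelimiterBasedFrameDecoder or TooLongFrameException. The doubling growth policy and the single '\n' delimiter are simplifying assumptions. What the sketch shows is that every path that could grow memory checks a maximum before committing to it.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-ins (not Netty classes) for the three limits listed above.
final class FrameTooLongException extends RuntimeException {
    FrameTooLongException(long length, int max) {
        super("frame length " + length + " exceeds limit " + max);
    }
}

// Limits 1 and 2: a growable buffer with a hard maximum capacity fixed at allocation time.
final class BoundedBuffer {
    private final int maxCapacity;
    private byte[] data;
    private int writerIndex;

    BoundedBuffer(int initialCapacity, int maxCapacity) {
        if (initialCapacity < 0 || initialCapacity > maxCapacity) {
            throw new IllegalArgumentException("need 0 <= initialCapacity <= maxCapacity");
        }
        this.data = new byte[initialCapacity];
        this.maxCapacity = maxCapacity;
    }

    int capacity() {
        return data.length;
    }

    // Grows the backing array if needed (doubling, capped), but never past maxCapacity.
    void ensureWritable(int minWritableBytes) {
        long required = (long) writerIndex + minWritableBytes;
        if (required <= data.length) {
            return;
        }
        if (required > maxCapacity) {
            throw new IndexOutOfBoundsException(
                    "need " + required + " bytes, maxCapacity is " + maxCapacity);
        }
        long doubled = 2L * data.length;
        int newCapacity = (int) Math.min(Math.max(doubled, required), maxCapacity);
        data = Arrays.copyOf(data, newCapacity);
    }

    void writeBytes(byte[] src) {
        ensureWritable(src.length);
        System.arraycopy(src, 0, data, writerIndex, src.length);
        writerIndex += src.length;
    }
}

// Limit 3: a '\n'-delimited decoder that rejects oversized frames.
final class LineFrameDecoder {
    private final int maxFrameLength;

    LineFrameDecoder(int maxFrameLength) {
        this.maxFrameLength = maxFrameLength;
    }

    // Returns the first frame (delimiter stripped), or null if no delimiter has arrived yet.
    String decode(byte[] in, int length) {
        for (int i = 0; i < length; i++) {
            if (in[i] == '\n') {
                if (i > maxFrameLength) {
                    throw new FrameTooLongException(i, maxFrameLength);
                }
                return new String(in, 0, i, StandardCharsets.UTF_8);
            }
        }
        if (length > maxFrameLength) {
            // Already over the limit without a delimiter: fail now instead of buffering more.
            throw new FrameTooLongException(length, maxFrameLength);
        }
        return null;
    }
}
```

Failing as soon as the unterminated input passes the limit means a client that never sends a delimiter cannot make the server keep buffering its bytes.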