Netty為啥可靠(三)
連線中斷處理
在客戶端和服務端建立起連線之後,如果連線發生了意外中斷,Netty也會及時釋放連線控制代碼資源(因為TCP是全雙工協議,通訊雙方都需要關閉和釋放Socket控制代碼才不會發生控制代碼的洩漏,如不經過特殊處理是會發生控制代碼洩露的),原理如下:
在讀取資料時會呼叫io.netty.buffer.AbstractByteBuf.writeBytes(ScatteringByteChannel, int),然後呼叫io.netty.buffer.ByteBuf.setBytes(int, ScatteringByteChannel, int),setBytes方法呼叫nio.channel.read,如果當前連線已經意外中斷,會收到JDK NIO層丟擲的ClosedChannelException異常,setBytes方法捕獲該異常之後直接返回-1,
在NioByteUnsafe.read方法中,發現當前讀取到的位元組長度為-1,即呼叫io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.closeOnRead(ChannelPipeline)方法,然後呼叫io.netty.channel.AbstractChannel.AbstractUnsafe.close(ChannelPromise)關閉連線釋放控制代碼資源。參考相關的程式碼:
//NioByteUnsafe.read方法 public void read() { ... boolean close = false; try { ... do { ... int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { ... close = localReadAmount < 0; break; } ... } while (...); ... if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { ... } finally { ... } } //NioSocketChannel.doReadBytes方法 protected int doReadBytes(ByteBuf byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); } //AbstractByteBuf.writeBytes方法 public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes; } //UnpooledHeapByteBuf.setBytes方法 public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { ensureAccessible(); try { return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length)); } catch (ClosedChannelException e) { return -1; } }
流量整形
一般大型的系統都包含多個模組,在部署時不同的模組可能部署在不同的機器上,比如我司的專案,至少5個部件起,少了都不好意思拿出去見人。這種情況下系統執行時會涉及到大量的上下游部件的通訊,但是由於不同伺服器無論是從硬體配置,還是系統模組的業務特性都會存在差異,這就導致到伺服器的處理能力,以及不同時間段伺服器的負載都是有差異的,這就可能會導致問題:上下游訊息的傳遞速度和下游部件的訊息處理速度失去平衡,下游部件接收到的訊息量遠遠超過了它的處理能力,導致大量的業務無法被及時的處理,甚至可能導致下游伺服器被壓垮。
在Netty框架中提供了流量整形處理機制來應付這種場景,通過控制伺服器單位時間內傳送/接收訊息的位元組數來使上下游伺服器處理相對平衡的狀態。Netty中的流量整形包含了兩種:一種是針對單個連線的流量整形,另一種是針對全域性即所有連線的流量整形。這兩種方式的流量整形原理是類似的,只是流量整形器的作用域不同,一個是全域性的,一個是連線建立後建立,連線關閉後被回收。GlobalTrafficShapingHandler處理全域性流量整形,ChannelTrafficShapingHandler處理單鏈路流量整形,流量整形處理有三個重要的引數:
- writeLimit:每秒最多可以寫多個位元組的資料。
- readLimit:每秒最多可以讀多少個位元組的資料。
- checkInterval:流量檢查的間隔時間,預設1s。
以讀操作為例,流量整形的工作過程大致如下:
- 啟動一個定時任務,每隔checkInterval毫秒執行一次,在任務中清除累加的讀寫位元組數還原成0,更新上次流量整形檢查時間。
- 執行讀操作,觸發channelRead方法,記錄當前已讀取的位元組數並且和上次流量整形檢查之後的所有讀操作讀取的位元組數進行累加。
- 根據時間間隔和已讀取的流量數計算當前流量判斷當前讀取操作是否已導致每秒讀取的位元組數超過了閥值readLimit,計算公式是:(bytes * 1000 / limit - interval) / 10 * 10,其中,bytes是上次流量整形檢查之後的所有讀操作累計讀取的位元組數,limit 就是readLimit,interval是當前時間距上次檢查經過的時間毫秒數,如果該公式計算出來的值大於固定的閥值10,那麼說明流量數已經超標,那麼把該讀操作放到延時任務中處理,延時的毫秒數就是上面那個公式計算出來的值。
下面是相關的程式碼:
//AbstractTrafficShapingHandler.channelRead方法
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long curtime = System.currentTimeMillis();
if (trafficCounter != null) {
//增加位元組累計數
trafficCounter.bytesRecvFlowControl(size);
if (readLimit == 0) {
// no action
ctx.fireChannelRead(msg);
return;
}
// compute the number of ms to wait before reopening the channel
long wait = getTimeToWait(readLimit,
trafficCounter.currentReadBytes(),
trafficCounter.lastTime(), curtime);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to
// try to limit the traffic
if (!isSuspended(ctx)) {
ctx.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
ctx.executor().schedule(reopenTask, wait,
TimeUnit.MILLISECONDS);
} else {
// Create a Runnable to update the next handler in the chain. If one was create before it will
// just be reused to limit object creation
Runnable bufferUpdateTask = new Runnable() {
@Override
public void run() {
ctx.fireChannelRead(msg);
}
};
ctx.executor().schedule(bufferUpdateTask, wait, TimeUnit.MILLISECONDS);
return;
}
}
}
ctx.fireChannelRead(msg);
}
//AbstractTrafficShapingHandler.getTimeToWait方法
private static long getTimeToWait(long limit, long bytes, long lastTime, long curtime) {
long interval = curtime - lastTime;
if (interval <= 0) {
// Time is too short, so just lets continue
return 0;
}
return (bytes * 1000 / limit - interval) / 10 * 10;
}
private static class TrafficMonitoringTask implements Runnable {
...
@Override
public void run() {
if (!counter.monitorActive.get()) {
return;
}
long endTime = System.currentTimeMillis();
//還原累計位元組數,lastTime等變數
counter.resetAccounting(endTime);
if (trafficShapingHandler1 != null) {
trafficShapingHandler1.doAccounting(counter);
}
counter.scheduledFuture = counter.executor.schedule(this, counter.checkInterval.get(),
TimeUnit.MILLISECONDS);
}
}