Netty source code reading and analysis: HashedWheelTimer
阿新 • Published: 2019-01-03
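Netty is a foundational networking framework that may manage a very large number of connections, potentially millions, and nearly every connection carries some timeout tasks, such as send timeouts and heartbeat checks. Starting a separate Timer for each connection would be both inefficient and resource-hungry. Netty instead manages its large number of scheduled tasks with the timing wheel proposed in the paper "Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility". The implementation lives in the HashedWheelTimer class, and the idea works as follows: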
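A timing wheel is essentially a circular data structure. Picture it as a clock face divided into slots, each representing a fixed span of time. Each slot stores its pending timeout tasks in a linked list, and a pointer advances one slot at a time; whenever the pointer reaches a slot, the timeout tasks stored there are run. Timeout tasks are assigned to slots by a fixed rule, as shown in the figure below: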
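Taking the figure above as an example, suppose each slot represents 1 s, so the whole wheel covers 8 s. If the pointer is currently at slot 3 and a task must run in 3 s, the task goes into slot 3 + 3 = 6; a task that must run in 6 s goes into slot (3 + 6) % 8 = 1. Now let's see how Netty's HashedWheelTimer class actually implements this algorithm. Its constructor is as follows: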
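To make the slot arithmetic above concrete, here is a minimal single-threaded sketch. The class SimpleTimingWheel and its methods are hypothetical, not part of Netty; tasks are plain String labels so the result is easy to check, and this toy version only accepts delays shorter than one revolution (Netty's HashedWheelTimer additionally tracks how many full rounds remain for longer delays).

```java
import java.util.ArrayList;
import java.util.List;

// Toy timing wheel for illustration (hypothetical, not Netty's API).
// Tasks are String labels so results are easy to inspect.
class SimpleTimingWheel {
    private final List<List<String>> slots = new ArrayList<>();
    private int cursor; // index of the slot the pointer is currently on

    SimpleTimingWheel(int slotCount, int startCursor) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
        this.cursor = startCursor;
    }

    // Put a task delayTicks slots ahead of the pointer and return the slot index.
    // Only delays shorter than one revolution are supported here.
    int schedule(int delayTicks, String task) {
        if (delayTicks < 1 || delayTicks >= slots.size()) {
            throw new IllegalArgumentException("delay out of range: " + delayTicks);
        }
        int slot = (cursor + delayTicks) % slots.size();
        slots.get(slot).add(task);
        return slot;
    }

    // Advance the pointer one slot, then remove and return the tasks stored there.
    List<String> tick() {
        cursor = (cursor + 1) % slots.size();
        List<String> due = new ArrayList<>(slots.get(cursor));
        slots.get(cursor).clear();
        return due;
    }
}
```

With 8 slots and the pointer starting at slot 3, schedule(3, ...) lands in slot 6 and schedule(6, ...) in slot 1, matching the example; the first task comes back from the third tick() and the second from the sixth.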
public HashedWheelTimer(
        ThreadFactory threadFactory, // used to create the worker thread
        long tickDuration,           // duration of one tick, i.e. how often the pointer advances one slot
        TimeUnit unit,               // time unit of tickDuration
        int ticksPerWheel,           // number of slots in one revolution
        boolean leakDetection,       // whether to enable leak detection
        long maxPendingTimeouts) {   // maximum number of pending timeouts; <= 0 means no limit
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    wheel = createWheel(ticksPerWheel); // create the wheel; the slot count is a power of two
    mask = wheel.length - 1;            // with a power-of-two slot count, tick & mask replaces the slower tick % wheel.length

    // Convert tickDuration to nanos.
    this.tickDuration = unit.toNanos(tickDuration); // convert to nanoseconds

    // Prevent overflow: the tick interval must not be so long that
    // tickDuration * wheel.length exceeds Long.MAX_VALUE.
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
                "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                tickDuration, Long.MAX_VALUE / wheel.length));
    }
    workerThread = threadFactory.newThread(worker);

    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    this.maxPendingTimeouts = maxPendingTimeouts;

    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
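Two tricks in the constructor can be tried in isolation: rounding the slot count up to a power of two so that "& mask" can replace "%", and rejecting tick durations for which one full revolution would overflow a long. WheelMath and its methods are hypothetical helpers written for illustration; normalizeTicksPerWheel here uses a simple doubling loop, which may differ from Netty's own implementation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers mirroring the constructor's checks; not Netty's code.
final class WheelMath {
    private WheelMath() {}

    // Round ticksPerWheel up to the next power of two (assumes 1 <= n <= 2^30).
    static int normalizeTicksPerWheel(int ticksPerWheel) {
        int normalized = 1;
        while (normalized < ticksPerWheel) {
            normalized <<= 1;
        }
        return normalized;
    }

    // For a power-of-two wheel length, (tick & mask) equals tick % length when tick >= 0.
    static int slotIndex(long tick, int mask) {
        return (int) (tick & mask);
    }

    // Convert the tick duration to nanoseconds and reject values where one
    // full revolution (tickNanos * wheelLength) could overflow a long.
    static long checkedTickNanos(long tickDuration, TimeUnit unit, int wheelLength) {
        long tickNanos = unit.toNanos(tickDuration);
        if (tickNanos >= Long.MAX_VALUE / wheelLength) {
            throw new IllegalArgumentException("tickDuration too large: " + tickNanos + " ns");
        }
        return tickNanos;
    }
}
```

For example, normalizeTicksPerWheel(5) returns 8, slotIndex(t, 7) agrees with t % 8 for any non-negative t, and checkedTickNanos(100, TimeUnit.MILLISECONDS, 512) returns 100,000,000.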
Here we can see that a timing wheel is essentially an array (more precisely, a circular array, matching the ring-shaped data structure) whose elements, HashedWheelBucket instances, are each a linked list. The method that creates the wheel is:
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException(
                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }
    if (ticksPerWheel > 1073741824) {
        throw new IllegalArgumentException(
                "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
    }

    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); // round up to a power of two
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}
Each HashedWheelBucket keeps the head and tail of its linked list of HashedWheelTimeout nodes:
private static final class HashedWheelBucket {
    // Used for the linked-list datastructure
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;
    .....
}
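The body of the bucket class is elided above. Purely as an illustration, here is a minimal sketch of a bucket that keeps head and tail pointers, assuming each timeout node carries prev and next links so that a cancelled timeout can be unlinked in O(1). SketchBucket and SketchTimeout are hypothetical names, not Netty classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical timeout node with prev/next links (illustrative names).
final class SketchTimeout {
    final String name;
    SketchTimeout prev;
    SketchTimeout next;

    SketchTimeout(String name) {
        this.name = name;
    }
}

// Hypothetical bucket: a doubly linked list tracked by head and tail.
final class SketchBucket {
    private SketchTimeout head;
    private SketchTimeout tail;

    // Append a node at the tail in O(1).
    void add(SketchTimeout t) {
        if (head == null) {
            head = tail = t;
        } else {
            tail.next = t;
            t.prev = tail;
            tail = t;
        }
    }

    // Unlink a node (assumed to belong to this bucket) in O(1), fixing head/tail as needed.
    void remove(SketchTimeout t) {
        if (t.prev != null) {
            t.prev.next = t.next;
        } else {
            head = t.next;
        }
        if (t.next != null) {
            t.next.prev = t.prev;
        } else {
            tail = t.prev;
        }
        t.prev = null;
        t.next = null;
    }

    // Node names from head to tail, for inspection.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (SketchTimeout t = head; t != null; t = t.next) {
            out.add(t.name);
        }
        return out;
    }
}
```

Adding a, b, c and then removing b leaves a, c; removing the first or last node moves head or tail accordingly, so cancellation never needs to scan the list.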
Next, let's look at how the timing wheel is started, how it is stopped, and how tasks are added. First, start():
public void start() {
    // Start the timing wheel. newTimeout() calls this method automatically when a task is added,
    // so the wheel only starts once there is work to do; running a wheel with no tasks just wastes resources.
    // Check the current state: if INIT, start the worker thread and with it the whole wheel; if already
    // STARTED, do nothing; if SHUTDOWN, throw. Several threads may race to start the wheel, so a
    // lock-free CAS design is used.
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            startTimeInitialized.await(); // wait for the worker to start
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
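The lazy, CAS-guarded start can be reproduced with plain java.util.concurrent types. This is a sketch under stated assumptions: LazyWorker is a hypothetical class, an AtomicInteger takes the place of the WORKER_STATE_UPDATER field updater, and a CountDownLatch plays the role of startTimeInitialized. Unlike the code above, which simply ignores InterruptedException while waiting, this version remembers the interrupt and restores the flag once the wait is over.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "start once, lazily": a CAS on the state decides which
// caller starts the worker, and a latch lets every caller wait until the worker
// has published its start time.
final class LazyWorker {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private volatile long startTime; // 0 means "not yet initialized"
    private final Thread worker;

    LazyWorker() {
        worker = new Thread(() -> {
            long now = System.nanoTime();
            startTime = (now == 0) ? 1 : now; // keep 0 reserved as the "unset" marker
            startTimeInitialized.countDown();
            // A real worker would now loop, advancing one tick at a time.
        });
        worker.setDaemon(true);
    }

    void start() {
        switch (state.get()) {
            case INIT:
                // Only the thread whose CAS succeeds starts the worker.
                if (state.compareAndSet(INIT, STARTED)) {
                    worker.start();
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
        // Every caller waits until the worker has set startTime.
        boolean interrupted = false;
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    long startTime() {
        return startTime;
    }
}
```

Any number of threads may call start() concurrently: exactly one wins the compareAndSet and starts the worker, and every caller returns only after startTime has been published.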
Next, stop():
public Set<Timeout> stop() {
    // The worker thread must not stop the wheel, i.e. a scheduled task running on it may not call
    // this method. This keeps a misbehaving (or malicious) task from stopping the timer and
    // invalidating all other scheduled tasks.
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }

    // Try to CAS the state from STARTED to SHUTDOWN (2). If that fails, the wheel can only be
    // INIT (0) or already SHUTDOWN (2); in either case, set it to SHUTDOWN (2) directly.
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // workerState can be 0 or 2 at this moment - let it always be 2.
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return Collections.emptySet();
    }

    try {
        boolean interrupted = false;
        while (workerThread.isAlive()) {
            workerThread.interrupt(); // interrupt the worker thread
            try {
                workerThread.join(100);
            } catch (InterruptedException ignored) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    } finally {
        INSTANCE_COUNTER.decrementAndGet();
        if (leak != null) {
            boolean closed = leak.close(this);
            assert closed;
        }
    }
    return worker.unprocessedTimeouts(); // return the tasks that were never processed
}
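The shutdown logic follows a common pattern: flip the state with CAS, keep interrupting and joining the worker until it exits, and re-assert the caller's interrupt flag if it was interrupted while waiting. Below is a minimal sketch with a hypothetical StoppableWorker class; its worker loop just sleeps until interrupted, standing in for the real tick loop, and stop() returns a boolean rather than the set of unprocessed timeouts.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the stop() pattern (not Netty's API).
final class StoppableWorker {
    static final int INIT = 0, STARTED = 1, SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final Thread worker;

    StoppableWorker() {
        worker = new Thread(() -> {
            // Stand-in for the tick loop: run until interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // re-set the flag so the loop exits
                }
            }
        });
        worker.setDaemon(true);
    }

    void start() {
        if (state.compareAndSet(INIT, STARTED)) {
            worker.start();
        }
    }

    // Returns true only if this call stopped a running worker.
    boolean stop() {
        if (Thread.currentThread() == worker) {
            throw new IllegalStateException("stop() cannot be called from the worker thread");
        }
        if (!state.compareAndSet(STARTED, SHUTDOWN)) {
            // State was INIT or already SHUTDOWN: make sure it ends up SHUTDOWN.
            state.set(SHUTDOWN);
            return false;
        }
        boolean interrupted = false;
        while (worker.isAlive()) {
            worker.interrupt();
            try {
                worker.join(100);
            } catch (InterruptedException e) {
                interrupted = true; // remember it, but keep waiting for the worker
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        }
        return true;
    }

    int state() {
        return state.get();
    }

    boolean isWorkerAlive() {
        return worker.isAlive();
    }
}
```

Interrupting inside the loop, rather than once, matters because a task running on the worker might swallow a single interrupt; the bounded join(100) lets the caller re-send it until the thread is really gone.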
Next, the method that adds a task:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }

    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
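In other words, newTimeout() simply wraps the task in a HashedWheelTimeout object and puts it into the timeouts queue; on a later tick the worker thread moves it into the right HashedWheelBucket. Having read the Netty source here, and the Disruptor source before it, we can see that lock-free designs and the java.util.concurrent (JUC) package are used pervasively in such frameworks.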
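Putting newTimeout() together with the deferred transfer mentioned in its comments, the simplified model below shows how a deadline relative to startTime turns into a bucket index plus a count of remaining rounds. MiniWheel, Pending, and transferToBuckets are hypothetical names. The bucket and rounds formulas (deadline / tickDuration, then & mask for the slot, and the tick distance divided by the wheel length for the rounds) are modeled on Netty's worker loop as we understand it and should be checked against the Netty version you use. The current time is passed in as now instead of calling System.nanoTime() so the result is deterministic, and decrementing the pending counter when a timeout fires or is cancelled is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of newTimeout() and the worker's transfer step.
final class MiniWheel {
    static final class Pending {
        final String task;
        final long deadline;  // nanoseconds relative to startTime
        long remainingRounds; // full revolutions left before it may fire

        Pending(String task, long deadline) {
            this.task = task;
            this.deadline = deadline;
        }
    }

    private final long startTime;
    private final long tickNanos;
    private final int mask;
    private final long maxPending;
    private final List<List<Pending>> wheel = new ArrayList<>();
    private final AtomicLong pending = new AtomicLong();
    private final Queue<Pending> timeouts = new ConcurrentLinkedQueue<>();

    MiniWheel(long startTime, long tickNanos, int ticksPerWheel, long maxPending) {
        if (ticksPerWheel <= 0 || Integer.bitCount(ticksPerWheel) != 1) {
            throw new IllegalArgumentException("ticksPerWheel must be a power of two");
        }
        this.startTime = startTime;
        this.tickNanos = tickNanos;
        this.mask = ticksPerWheel - 1;
        this.maxPending = maxPending;
        for (int i = 0; i < ticksPerWheel; i++) {
            wheel.add(new ArrayList<>());
        }
    }

    // Caller side: enforce the pending limit, compute a relative deadline, enqueue.
    Pending newTimeout(String task, long delay, TimeUnit unit, long now) {
        long count = pending.incrementAndGet();
        if (maxPending > 0 && count > maxPending) {
            pending.decrementAndGet();
            throw new RejectedExecutionException("too many pending timeouts: " + count);
        }
        long deadline = now + unit.toNanos(delay) - startTime;
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE; // guard against overflow
        }
        Pending p = new Pending(task, deadline);
        timeouts.add(p);
        return p;
    }

    // Worker side, at the given tick: drain the queue into buckets.
    void transferToBuckets(long tick) {
        Pending p;
        while ((p = timeouts.poll()) != null) {
            long calculated = p.deadline / tickNanos; // tick at which it is due
            p.remainingRounds = (calculated - tick) / wheel.size();
            long target = Math.max(calculated, tick); // never place it in the past
            wheel.get((int) (target & mask)).add(p);
        }
    }

    // Index of the bucket holding p, or -1 if it has not been transferred yet.
    int bucketIndexOf(Pending p) {
        for (int i = 0; i < wheel.size(); i++) {
            if (wheel.get(i).contains(p)) {
                return i;
            }
        }
        return -1;
    }
}
```

With a 100 ns tick and 8 slots, a 350 ns delay lands in slot 3 with 0 rounds left, while a 1250 ns delay lands in slot 4 (12 & 7) with 1 round left, so it only fires the second time the pointer reaches slot 4.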