org.apache.dubbo 2.7.7 服務端處理請求及時間輪(失敗重試)
本文主要針對dubbo-spring-boot-starter 2.7.7版本, 對應的 org.apache.dubbo 2.7.7 版本的原始碼。
本文主要從以下幾個點來分析:
- 服務端處理請求.
- 時間輪(失敗重試)。
服務端接收資料的處理流程:
客戶端請求發出去之後,服務端會收到這個請求的訊息,然後觸發呼叫。服務端這邊接收訊息的處理鏈路,也比較複雜,我們回到NettServer中建立io的過程。
@Override protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup= NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss"); workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker"); final NettyServerHandler nettyServerHandler= new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
服務端啟動的時候,配置的訊息處理是handler配置的是nettyServerHandler , 所以我們直接進入到 NettyServerHandler#channelRead ,這個方法負責處理請求。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); }
服務端收到讀的請求是,會進入這個方法。接著通過handler.received來處理msg ,而這個handler 是在服務釋出的時候構建得。起鏈路如下:
MultiMessageHandler(複合訊息處理) --->HeartbeatHandle(心跳訊息處理,接收心跳併發送心跳響應) --->AllChannelHandler (業務執行緒轉化處理器)--->DecodeHandler (業務解碼處理器)--->HeaderExchangeHandler--->DubboProtocol#requestHandler(new ExchangeHandlerAdapter())
而在構建NettyServerHandler 得時候將 this 傳了進去。this 即 NettyServer 。NettyServer是AbstractPeer 得子類。所以handler.received此時會呼叫AbsstractPeer.received方法,這個方法用來判斷服務端是否關閉了,如果關閉就直接返回,否則,通過handler處理鏈進行層層呼叫。
我們直接進入HeaderExchangeHandler.received,互動層請求響應處理,有三種處理方式
- handlerRequest,雙向請求
- handler.received 單向請求
- handleResponse 響應訊息
public void received(Channel channel, Object message) throws RemotingException { final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); if (message instanceof Request) { // handle request. 處理請求 Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } }
接著呼叫handleRequest方法。這個方法中,構建返回的物件Response,並且最終會通過非同步的方式來把msg傳遞到invoker中進行呼叫 handler.reply .這裡得最後一層就是呼叫 DubboProtocol 類中得 requestHandler 屬性構造得ExchangeHandlerAdapter 內部類的reply 方法:
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { //如果訊息型別不是invocation,則丟擲異常表示無法識別 if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } //獲得請求引數 Invocation inv = (Invocation) message; // 獲取 invoker 領域物件,這個物件是在釋出服務的時候構建,然後封裝成 exporter 存在map裡面的。 Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); // 發起對應呼叫 return result.thenApply(Function.identity()); } //......省略程式碼 };
invoker.invoke,發起本地服務呼叫,但是此時呼叫之前,invoke並不是一個直接呼叫的物件,而是包裝過的。在ServiceConfig#doExportUrlsFor1Protocol 構建包裝。最後的呼叫鏈路如下:
RegistryProtocol.InvokerDelegate.invoke --->DelegateProviderMetaDataInvoker.invoke--->AbstractProxyInvoker.invoke--->AbstractProxyInvoker(JavassistProxyFactory#getInvoker)
InvokerDelegate 未實現父類InvokerWrapper invoke方法。進入到InvokerWrapper.invoke方法,這個是一個Invoker包裝類,包裝了URL地址資訊和真正的Invoker代理物件。
@Override public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
DelegateProviderMetaDataInvoker:這裡是一個委派類,它提供了服務提供者的元數序資訊。
@Override public Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
AbstractProxyInvoker:接著進入到AbstractProxyInvoker的invoke方法,在這個方法中,我們可以看到它會呼叫子類的doInvoke方法,獲得返回結果。其中proxy,表示服務端的物件例項,這個例項很顯然是在構建動態代理Invoker物件時儲存進來的。
public Result invoke(Invocation invocation) throws RpcException { try { Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture<Object> future = wrapWithFuture(value); CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> { AppResponse result = new AppResponse(); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } return result; }); return new AsyncRpcResult(appResponseFuture, invocation); } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
最後進入到具體的子類,也就是在服務的釋出的時候通過 構建的
@Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
然後發起本地的呼叫即可,將結果返回 。至此,服務消費的處理流程就分析完了。最後附上這個流程的流程圖。
Dubbo中失敗重試的設計:
在Dubbo中,有很多地方涉及到服務失敗重試,比如服務註冊失敗時,會呼叫一個方法把失敗的請求儲存起來進行重試。clusterInvoker失敗重試
註冊中心失敗重試 FailbackRegistry#addFailedRegistered
private final HashedWheelTimer retryTimer; private void addFailedRegistered(URL url) { FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) { return; } FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // never has a retry task. then start a new task for retry. retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } }
clusterInvoker(FailbackClusterInvoker)失敗重試
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) { if (failTimer == null) { synchronized (this) { if (failTimer == null) { failTimer = new HashedWheelTimer( new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks); } } } RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD); try { failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS); } catch (Throwable e) { logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage()); } }
不難發現他們都是基於HashedWhelloTimer這個類來實現的。
時間輪:
時間輪這個技術其實出來很久了,在kafka、zookeeper等技術中都有時間輪使用的方式。什麼是時間輪呢?
簡單來說: 時間輪是一種高效利用執行緒資源進行批量化排程的一種排程模型。把大批量的排程任務全部繫結到同一個排程器上,使用這一個排程器來進行所有任務的管理、觸發、以及執行。所以時間輪的模型能夠高效管理各種延時任務、週期任務、通知任務。 在工作中遇到類似的功能,可以採用時間輪機制。
時間輪,從圖片上來看,就和手錶的表圈是一樣,所以稱為時間輪,是因為它是以時間作為刻度組成的一個環形佇列,這個環形佇列採用陣列來實現,陣列的每個元素稱為槽,每個槽可以放一個定時任務列表,叫HashedWheelBucket,它是一個雙向連結串列,量表的每一項表示一個定時任務項(HashedWhellTimeout),其中封裝了真正的定時任務TimerTask。時間輪是由多個時間格組成,下圖中有8個時間格,每個時間格代表當前時間輪的基本時間跨度(tickDuration),其中時間輪的時間格的個數是固定的。
在下圖中,有8個時間格(槽),假設每個時間格的單位為1s,那麼整個時間輪走完一圈需要8s鍾。每秒鐘指標會沿著順時針方向移動一格,這個單位可以設定,比如以秒為單位,可以以一小時為單位,這個單位可以代表時間精度。
通過指標移動,來獲得每個時間格中的任務列表,然後遍歷這一個時間格中的雙向連結串列來執行任務,以此迴圈。
時間輪的執行邏輯:
首先,時間輪在啟動的時候,會記錄一下當前啟動時間,並賦值給一個叫startTime的變數。然後當需要新增任務的時候,首先會計算延遲時間(deadline),比如一個任務的延遲時間是24ms,那麼在新增任務時,會將當前時間(currentTime)+24ms-時間輪的啟動時間(startTime),然後把這個任務封裝成HashedWheelTimeout加入到連結串列中。
那麼這個任務應該放在哪個時間格里面呢? 通過 deadline%wheel.length 計算.時間輪在執行的時候,會從任務佇列中取出10W個進行遍歷處理。
Dubbo中的時間輪實現類是:HashedWheelTimer:
在FailbackClusterInvoker這個類中,構建了一個HashedWheelTimer,然後增加了一個任務RetryTimerTask到時間輪中。基於這段程式碼, 我們去分析一下HashedWheelTimer的實現。 FailbackClusterInvoker#addFailed 主要做了兩件事情:
- 如果時間輪等於 null,則初始化時間輪
- 建立重試任務,啟動時間輪。
HashedWheelTimer的構造:
- 呼叫createWheel建立一個時間輪,時間輪陣列一定是2的冪次方,比如傳入的
- ticksPerWheel=6,那麼初始化的wheel長度一定是8,這樣是便於時間格的計算。tickDuration,表示時間輪的跨度,代表每個時間格的時間精度,以納秒的方式來表現。
- 把工作執行緒Worker封裝成WorkerThread,從名字可以知道,它就是最終那個負責幹活的執行緒。
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { // 省略判斷邏輯程式碼 // Normalize ticksPerWheel to power of two and initialize the wheel. // 建立時間輪基本的資料結構,一個數組。長度為不小於ticksPerWheel的最小2的n次方 wheel = createWheel(ticksPerWheel); // 這是一個標示符,用來快速計算任務應該呆的格子。 // 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替: // 因為一圈的長度為2的n次方,mask = 2^n-1後低位將全部是1,然後deadline&mast == deadline%wheel.length // java中的HashMap在進行hash之後,進行index的hash定址定址的演算法也是和這個一樣的 mask = wheel.length - 1; // Convert tickDuration to nanos. //時間輪的基本時間跨度,(tickDuration傳入是1的話,這裡會轉換成1000000) this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. // 校驗是否存在溢位。即指標轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //把worker包裝成thread workerThread = threadFactory.newThread(worker); this.maxPendingTimeouts = maxPendingTimeouts; //如果HashedWheelTimer例項太多,那麼就會列印一個error日誌 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
然後來看看時間輪的建立createWheel
- 對傳入的ticksPerWheel進行整形
- 初始化固定長度的HashedWheelBucket
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } //對傳入的時間輪大小進行整形,整形成2的冪次方 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); //初始化一個固定長度的Bucket陣列 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket(); } return wheel; } private static int normalizeTicksPerWheel(int ticksPerWheel) { int normalizedTicksPerWheel = ticksPerWheel - 1; normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1; normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2; normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4; normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8; normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16; return normalizedTicksPerWheel + 1; }
完成時間輪的初始化之後,並沒有去啟動時間輪,繼續看FailbackClusterInvoker中的程式碼。構建了一個RetryTimerTask,也就是一個重試的定時任務,接著把這個任務通過newTimeout加入到時間輪中,其中
- retryTimerTask,表示具體的重試任務
- RETRY_FAILED_PERIOD , 表示重試間隔時間,預設為5s
呼叫newTimeout方法,把任務新增進來。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } //統計任務個數 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); //判斷最大任務數量是否超過限制 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } //如果時間輪沒有啟動,則通過start方法進行啟動 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. //計算任務的延遲時間,通過當前的時間+當前任務執行的延遲時間-時間輪啟動的時間。 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; //在delay為正數的情況下,deadline是不可能為負數 //如果為負數,那麼說明超過了long的最大值 // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } //建立一個Timeout任務,理論上來說,這個任務應該要加入到時間輪的時間格子中,但是這裡並不是先新增到時間格, //而是先加入到一個阻塞佇列,然後等到時間輪執行到下一個格子時,再從佇列中取出最多100000個任務新增到指定的時間格(槽)中。 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
任務新增到阻塞佇列之後,我們再來看啟動方法,start方法會根據當前的workerState狀態來啟動時間輪。並且用了startTimeInitialized來控制執行緒的執行,如果workerThread沒有啟動起來,那麼newTimeout方法會一直阻塞在執行start方法中。如果不阻塞,newTimeout方法會獲取不到startTime。
public void start() { //workerState一開始的時候是0(WORKER_STATE_INIT),然後才會設定為1(WORKER_STATE_STARTED) switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // 等待worker執行緒初始化時間輪的啟動時間 // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { //這裡使用countDownLauch來確保排程的執行緒已經被啟動 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
啟動時間輪:呼叫start()方法, 會呼叫 workerThread.start(); 來啟動一個工作執行緒,這個工作執行緒是在構造方法中初始化的,包裝的是一個Worker內部執行緒類。所以直接進入到Worker這個類的run方法,瞭解下它的設計邏輯
public void run() { // Initialize the startTime. // 初始化startTime,表示時間輪的啟動時間 startTime = System.nanoTime(); // 喚醒被阻塞的start()方法。 if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { //返回每tick一次的時間間隔 final long deadline = waitForNextTick(); if (deadline > 0) { //計算時間輪的槽位 int idx = (int) (tick & mask); //移除掉CancelledTask processCancelledTasks(); //得到當前指標位置的時間槽 HashedWheelBucket bucket = wheel[idx]; //將newTimeout()方法中加入到待處理定時任務佇列中的任務加入到指定的格子中 transferTimeoutsToBuckets(); //執行目前指標指向的槽中的bucket連結串列中的任務 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. //如果Worker_State一隻是started狀態,就一直迴圈 for (HashedWheelBucket bucket : wheel) { //清除時間輪中不需要處理的任務 bucket.clearTimeouts(unprocessedTimeouts); } for (; ; ) { //遍歷任務佇列,發現如果有任務被取消,則新增到unprocessedTimeouts,也就是不需要處理的佇列中。 HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } }//處理被取消的任務. processCancelledTasks(); }
時間輪指標跳動:這個方法的主要作用就是返回下一個指標指向的時間間隔,然後進行sleep操作。可以想象一下,一個鐘錶上秒與秒之間是有時間間隔的,那麼waitForNextTick就是根據當前時間計算出跳動到下個時間的時間間隔,然後進行sleep,然後再返回當前時間距離時間輪啟動時間的時間間隔。
說得再直白一點:,假設當前的tickDuration的間隔是1s,tick預設=0, 此時第一次進來,得到的deadline=1,也就是下一次跳動的時間間隔是1s。
private long waitForNextTick() { //tick表示總的tick數 //tickDuration表示每個時間格的跨度,所以deadline返回的是下一次時間輪指標跳動的時間 long deadline = tickDuration * (tick + 1); for (; ; ) { //計算當前時間距離啟動時間的時間間隔 final long currentTime = System.nanoTime() - startTime; //通過下一次指標跳動的延遲時間距離當前時間的差額,這個作為sleep時間使用。 // 其實執行緒是以睡眠一定的時間再來執行下一個ticket的任務的 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; //sleepTimeMs小於零表示走到了下一個時間槽位置 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } if (isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } //進入到這裡進行sleep,表示當前時間距離下一次tick時間還有一段距離,需要sleep。 try { Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } }
transferTimeoutsToBuckets:轉移任務到時間輪中,前面我們講過,任務新增進來時,是先放入到阻塞佇列。
而在現在這個方法中,就是把阻塞佇列中的資料轉移到時間輪的指定位置。在這個轉移方法中,寫死了一個迴圈,每次都只轉移10萬個任務。然後根據HashedWheelTimeout的deadline延遲時間計算出時間輪需要執行多少次才能運行當前的任務,如果當前的任務延遲時間大於時間輪跑一圈所需要的時間,那麼就計算需要跑幾圈才能到這個任務執行。最後計算出該任務在時間輪中的槽位,新增到時間輪的連結串列中。
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
// 迴圈100000次,也就是每次轉移10w個任務
for (int i = 0; i < 100000; i++) {
//從阻塞佇列中獲得具體的任務
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
//計算tick次數,deadline表示當前任務的延遲時間,tickDuration表示時間槽的間隔,兩者相除就可以計算當前任務需要tick幾次才能被執行
long calculated = timeout.deadline / tickDuration;
// 計算剩餘的輪數, 只有 timer 走夠輪數, 並且到達了 task 所在的 slot, task 才會過期.(被執行)
timeout.remainingRounds = (calculated - tick) / wheel.length;
// Ensure we don't schedule for past.
//如果任務在timeouts佇列裡面放久了, 以至於已經過了執行時間, 這個時候就使用當前tick,也就是放到當前bucket, 此方法呼叫完後就會被執行
final long ticks = Math.max(calculated, tick);
// 算出任務應該插入的 wheel 的 slot, stopIndex = tick 次數 & mask, mask =wheel.length - 1
int stopIndex = (int) (ticks & mask);
//把timeout任務插入到指定的bucket鏈中
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
執行時間輪中的任務:當指標跳動到某一個時間槽中時,會就觸發這個槽中的任務的執行。該功能是通過expireTimeouts來實現這個方法的主要作用是: 過期並執行格子中到期的任務。也就是當tick進入到指定格子時,worker執行緒會呼叫這個方法
HashedWheelBucket是一個連結串列,所以我們需要從head節點往下進行遍歷。如果連結串列沒有遍歷到連結串列尾部那麼就繼續往下遍歷。獲取的timeout節點,如果剩餘輪數remainingRounds大於0,那麼就說明要到下一圈才能執行,所以將剩餘輪數減一
如果當前剩餘輪數小於等於零了,那麼就將當前節點從bucket連結串列中移除,並判斷一下當前的時間是否大於timeout的延遲時間,如果是則呼叫timeout的expire執行任務。
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
// 遍歷當前時間槽中的所有任務
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
//如果當前任務要被執行,那麼remainingRounds應該小於或者等於0
if (timeout.remainingRounds <= 0) {
//從bucket連結串列中移除當前timeout,並返回連結串列中下一個timeout
next = remove(timeout);
//如果timeout的時間小於當前的時間,那麼就呼叫expire執行task
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
//不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
//因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一
timeout.remainingRounds--;
}
//把指標放置到下一個timeout
timeout = next;
}
}
這就是整個時間輪的流程了。