A look at rocketmq's suspendCurrentQueueTimeMillis

This article looks at rocketmq's suspendCurrentQueueTimeMillis setting.

suspendCurrentQueueTimeMillis

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    //......

    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    //......
}
  • DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, which defaults to 1000 (milliseconds)

submitConsumeRequestLater

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

public class ConsumeMessageOrderlyService implements ConsumeMessageService {

    //......

    private void submitConsumeRequestLater(
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }

    //......
}
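  • When the suspendTimeMillis argument is -1, submitConsumeRequestLater falls back to the value of defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(). The resulting timeMillis is then clamped: a value below 10 is reset to 10 and a value above 30000 is reset to 30000. Finally, scheduledExecutorService is used to run submitConsumeRequest after a delay of timeMillis milliseconds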
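
The fallback-and-clamp behaviour described above can be tried out in isolation. The following is only a minimal stand-alone sketch, not rocketmq code: the class SuspendDelaySketch and its methods resolveDelayMillis and submitLater are hypothetical names introduced for illustration, and the configured default is passed in as a plain argument rather than read from a DefaultMQPushConsumer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; this class is not part of rocketmq-client.
class SuspendDelaySketch {

    // Bounds copied from submitConsumeRequestLater in the listing above.
    static final long MIN_DELAY_MILLIS = 10;
    static final long MAX_DELAY_MILLIS = 30000;

    // -1 means "use the configured default"; the result is clamped to [10, 30000].
    static long resolveDelayMillis(long suspendTimeMillis, long configuredDefault) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = configuredDefault;
        }
        return Math.max(MIN_DELAY_MILLIS, Math.min(MAX_DELAY_MILLIS, timeMillis));
    }

    // Schedules the task once, after the resolved delay.
    static ScheduledFuture<?> submitLater(ScheduledExecutorService scheduler,
                                          Runnable task,
                                          long suspendTimeMillis,
                                          long configuredDefault) {
        long delay = resolveDelayMillis(suspendTimeMillis, configuredDefault);
        return scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // -1 falls back to the default of 5, which is then raised to the 10 ms minimum.
            submitLater(scheduler, () -> System.out.println("resubmitted"), -1, 5).get();
        } finally {
            scheduler.shutdown();
        }
    }
}
```

Note that the clamp applies to every value, not only to the fallback: an explicit suspendTimeMillis of 5 or 60000 is adjusted in the same way as an out-of-range default.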

Summary

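DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property with a default of 1000 milliseconds. ConsumeMessageOrderlyService's submitConsumeRequestLater method reads it through defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis() when the timeMillis it was given is -1, resets the delay to 10 if it is below 10 or to 30000 if it is above 30000, and then uses scheduledExecutorService to run submitConsumeRequest after that delay.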

doc