A look at RocketMQ's suspendCurrentQueueTimeMillis
阿新 • Published: 2020-06-24
Preface
This post examines RocketMQ's suspendCurrentQueueTimeMillis setting.
suspendCurrentQueueTimeMillis
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    //......
    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    public long getSuspendCurrentQueueTimeMillis() {
        return suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }
    //......
}
- DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property, with a default value of 1000 (milliseconds)
submitConsumeRequestLater
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    //......
    private void submitConsumeRequestLater(
        final ProcessQueue processQueue, final MessageQueue messageQueue, final long suspendTimeMillis
    ) {
        long timeMillis = suspendTimeMillis;
        // -1 means "not specified": fall back to the consumer-level setting
        if (timeMillis == -1) {
            timeMillis = this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis();
        }

        // clamp the delay to the range [10, 30000] milliseconds
        if (timeMillis < 10) {
            timeMillis = 10;
        } else if (timeMillis > 30000) {
            timeMillis = 30000;
        }

        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                ConsumeMessageOrderlyService.this.submitConsumeRequest(null, processQueue, messageQueue, true);
            }
        }, timeMillis, TimeUnit.MILLISECONDS);
    }
    //......
}
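- When suspendTimeMillis is -1, submitConsumeRequestLater falls back to the value of defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis(). The resulting timeMillis is then clamped, whichever source it came from: a value below 10 is reset to 10, and a value above 30000 is reset to 30000. Finally, scheduledExecutorService is used to run submitConsumeRequest after a delay of timeMillis milliseconds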
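The fallback-and-clamp rule described above can be tried out in isolation. The following is only a minimal sketch, not RocketMQ code: the class name SuspendDelaySketch, the method resolveDelay, and the two constants are hypothetical names introduced here for illustration, and the scheduled task merely prints a message instead of calling submitConsumeRequest.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; none of these names exist in RocketMQ.
class SuspendDelaySketch {
    static final long MIN_DELAY_MS = 10;     // lower bound used by submitConsumeRequestLater
    static final long MAX_DELAY_MS = 30000;  // upper bound used by submitConsumeRequestLater

    // Mirrors the delay computation: -1 falls back to the consumer-level
    // default, then the result is clamped to [MIN_DELAY_MS, MAX_DELAY_MS].
    static long resolveDelay(long suspendTimeMillis, long consumerDefaultMillis) {
        long timeMillis = suspendTimeMillis;
        if (timeMillis == -1) {
            timeMillis = consumerDefaultMillis;
        }
        if (timeMillis < MIN_DELAY_MS) {
            timeMillis = MIN_DELAY_MS;
        } else if (timeMillis > MAX_DELAY_MS) {
            timeMillis = MAX_DELAY_MS;
        }
        return timeMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        long consumerDefault = 1000; // default of suspendCurrentQueueTimeMillis
        long[] requested = {-1, 5, 500, 60000};
        for (long r : requested) {
            System.out.println(r + " -> " + resolveDelay(r, consumerDefault));
        }

        // Schedule a placeholder task with the resolved delay, as the
        // original method does with its scheduledExecutorService.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long delay = resolveDelay(5, consumerDefault);
        scheduler.schedule(
            () -> System.out.println("re-submitting consume request after " + delay + " ms"),
            delay, TimeUnit.MILLISECONDS);
        scheduler.shutdown(); // already-scheduled delayed tasks still run by default
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

One consequence of this logic is worth noting: because the clamp runs after the fallback, setting suspendCurrentQueueTimeMillis on the consumer to a value outside [10, 30000] has no effect beyond the nearest bound.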
Summary
DefaultMQPushConsumer defines the suspendCurrentQueueTimeMillis property with a default value of 1000. The submitConsumeRequestLater method of ConsumeMessageOrderlyService reads it via defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis() when the suspendTimeMillis argument is -1, clamps the resulting delay to the range 10 to 30000 milliseconds, and then uses scheduledExecutorService to run submitConsumeRequest after that delay.