1. 程式人生 > 程式設計 >聊聊rocketmq的pullThresholdForTopic

聊聊rocketmq的pullThresholdForTopic

本文主要研究一下rocketmq的pullThresholdForTopic

pullThresholdForTopic

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

	//......

    /**
     * Flow control threshold on topic level,default value is -1(Unlimited)
     * <p>
     * The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
     * {@code pullThresholdForTopic} if
it is't unlimited * <p> * For example,if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,* then pullThresholdForQueue will be set to 100 */ private int pullThresholdForTopic = -1; /** * Limit the cached message size on topic level,default value is -1 MiB(Unlimited) * <p> * The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on * {@code pullThresholdSizeForTopic} if it is'
t unlimited * <p> * For example,if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are * assigned to this consumer,then pullThresholdSizeForQueue will be set to 100 MiB */ private int pullThresholdSizeForTopic = -1; //...... public int getPullThresholdForTopic
() { return pullThresholdForTopic; } public void setPullThresholdForTopic(final int pullThresholdForTopic) { this.pullThresholdForTopic = pullThresholdForTopic; } public int getPullThresholdSizeForTopic() { return pullThresholdSizeForTopic; } public void setPullThresholdSizeForTopic(final int pullThresholdSizeForTopic) { this.pullThresholdSizeForTopic = pullThresholdSizeForTopic; } //...... } 複製程式碼
  • DefaultMQPushConsumer定義了pullThresholdForTopic(預設值-1)、pullThresholdSizeForTopic(預設值-1)屬性

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

	//......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

		//......

        // pullThresholdForTopic
        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
            if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
                throw new MQClientException(
                    "pullThresholdForTopic Out of range [1,6553500]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
            }
        }

        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
            // pullThresholdSizeForTopic
            if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
                throw new MQClientException(
                    "pullThresholdSizeForTopic Out of range [1,102400]"
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),null);
            }
        }

        //......    
    }
    
    //......   
}     
複製程式碼
  • checkConfig方法要求defaultMQPushConsumer.getPullThresholdForTopic()在值非-1時必須大於等於1且小於等於6553500;defaultMQPushConsumer.getPullThresholdSizeForTopic()在值非-1時必須大於等於1且小於等於102400

messageQueueChanged

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

public class RebalancePushImpl extends RebalanceImpl {
    private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills","20000"));
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        this(null,null,defaultMQPushConsumerImpl);
    }

    public RebalancePushImpl(String consumerGroup,MessageModel messageModel,AllocateMessageQueueStrategy allocateMessageQueueStrategy,MQClientInstance mQClientFactory,DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        super(consumerGroup,messageModel,allocateMessageQueueStrategy,mQClientFactory);
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    }

    @Override
    public void messageQueueChanged(String topic,Set<MessageQueue> mqAll,Set<MessageQueue> mqDivided) {
        /**
         * When rebalance result changed,should update subscription's version to notify broker.
         * Fix: inconsistency subscription may lead to consumer miss messages.
         */
        SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
        long newVersion = System.currentTimeMillis();
        log.info("{} Rebalance changed,also update version: {},{}",topic,subscriptionData.getSubVersion(),newVersion);
        subscriptionData.setSubVersion(newVersion);

        int currentQueueCount = this.processQueueTable.size();
        if (currentQueueCount != 0) {
            int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
            if (pullThresholdForTopic != -1) {
                int newVal = Math.max(1,pullThresholdForTopic / currentQueueCount);
                log.info("The pullThresholdForQueue is changed from {} to {}",this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(),newVal);
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
            }

            int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
            if (pullThresholdSizeForTopic != -1) {
                int newVal = Math.max(1,pullThresholdSizeForTopic / currentQueueCount);
                log.info("The pullThresholdSizeForQueue is changed from {} to {}",this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(),newVal);
                this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
            }
        }

        // notify broker
        this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
    }

    //......
}
複製程式碼
  • messageQueueChanged方法在currentQueueCount不為0的時候,且pullThresholdForTopic、pullThresholdSizeForTopic不為-1的時候會重新設定pullThresholdForQueue、pullThresholdSizeForQueue;具體的規則是pullThresholdForQueue取Math.max(1,pullThresholdForTopic / currentQueueCount),pullThresholdSizeForQueue取Math.max(1,pullThresholdSizeForTopic / currentQueueCount)

小結

  • DefaultMQPushConsumer定義了pullThresholdForTopic(預設值-1)、pullThresholdSizeForTopic(預設值-1)屬性
  • checkConfig方法要求defaultMQPushConsumer.getPullThresholdForTopic()在值非-1時必須大於等於1且小於等於6553500;defaultMQPushConsumer.getPullThresholdSizeForTopic()在值非-1時必須大於等於1且小於等於102400
  • messageQueueChanged方法在currentQueueCount不為0的時候,且pullThresholdForTopic、pullThresholdSizeForTopic不為-1的時候會重新設定pullThresholdForQueue、pullThresholdSizeForQueue;具體的規則是pullThresholdForQueue取Math.max(1,pullThresholdSizeForTopic / currentQueueCount)

doc