1. 程式人生 > 程式設計 >聊聊rocketmq的consumeTimeout

聊聊rocketmq的consumeTimeout

本文主要研究一下rocketmq的consumeTimeout

consumeTimeout

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Maximum amount of time in
minutes a message may block the consuming thread. */ private long consumeTimeout = 15; public long getConsumeTimeout() { return consumeTimeout; } public void setConsumeTimeout(final long consumeTimeout) { this.consumeTimeout = consumeTimeout; } public TraceDispatcher getTraceDispatcher
() { return traceDispatcher; } } 複製程式碼
  • DefaultMQPushConsumer定義了consumeTimeout屬性,預設為15,單位是分鐘

ConsumeMessageConcurrentlyService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
    private static final InternalLogger log
= ClientLogger.getLog(); private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; private final DefaultMQPushConsumer defaultMQPushConsumer; private final MessageListenerConcurrently messageListener; private final BlockingQueue<Runnable> consumeRequestQueue; private final ThreadPoolExecutor consumeExecutor; private final String consumerGroup; private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService cleanExpireMsgExecutors; public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerConcurrently messageListener) { this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl; this.messageListener = messageListener; this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer(); this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup(); this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>(); this.consumeExecutor = new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_")); this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_")); this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_")); } public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } },this.defaultMQPushConsumer.getConsumeTimeout(),TimeUnit.MINUTES); } private void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue,ProcessQueue>> it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue,ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); pq.cleanExpiredMsg(this.defaultMQPushConsumer); } } //...... } 複製程式碼
  • ConsumeMessageConcurrentlyService的start方法註冊了一個定時任務,每隔defaultMQPushConsumer.getConsumeTimeout()執行一次cleanExpireMsg方法;cleanExpireMsg方法會遍歷processQueueTable,然後挨個執行ProcessQueue的cleanExpiredMsg方法

ProcessQueue.cleanExpiredMsg

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {

	//......

    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
            return;
        }

        int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
        for (int i = 0; i < loop; i++) {
            MessageExt msg = null;
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                        msg = msgTreeMap.firstEntry().getValue();
                    } else {

                        break;
                    }
                } finally {
                    this.lockTreeMap.readLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception",e);
            }

            try {

                pushConsumer.sendMessageBack(msg,3);
                log.info("send expire msg back. topic={},msgId={},storeHost={},queueId={},queueOffset={}",msg.getTopic(),msg.getMsgId(),msg.getStoreHost(),msg.getQueueId(),msg.getQueueOffset());
                try {
                    this.lockTreeMap.writeLock().lockInterruptibly();
                    try {
                        if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                            try {
                                removeMessage(Collections.singletonList(msg));
                            } catch (Exception e) {
                                log.error("send expired msg exception",e);
                            }
                        }
                    } finally {
                        this.lockTreeMap.writeLock().unlock();
                    }
                } catch (InterruptedException e) {
                    log.error("getExpiredMsg exception",e);
                }
            } catch (Exception e) {
                log.error("send expired msg exception",e);
            }
        }
    }

    public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                            msgSize.addAndGet(0 - msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception",t);
        }

        return result;
    }

    //......
}
複製程式碼
  • ProcessQueue的cleanExpiredMsg方法對於consumeOrderly的直接返回,之後根據msgTreeMap.size()確定loop,最大值為16;然後迴圈判斷msgTreeMap.firstEntry().getValue()是否超過consumeTimeout,不是則直接跳出迴圈;是的話執行pushConsumer.sendMessageBack(msg,3),然後呼叫removeMessage移除該msg;removeMessage方法則根據msg.getQueueOffset()移除訊息

小結

ConsumeMessageConcurrentlyService的start方法註冊了一個定時任務,每隔defaultMQPushConsumer.getConsumeTimeout()執行一次cleanExpireMsg方法;cleanExpireMsg方法會遍歷processQueueTable,然後挨個執行ProcessQueue的cleanExpiredMsg方法

doc