1. 程式人生 > 程式設計 >聊聊rocketmq的updateConsumeOffsetToBroker

聊聊rocketmq的updateConsumeOffsetToBroker

本文主要研究一下rocketmq的updateConsumeOffsetToBroker

updateConsumeOffsetToBroker

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java

public class RemoteBrokerOffsetStore implements OffsetStore {

	//......

    /**
     * Update the Consumer Offset synchronously,once the Master is off,updated to Slave,* here need to be optimized.
     */
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq,long offset,boolean isOneway) throws RemotingException,MQBrokerException,InterruptedException,MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if
(null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); } if (findBrokerResult != null) { UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); requestHeader.setTopic(mq.getTopic()); requestHeader.setConsumerGroup(this.groupName); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setCommitOffset(offset); if
(isOneway) { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway( findBrokerResult.getBrokerAddr(),requestHeader,1000 * 5); } else { this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset( findBrokerResult.getBrokerAddr(),1000 * 5); } } else
{ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist",null); } } //...... } 複製程式碼
  • RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先通過mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 若返回null,則執行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),然後再執行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 之後對於findBrokerResult不為null的情況構建UpdateConsumerOffsetRequestHeader,然後執行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset

findBrokerAddressInAdmin

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {

	//......

    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;

        HashMap<Long/* brokerId */,String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long,String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;

                }
            } // end of for
        }

        if (found) {
            return new FindBrokerResult(brokerAddr,slave,findBrokerVersion(brokerName,brokerAddr));
        }

        return null;
    }

    //......
}
複製程式碼
  • findBrokerAddressInAdmin方法首先從brokerAddrTable獲取指定brokerName的brokerId及address的map,然後遍歷map,對於brokerAddr不為null的標記found為true,標記brokerId為MixAll.MASTER_ID的slave為false,否則為true,最後跳出迴圈;若found為true則構造FindBrokerResult返回,否則返回null

小結

  • RemoteBrokerOffsetStore的updateConsumeOffsetToBroker方法首先通過mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 若返回null,則執行mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()),然後再執行mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())獲取findBrokerResult
  • 之後對於findBrokerResult不為null的情況構建UpdateConsumerOffsetRequestHeader,然後執行mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway或者mQClientFactory.getMQClientAPIImpl().updateConsumerOffset

doc