A Look at RocketMQ's updateConsumeOffsetToBroker
阿新 • Published: 2020-06-24
Preface
This article examines RocketMQ's updateConsumeOffsetToBroker method.
updateConsumeOffsetToBroker
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
public class RemoteBrokerOffsetStore implements OffsetStore {
    //......
    /**
     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
     * here need to be optimized.
     */
    @Override
    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        if (null == findBrokerResult) {
            // Not in the local cache: refresh the route info once, then look up again
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
        }
        if (findBrokerResult != null) {
            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setConsumerGroup(this.groupName);
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setCommitOffset(offset);
            if (isOneway) {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            } else {
                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
        }
    }
    //......
}
- The updateConsumeOffsetToBroker method of RemoteBrokerOffsetStore first calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) to obtain a findBrokerResult
- If that returns null, it calls mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) and then calls mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()) a second time
- If findBrokerResult is not null, it builds an UpdateConsumerOffsetRequestHeader (topic, consumer group, queue id, commit offset) and calls either mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset, depending on isOneway, with a timeout of 5 seconds; if it is still null, it throws an MQClientException
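The lookup, refresh, retry flow described above can be sketched in isolation. The following is a minimal illustration only, not RocketMQ code: the class OffsetCommitSketch, its addrCache map, the nameServer function, and the refreshCount counter are all hypothetical stand-ins, and the "send" step is reduced to returning the address that would be used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the lookup -> refresh -> retry flow.
// None of these types or names come from RocketMQ.
class OffsetCommitSketch {
    // Local address cache, playing the role of the client's broker address table.
    private final Map<String, String> addrCache = new HashMap<>();
    // Simulated name server: broker name -> address, or null if unknown.
    private final Function<String, String> nameServer;
    // Counts refreshes so the behaviour can be observed.
    int refreshCount = 0;

    OffsetCommitSketch(Function<String, String> nameServer) {
        this.nameServer = nameServer;
    }

    // Pull the latest address from the simulated name server into the cache.
    private void refreshRoute(String brokerName) {
        refreshCount++;
        String addr = nameServer.apply(brokerName);
        if (addr != null) {
            addrCache.put(brokerName, addr);
        }
    }

    // Returns the address the offset would be committed to.
    String commit(String brokerName, long offset) {
        String addr = addrCache.get(brokerName);
        if (addr == null) {
            // Cache miss: refresh once, then look up again.
            refreshRoute(brokerName);
            addr = addrCache.get(brokerName);
        }
        if (addr == null) {
            // Still unknown after one refresh: give up with an error.
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        // A real client would build the request header and send it here.
        return addr;
    }

    public static void main(String[] args) {
        Map<String, String> ns = new HashMap<>();
        ns.put("broker-a", "10.0.0.1:10911");
        OffsetCommitSketch sketch = new OffsetCommitSketch(ns::get);
        System.out.println(sketch.commit("broker-a", 42L));
        System.out.println("refreshes: " + sketch.refreshCount);
    }
}
```

Note that the refresh is attempted only once per call; a broker that is still unknown afterwards results in an exception rather than further retries, which matches the shape of the method above.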
findBrokerAddressInAdmin
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance {
    //......
    public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
        String brokerAddr = null;
        boolean slave = false;
        boolean found = false;
        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
        if (map != null && !map.isEmpty()) {
            for (Map.Entry<Long, String> entry : map.entrySet()) {
                Long id = entry.getKey();
                brokerAddr = entry.getValue();
                if (brokerAddr != null) {
                    found = true;
                    if (MixAll.MASTER_ID == id) {
                        slave = false;
                    } else {
                        slave = true;
                    }
                    break;
                }
            } // end of for
        }
        if (found) {
            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
        }
        return null;
    }
    //......
}
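- The findBrokerAddressInAdmin method first looks up the brokerId-to-address map for the given brokerName in brokerAddrTable and then iterates over it. On the first entry whose brokerAddr is not null, it sets found to true, sets slave to false if the brokerId equals MixAll.MASTER_ID and to true otherwise, and breaks out of the loop. If found is true it constructs and returns a FindBrokerResult; otherwise it returns null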
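To make the selection rule concrete, here is a small self-contained sketch of the same "first non-null address wins" logic. It is an illustration under assumptions, not the library's code: BrokerPickSketch, Picked, and pick are hypothetical names, MASTER_ID is a local constant set to 0L to mirror MixAll.MASTER_ID, and a LinkedHashMap is used so the iteration order is explicit (the original uses a HashMap, whose iteration order is not specified).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the selection rule; not RocketMQ code.
class BrokerPickSketch {
    // Local constant mirroring MixAll.MASTER_ID (0L).
    static final long MASTER_ID = 0L;

    // Minimal result holder: the chosen address and whether it is a slave.
    static final class Picked {
        final String addr;
        final boolean slave;

        Picked(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    // Returns the first entry with a non-null address, or null if there is none.
    static Picked pick(Map<Long, String> addrs) {
        if (addrs == null) {
            return null;
        }
        for (Map.Entry<Long, String> e : addrs.entrySet()) {
            if (e.getValue() != null) {
                // Any broker id other than MASTER_ID is treated as a slave.
                return new Picked(e.getValue(), e.getKey() != MASTER_ID);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new LinkedHashMap<>();
        addrs.put(0L, null);             // master address missing
        addrs.put(1L, "10.0.0.2:10911"); // slave address present
        Picked p = pick(addrs);
        System.out.println(p.addr + " slave=" + p.slave);
    }
}
```

The sketch shows that the method does not prefer the master explicitly: it takes whichever non-null address it meets first during iteration and only records whether that broker is a slave.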
Summary
- RemoteBrokerOffsetStore's updateConsumeOffsetToBroker method first obtains a findBrokerResult through mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName())
- If the result is null, it runs mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()) and repeats the findBrokerAddressInAdmin lookup
- When findBrokerResult is not null, it builds an UpdateConsumerOffsetRequestHeader and calls mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway or mQClientFactory.getMQClientAPIImpl().updateConsumerOffset; otherwise it throws an MQClientException