聊聊rocketmq的pullFromWhichNodeTable
阿新 • • 發佈:2020-06-24
序
本文主要研究一下rocketmq的pullFromWhichNodeTable
pullFromWhichNodeTable
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
public class PullAPIWrapper {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String consumerGroup;
private final boolean unitMode;
private ConcurrentMap<MessageQueue,AtomicLong/* brokerId */> pullFromWhichNodeTable =
new ConcurrentHashMap<MessageQueue,AtomicLong>(32);
private volatile boolean connectBrokerByUser = false ;
private volatile long defaultBrokerId = MixAll.MASTER_ID;
private Random random = new Random(System.currentTimeMillis());
private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
//......
public PullResult processPullResult(final MessageQueue mq,final PullResult pullResult,final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq,pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) {
msgListFilterAgain.add(msg);
}
}
}
}
if (this.hasHook()) {
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
}
for (MessageExt msg : msgListFilterAgain) {
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(traFlag)) {
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
}
MessageAccessor.putProperty(msg,MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg,MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset()));
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
public void updatePullFromWhichNode(final MessageQueue mq,final long brokerId) {
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (null == suggest) {
this.pullFromWhichNodeTable.put(mq,new AtomicLong(brokerId));
} else {
suggest.set(brokerId);
}
}
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
}
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}
return MixAll.MASTER_ID;
}
public boolean isConnectBrokerByUser() {
return connectBrokerByUser;
}
//......
}
複製程式碼
- PullAPIWrapper定義了pullFromWhichNodeTable,其key為MessageQueue,value為AtomicLong型別的brokerId
- processPullResult方法會使用pullResultExt.getSuggestWhichBrokerId()來執行updatePullFromWhichNode;updatePullFromWhichNode會更新指定MessageQueue的brokerId
- recalculatePullFromWhichNode方法在isConnectBrokerByUser為true時直接返回defaultBrokerId(
MixAll.MASTER_ID
),否則從pullFromWhichNodeTable取對應的brokerId,取不到則返回MixAll.MASTER_ID
PullResultExt
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
public class PullResultExt extends PullResult {
private final long suggestWhichBrokerId;
private byte[] messageBinary;
public PullResultExt(PullStatus pullStatus,long nextBeginOffset,long minOffset,long maxOffset,List<MessageExt> msgFoundList,final long suggestWhichBrokerId,final byte[] messageBinary) {
super(pullStatus,nextBeginOffset,minOffset,maxOffset,msgFoundList);
this.suggestWhichBrokerId = suggestWhichBrokerId;
this.messageBinary = messageBinary;
}
public byte[] getMessageBinary() {
return messageBinary;
}
public void setMessageBinary(byte[] messageBinary) {
this.messageBinary = messageBinary;
}
public long getSuggestWhichBrokerId() {
return suggestWhichBrokerId;
}
}
複製程式碼
- PullResultExt定義了suggestWhichBrokerId屬性
processPullResponse
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public class MQClientAPIImpl {
private final static InternalLogger log = ClientLogger.getLog();
private static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg","true"));
//......
private PullResult processPullResponse(
final RemotingCommand response) throws MQBrokerException,RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;
default:
throw new MQBrokerException(response.getCode(),response.getRemark());
}
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new PullResultExt(pullStatus,responseHeader.getNextBeginOffset(),responseHeader.getMinOffset(),responseHeader.getMaxOffset(),null,responseHeader.getSuggestWhichBrokerId(),response.getBody());
}
//......
}
複製程式碼
- processPullResponse方法會使用responseHeader.getSuggestWhichBrokerId()來建立PullResultExt並返回
PullMessageResponseHeader
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
public class PullMessageResponseHeader implements CommandCustomHeader {
@CFNotNull
private Long suggestWhichBrokerId;
@CFNotNull
private Long nextBeginOffset;
@CFNotNull
private Long minOffset;
@CFNotNull
private Long maxOffset;
//......
}
複製程式碼
- PullMessageResponseHeader定義了suggestWhichBrokerId屬性
processRequest
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
public class PullMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;
//......
private RemotingCommand processRequest(final Channel channel,RemotingCommand request,boolean brokerAllowSuspend)
throws RemotingCommandException {
//......
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
//......
}
//......
}
複製程式碼
- 當getMessageResult.isSuggestPullingFromSlave()則設定responseHeader的suggestWhichBrokerId為subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly(),否則設定為MixAll.MASTER_ID
whichBrokerWhenConsumeSlowly
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
public class SubscriptionGroupConfig {
private String groupName;
private boolean consumeEnable = true;
private boolean consumeFromMinEnable = true;
private boolean consumeBroadcastEnable = true;
private int retryQueueNums = 1;
private int retryMaxTimes = 16;
private long brokerId = MixAll.MASTER_ID;
private long whichBrokerWhenConsumeSlowly = 1;
private boolean notifyConsumerIdsChangedEnable = true;
//......
}
複製程式碼
- SubscriptionGroupConfig的whichBrokerWhenConsumeSlowly預設值為1,而MixAll.MASTER_ID則為0
小結
- PullAPIWrapper定義了pullFromWhichNodeTable,其key為MessageQueue,value為AtomicLong型別的brokerId
- processPullResult方法會使用pullResultExt.getSuggestWhichBrokerId()來執行updatePullFromWhichNode;updatePullFromWhichNode會更新指定MessageQueue的brokerId
- recalculatePullFromWhichNode方法在isConnectBrokerByUser為true時直接返回defaultBrokerId(
MixAll.MASTER_ID
),否則從pullFromWhichNodeTable取對應的brokerId,取不到則返回MixAll.MASTER_ID