1. 程式人生 > 程式設計 >聊聊rocketmq的pullFromWhichNodeTable

聊聊rocketmq的pullFromWhichNodeTable

本文主要研究一下rocketmq的pullFromWhichNodeTable

pullFromWhichNodeTable

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue,AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue,AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false
; private volatile long defaultBrokerId = MixAll.MASTER_ID; private Random random = new Random(System.currentTimeMillis()); private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>(); //...... public PullResult processPullResult(final MessageQueue mq,final PullResult pullResult,final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq,pullResultExt.getSuggestWhichBrokerId()); if
(PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); List<MessageExt> msgListFilterAgain = msgList; if
(!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } if (this.hasHook()) { FilterMessageContext filterMessageContext = new FilterMessageContext(); filterMessageContext.setUnitMode(unitMode); filterMessageContext.setMsgList(msgListFilterAgain); this.executeHook(filterMessageContext); } for (MessageExt msg : msgListFilterAgain) { String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (Boolean.parseBoolean(traFlag)) { msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); } MessageAccessor.putProperty(msg,MessageConst.PROPERTY_MIN_OFFSET,Long.toString(pullResult.getMinOffset())); MessageAccessor.putProperty(msg,MessageConst.PROPERTY_MAX_OFFSET,Long.toString(pullResult.getMaxOffset())); } pullResultExt.setMsgFoundList(msgListFilterAgain); } pullResultExt.setMessageBinary(null); return pullResult; } public void updatePullFromWhichNode(final MessageQueue mq,final long brokerId) { AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (null == suggest) { this.pullFromWhichNodeTable.put(mq,new AtomicLong(brokerId)); } else { suggest.set(brokerId); } } public long recalculatePullFromWhichNode(final MessageQueue mq) { if (this.isConnectBrokerByUser()) { return this.defaultBrokerId; } AtomicLong suggest = this.pullFromWhichNodeTable.get(mq); if (suggest != null) { return suggest.get(); } return MixAll.MASTER_ID; } public boolean isConnectBrokerByUser() { return connectBrokerByUser; } //...... } 複製程式碼
  • PullAPIWrapper定義了pullFromWhichNodeTable,其key為MessageQueue,value為AtomicLong型別的brokerId
  • processPullResult方法會使用pullResultExt.getSuggestWhichBrokerId()來執行updatePullFromWhichNode;updatePullFromWhichNode會更新指定MessageQueue的brokerId
  • recalculatePullFromWhichNode方法在isConnectBrokerByUser為true時直接返回defaultBrokerId(MixAll.MASTER_ID),否則從pullFromWhichNodeTable取對應的brokerId,取不到則返回MixAll.MASTER_ID

PullResultExt

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java

public class PullResultExt extends PullResult {
    private final long suggestWhichBrokerId;
    private byte[] messageBinary;

    public PullResultExt(PullStatus pullStatus,long nextBeginOffset,long minOffset,long maxOffset,List<MessageExt> msgFoundList,final long suggestWhichBrokerId,final byte[] messageBinary) {
        super(pullStatus,nextBeginOffset,minOffset,maxOffset,msgFoundList);
        this.suggestWhichBrokerId = suggestWhichBrokerId;
        this.messageBinary = messageBinary;
    }

    public byte[] getMessageBinary() {
        return messageBinary;
    }

    public void setMessageBinary(byte[] messageBinary) {
        this.messageBinary = messageBinary;
    }

    public long getSuggestWhichBrokerId() {
        return suggestWhichBrokerId;
    }
}
複製程式碼
  • PullResultExt定義了suggestWhichBrokerId屬性

processPullResponse

rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

public class MQClientAPIImpl {

    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg","true"));

    //......    

    private PullResult processPullResponse(
        final RemotingCommand response) throws MQBrokerException,RemotingCommandException {
        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS:
                pullStatus = PullStatus.FOUND;
                break;
            case ResponseCode.PULL_NOT_FOUND:
                pullStatus = PullStatus.NO_NEW_MSG;
                break;
            case ResponseCode.PULL_RETRY_IMMEDIATELY:
                pullStatus = PullStatus.NO_MATCHED_MSG;
                break;
            case ResponseCode.PULL_OFFSET_MOVED:
                pullStatus = PullStatus.OFFSET_ILLEGAL;
                break;

            default:
                throw new MQBrokerException(response.getCode(),response.getRemark());
        }

        PullMessageResponseHeader responseHeader =
            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

        return new PullResultExt(pullStatus,responseHeader.getNextBeginOffset(),responseHeader.getMinOffset(),responseHeader.getMaxOffset(),null,responseHeader.getSuggestWhichBrokerId(),response.getBody());
    }

    //......
}
複製程式碼
  • processPullResponse方法會使用responseHeader.getSuggestWhichBrokerId()來建立PullResultExt並返回

PullMessageResponseHeader

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java

public class PullMessageResponseHeader implements CommandCustomHeader {
    @CFNotNull
    private Long suggestWhichBrokerId;
    @CFNotNull
    private Long nextBeginOffset;
    @CFNotNull
    private Long minOffset;
    @CFNotNull
    private Long maxOffset;

    //......
}
複製程式碼
  • PullMessageResponseHeader定義了suggestWhichBrokerId屬性

processRequest

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java

public class PullMessageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private List<ConsumeMessageHook> consumeMessageHookList;

    //......

    private RemotingCommand processRequest(final Channel channel,RemotingCommand request,boolean brokerAllowSuspend)
        throws RemotingCommandException {

        //......

if (getMessageResult.isSuggestPullingFromSlave()) {
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow,redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

        //......
    }

    //......
}
複製程式碼
  • 當getMessageResult.isSuggestPullingFromSlave()則設定responseHeader的suggestWhichBrokerId為subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly(),否則設定為MixAll.MASTER_ID

whichBrokerWhenConsumeSlowly

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java

public class SubscriptionGroupConfig {

    private String groupName;

    private boolean consumeEnable = true;
    private boolean consumeFromMinEnable = true;

    private boolean consumeBroadcastEnable = true;

    private int retryQueueNums = 1;

    private int retryMaxTimes = 16;

    private long brokerId = MixAll.MASTER_ID;

    private long whichBrokerWhenConsumeSlowly = 1;

    private boolean notifyConsumerIdsChangedEnable = true;

    //......
}
複製程式碼
  • SubscriptionGroupConfig的whichBrokerWhenConsumeSlowly預設值為1,而MixAll.MASTER_ID則為0

小結

  • PullAPIWrapper定義了pullFromWhichNodeTable,其key為MessageQueue,value為AtomicLong型別的brokerId
  • processPullResult方法會使用pullResultExt.getSuggestWhichBrokerId()來執行updatePullFromWhichNode;updatePullFromWhichNode會更新指定MessageQueue的brokerId
  • recalculatePullFromWhichNode方法在isConnectBrokerByUser為true時直接返回defaultBrokerId(MixAll.MASTER_ID),否則從pullFromWhichNodeTable取對應的brokerId,取不到則返回MixAll.MASTER_ID

doc