RocketMQ原理學習--死信訊息實現原理
阿新 • • 發佈:2018-12-31
上一篇部落格《RocketMQ原理學習--失敗訊息實現原理》中我們瞭解到RocketMQ對於失敗訊息的處理原理,當訊息一直失敗的情況下RocketMQ是如何處理的,這篇部落格我們通過分析原始碼簡單瞭解一下。
RocketMQ對於失敗次數超過16次的訊息設定為死信訊息,訊息最終被放到DLQ死信佇列中,需要人工進行干預處理。處理程式碼還是在SendMessageProcessor的consumerSendMsgBack方法中,簡單來說就是判斷重試次數超過16或者延時級別小於0,則將訊息設定為新的死信topic為:%DLQ%+consumerGroup
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); //省略部分程式碼 //最大重試次數為16 if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { //死信佇列 %DLQ%+consumerGroup newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return response; } } else { if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } msgExt.setDelayTimeLevel(delayLevel); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(newTopic); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); MessageAccessor.setProperties(msgInner, msgExt.getProperties()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); msgInner.setQueueId(queueIdInt); msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); //訊息被持久化到死信佇列中 PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); //省略部分程式碼 return response; }
總結:死信訊息需要人為進行處理干預,可以通過RocketMQ控制檯等重新發送