RocketMQ訊息的傳送與接收
阿新 • • 發佈:2020-06-24
1、概述
Producer 傳送訊息。主要分析同步傳送訊息的原始碼;涉及非同步/Oneway 傳送訊息、事務訊息的部分會跳過。
Broker 接收訊息。
2、Producer 傳送訊息
DefaultMQProducer#send(Message)
/**
 * Sends a message in synchronous mode; the call returns only once the sending procedure has fully completed.
 *
 * <p><strong>Warn:</strong> this method has an internal retry mechanism: the implementation retries
 * {@link #retryTimesWhenSendFailed} times before claiming failure, so the same message may be delivered to
 * the broker(s) more than once. Resolving such duplicates is up to the application developer.
 *
 * @param msg Message to send.
 * @return {@link SendResult} instance informing the sender about the delivery, e.g. the message ID,
 * the {@link SendStatus} (broker storage/replication status) and the message queue sent to.
 * @throws MQClientException if there is any client error.
 * @throws RemotingException if there is any network-tier error.
 * @throws MQBrokerException if there is any error with broker.
 * @throws InterruptedException if the sending thread is interrupted.
 */
@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Validate first, so an invalid message is rejected before its topic is rewritten.
    Validators.checkMessage(msg, this);
    final String namespacedTopic = withNamespace(msg.getTopic());
    msg.setTopic(namespacedTopic);
    return this.defaultMQProducerImpl.send(msg);
}
複製程式碼
DefaultMQProducerImpl#sendDefaultImpl()
說明 :傳送訊息。步驟:獲取訊息路由資訊,選擇要傳送到的訊息佇列,執行訊息傳送核心方法,並對傳送結果進行封裝返回。
private SendResult sendDefaultImpl(
Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout
) throws MQClientException,InterruptedException {
// 校驗Producer處於執行狀態
this.makeSureStateOK();
// 校驗訊息格式
Validators.checkMessage(msg,this.defaultMQProducer);
// 呼叫編號;用於下面列印日誌,標記為同一次傳送訊息
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 獲取 Topic路由資訊
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
// 最後選擇訊息要傳送到的佇列
MessageQueue mq = null;
Exception exception = null;
// 最後一次傳送結果
SendResult sendResult = null;
// 同步多次呼叫
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 選擇訊息要傳送到的佇列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo,lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 呼叫傳送訊息核心方法
sendResult = this.sendKernelImpl(msg,mq,communicationMode,sendCallback,topicPublishInfo,timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 更新Broker可用性資訊
this.updateFaultItem(mq.getBrokerName(),endTimestamp - beginTimestampPrev,false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 同步傳送成功但儲存有問題時 && 配置儲存異常時重新傳送開關 時,進行重試
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
// 列印異常,更新Broker可用性資訊,更新繼續迴圈
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(),true);
log.warn(String.format("sendKernelImpl exception,resend at once,InvokeID: %s,RT: %sms,Broker: %s",invokeID,mq),e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
// 列印異常,更新Broker可用性資訊,更新繼續迴圈
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(),e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
// 列印異常,更新Broker可用性資訊,部分情況下的異常,直接返回,結束迴圈
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(),e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
// 如下異常continue,進行傳送訊息重試
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
// 如果有傳送結果,進行返回,否則,丟擲異常;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
// 列印異常,更新Broker可用性資訊,更新繼續迴圈
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(),false);
log.warn(String.format("sendKernelImpl exception,throw exception,e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception",e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
// 返回傳送結果
if (sendResult != null) {
return sendResult;
}
// 根據不同情況,丟擲不同的異常
String info = String.format("Send [%d] times,still failed,cost [%d]ms,Topic: %s,BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info,exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
// Namesrv找不到異常
validateNameServerSetting();
// 訊息路由找不到異常
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
複製程式碼
DefaultMQProducerImpl#tryToFindTopicPublishInfo()
說明 :獲得 Topic釋出資訊。優先從快取topicPublishInfoTable,其次從Namesrv中獲得。
/**
 * Returns the publish info for a topic, trying the local table first and the name server second.
 *
 * @param topic topic name
 * @return whatever {@code topicPublishInfoTable} holds for the topic after the refresh attempts
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo publishInfo = this.topicPublishInfoTable.get(topic);

    // Nothing usable cached yet: register an empty entry and ask the name server once.
    final boolean refreshNeeded = publishInfo == null || !publishInfo.ok();
    if (refreshNeeded) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        publishInfo = this.topicPublishInfoTable.get(topic);
    }

    // Route info is present (or the entry is ok): hand it back.
    final boolean usable = publishInfo.isHaveTopicRouterInfo() || publishInfo.ok();
    if (usable) {
        return publishInfo;
    }

    // Fall back to the route of {@link DefaultMQProducer#createTopicKey}; per the original note this covers
    // the case where the topic has no route yet and the broker supports automatic topic creation.
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    return this.topicPublishInfoTable.get(topic);
}
複製程式碼
MQFaultStrategy
說明 :Producer訊息傳送容錯策略。預設情況下容錯策略關閉,即sendLatencyFaultEnable=false。
| Producer傳送訊息消耗時長 | Broker不可用時長 |
| --- | --- |
| >= 15000 ms | 600 * 1000 ms |
| >= 3000 ms | 180 * 1000 ms |
| >= 2000 ms | 120 * 1000 ms |
| >= 1000 ms | 60 * 1000 ms |
| >= 550 ms | 30 * 1000 ms |
| >= 100 ms | 0 ms |
| >= 50 ms | 0 ms |
複製程式碼
/**
 * Fault-tolerance strategy used by the producer when choosing a message queue. It is switched off by
 * default ({@code sendLatencyFaultEnable == false}).
 */
public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();

    /**
     * Latency fault tolerance; tracks the send latency of every broker.
     * key: brokerName
     */
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    /**
     * Switch for the latency fault tolerance.
     */
    private boolean sendLatencyFaultEnable = false;

    /**
     * Latency thresholds in milliseconds, ascending.
     */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

    /**
     * Unavailability durations in milliseconds; entry {@code i} belongs to {@code latencyMax[i]}.
     */
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    /**
     * Selects a message queue from the topic publish info.
     *
     * @param tpInfo topic publish info
     * @param lastBrokerName broker name used by the previous attempt, or {@code null}
     * @return the selected message queue
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // Walk the queue list once, starting at the rotating index, and take the first queue whose
                // broker is available and, when lastBrokerName is given, is that same broker.
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    // Math.abs(Integer.MIN_VALUE) stays negative, hence the clamp.
                    if (pos < 0) {
                        pos = 0;
                    }
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
                            return mq;
                        }
                    }
                }

                // No match: take a comparatively good broker from the fault table and use one of its
                // queues, without checking availability.
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else if (notBestBroker != null) {
                    // pickOneAtLeast() yields null when the fault table is empty. Removing a null key from
                    // the ConcurrentHashMap-backed table would throw a NullPointerException that the catch
                    // below reports as an error, so only a real broker name is removed.
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            // Last resort: any queue, availability not considered.
            return tpInfo.selectOneMessageQueue();
        }

        // Fault tolerance is off: select via lastBrokerName, availability not considered.
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    /**
     * Updates the fault information of a broker. Does nothing while the fault tolerance is switched off.
     *
     * @param brokerName brokerName
     * @param currentLatency observed latency in milliseconds
     * @param isolation whether to isolate the broker; when {@code true} the unavailability duration is
     * computed from a fixed latency of 30000 instead of {@code currentLatency} (per the original note,
     * mainly used when sending failed with an exception)
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    /**
     * Maps a latency to its unavailability duration.
     *
     * @param currentLatency latency in milliseconds
     * @return the duration belonging to the largest threshold not above the latency, or 0 if the latency
     * is below every threshold
     */
    private long computeNotAvailableDuration(final long currentLatency) {
        // Scan from the largest threshold down so the first hit is the tightest one.
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i]) {
                return this.notAvailableDuration[i];
            }
        }
        return 0;
    }
}
複製程式碼
LatencyFaultTolerance
延遲故障容錯介面
/**
 * Latency fault tolerance: remembers, per object, the latest latency and for how long the object should
 * be treated as unavailable.
 *
 * @param <T> type of the tracked object's name
 */
public interface LatencyFaultTolerance<T> {
    /**
     * Updates the latency and the unavailability duration of an object.
     *
     * @param name the object
     * @param currentLatency latency
     * @param notAvailableDuration unavailability duration
     */
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    /**
     * Tells whether an object is available.
     *
     * @param name name of the object
     * @return whether it is available
     */
    boolean isAvailable(final T name);

    /**
     * Removes an object.
     *
     * @param name the object to remove
     */
    void remove(final T name);

    /**
     * Picks one object.
     *
     * @return the picked object
     */
    T pickOneAtLeast();
}
複製程式碼
LatencyFaultToleranceImpl
/**
 * {@link LatencyFaultTolerance} implementation that keeps one {@link FaultItem} per name in a
 * {@link ConcurrentHashMap}.
 */
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    /**
     * Fault information, keyed by object name.
     */
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    /**
     * Rotating index used when picking an object.
     */
    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    /**
     * Stores the latency of an object and the moment it becomes available again
     * (now + {@code notAvailableDuration}).
     *
     * @param name the object
     * @param currentLatency latency to record
     * @param notAvailableDuration how long, in milliseconds from now, the object counts as unavailable
     */
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            // Another thread may have inserted an item in the meantime; if so, update that one.
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

    /**
     * @return the availability of the recorded item, or {@code true} when nothing is recorded for the name
     */
    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    @Override
    public void remove(final String name) {
        this.faultItemTable.remove(name);
    }

    /**
     * Picks a name from the better half of the recorded items.
     *
     * @return the picked name, or {@code null} when nothing is recorded
     */
    @Override
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            // Shuffle, then sort. The article's author questions whether both are needed and guesses the
            // shuffle could go. NOTE(review): Collections.sort is stable, so the shuffle presumably only
            // randomizes the order of items that compare equal - confirm before removing it.
            Collections.shuffle(tmpList);
            Collections.sort(tmpList);

            // Choose among the first half of the sorted items.
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }
        return null;
    }

    @Override
    public String toString() {
        return "LatencyFaultToleranceImpl{" +
            "faultItemTable=" + faultItemTable +
            ",whichItemWorst=" + whichItemWorst +
            '}';
    }

    /**
     * Fault information of one object: its name, latency and the time it becomes available.
     */
    class FaultItem implements Comparable<FaultItem> {
        private final String name;
        private volatile long currentLatency;
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }

        /**
         * Orders by availability (available first), then latency, then start timestamp, all ascending.
         */
        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable()) {
                    return -1;
                }
                if (other.isAvailable()) {
                    return 1;
                }
            }

            if (this.currentLatency < other.currentLatency) {
                return -1;
            } else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp) {
                return -1;
            } else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }
            return 0;
        }

        /**
         * @return {@code true} once the current time has reached {@code startTimestamp}
         */
        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }

        @Override
        public int hashCode() {
            int result = getName() != null ? getName().hashCode() : 0;
            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
            return result;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof FaultItem)) {
                return false;
            }

            final FaultItem faultItem = (FaultItem) o;
            if (getCurrentLatency() != faultItem.getCurrentLatency()) {
                return false;
            }
            if (getStartTimestamp() != faultItem.getStartTimestamp()) {
                return false;
            }
            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;
        }

        @Override
        public String toString() {
            return "FaultItem{" +
                "name='" + name + '\'' +
                ",currentLatency=" + currentLatency +
                ",startTimestamp=" + startTimestamp +
                '}';
        }

        public String getName() {
            return name;
        }

        public long getCurrentLatency() {
            return currentLatency;
        }

        public void setCurrentLatency(final long currentLatency) {
            this.currentLatency = currentLatency;
        }

        public long getStartTimestamp() {
            return startTimestamp;
        }

        public void setStartTimestamp(final long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }
    }
}
複製程式碼
FaultItem
說明 :物件故障資訊。維護物件的名字、延遲、開始可用的時間。
/**
 * Fault information of one object: its name, its latency and the time from which it is available.
 */
class FaultItem implements Comparable<FaultItem> {
    /** Name of the object. */
    private final String name;
    /** Latency. */
    private volatile long currentLatency;
    /** Time from which the object is available. */
    private volatile long startTimestamp;

    public FaultItem(final String name) {
        this.name = name;
    }

    /**
     * Orders by availability, then latency, then start timestamp.
     *
     * @param other the item to compare with
     * @return -1, 0 or 1, for ascending order
     */
    @Override
    public int compareTo(final FaultItem other) {
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable()) {
                return -1;
            }
            if (other.isAvailable()) {
                return 1;
            }
        }

        final int byLatency = order(this.currentLatency, other.currentLatency);
        if (byLatency != 0) {
            return byLatency;
        }
        return order(this.startTimestamp, other.startTimestamp);
    }

    /** Three-way comparison of two longs yielding exactly -1, 0 or 1. */
    private int order(final long left, final long right) {
        if (left < right) {
            return -1;
        }
        return left > right ? 1 : 0;
    }

    /**
     * @return {@code true} once the current time has reached {@code startTimestamp}
     */
    public boolean isAvailable() {
        final long elapsed = System.currentTimeMillis() - startTimestamp;
        return elapsed >= 0;
    }

    @Override
    public int hashCode() {
        final int nameHash = getName() == null ? 0 : getName().hashCode();
        final int withLatency = 31 * nameHash + fold(getCurrentLatency());
        return 31 * withLatency + fold(getStartTimestamp());
    }

    /** Folds the upper 32 bits of a long into the lower 32 bits. */
    private int fold(final long value) {
        return (int) (value ^ (value >>> 32));
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof FaultItem)) {
            return false;
        }

        final FaultItem that = (FaultItem) o;
        final boolean sameNumbers = getCurrentLatency() == that.getCurrentLatency()
            && getStartTimestamp() == that.getStartTimestamp();
        if (!sameNumbers) {
            return false;
        }
        if (getName() == null) {
            return that.getName() == null;
        }
        return getName().equals(that.getName());
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("FaultItem{");
        text.append("name='").append(name).append('\'');
        text.append(",currentLatency=").append(currentLatency);
        text.append(",startTimestamp=").append(startTimestamp);
        text.append('}');
        return text.toString();
    }

    public String getName() {
        return name;
    }

    public long getCurrentLatency() {
        return currentLatency;
    }

    public void setCurrentLatency(final long currentLatency) {
        this.currentLatency = currentLatency;
    }

    public long getStartTimestamp() {
        return startTimestamp;
    }

    public void setStartTimestamp(final long startTimestamp) {
        this.startTimestamp = startTimestamp;
    }
}
複製程式碼
DefaultMQProducerImpl#sendKernelImpl()
說明 :傳送訊息核心方法。該方法真正發起網路請求,傳送訊息給 Broker。
/**
 * Core send method: this is where the network request to the broker is actually issued.
 *
 * @param msg message to send; its body and topic are restored in the {@code finally} block
 * @param mq target message queue
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback callback passed on for ASYNC sends
 * @param topicPublishInfo publish info of the topic, passed on for ASYNC sends
 * @param timeout time budget in milliseconds for this call
 * @return the result returned by the client API call
 * @throws MQClientException if no address can be found for the queue's broker
 * @throws RemotingException on a remoting failure, including exhaustion of the time budget
 * @throws MQBrokerException if the broker reports an error
 * @throws InterruptedException if the sending thread is interrupted
 */
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    // Resolve the broker address; on a miss refresh the topic route once and retry.
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        // Optionally use the broker VIP channel (the original note: the broker serves on two ports).
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        // Remember the body so it can be restored after a possible compression.
        byte[] prevBody = msg.getBody();
        try {
            // for MessageBatch,ID has been set in the generating process
            // Assign the unique id.
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            // Message compression.
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            // Transaction flag.
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            // hook: check whether sending is forbidden
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            // hook: logic before sending
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            // Build the send-message request header.
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            // For retry topics the reconsume counters move from the message properties into the header.
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            // Send the message.
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed,msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            // hook: logic after sending
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            // Return the send result.
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // Hand the caller's message back with its original body and namespace-free topic.
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    // No broker address could be resolved.
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
複製程式碼
3、Broker 接收訊息
SendMessageProcessor#sendMessage
#processRequest() 說明 :處理訊息請求。
#sendMessage() 說明 :傳送訊息,並返回傳送訊息結果。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License,Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
// List of ConsumeMessageHook instances. NOTE(review): presumably what hasConsumeMessageHook() and
// executeConsumeMessageHookAfter() work on; neither they nor any assignment is visible in this excerpt.
private List<ConsumeMessageHook> consumeMessageHookList;
/**
 * Creates the processor and passes the broker controller on to the superclass constructor.
 *
 * @param brokerController broker controller handed to {@code AbstractSendMessageProcessor}
 */
public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}
/**
 * Handles a message request. {@code CONSUMER_SEND_MSG_BACK} requests go to {@code consumerSendMsgBack};
 * every other request code is treated as a send-message request, wrapped by the before/after hooks.
 *
 * @param ctx channel context of the request
 * @param request the incoming command
 * @return the response command, or {@code null} when the request header cannot be parsed
 * @throws RemotingCommandException declared for the calls made here
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (request.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK) {
        return this.consumerSendMsgBack(ctx, request);
    }

    // Parse the request.
    final SendMessageRequestHeader header = parseRequestHeader(request);
    if (header == null) {
        return null;
    }

    // Context of the send request; used by the hooks.
    final SendMessageContext traceContext = buildMsgContext(ctx, header);
    // hook: logic before handling the send
    this.executeSendMessageHookBefore(ctx, request, traceContext);

    // Handle the send itself, batch or single.
    // NOTE(review): sendBatchMessage/sendMessage are declared outside this excerpt, so the argument lists
    // below are kept exactly as found - confirm them against the declarations.
    final RemotingCommand response;
    if (header.isBatch()) {
        response = this.sendBatchMessage(ctx, traceContext, header);
    } else {
        response = this.sendMessage(ctx, header);
    }

    // hook: logic after handling the send
    this.executeSendMessageHookAfter(response, traceContext);
    return response;
}
/**
 * Tells whether requests should be rejected at the moment.
 *
 * @return {@code true} when the message store reports a busy OS page cache or a deficient transient
 * store pool
 */
@Override
public boolean rejectRequest() {
    if (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        return true;
    }
    return this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx,final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
this.executeConsumeMessageHookAfter(context);
}
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist," + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false,true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden",newTopic));
return response;
}
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed," + requestHeader.getOffset());
return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt,MessageConst.PROPERTY_RETRY_TOPIC,msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,DLQ_NUMS_PER_GROUP,PermName.PERM_WRITE,0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner,msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null,msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner,UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(),backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}
/**
 * Applies retry-topic handling to an incoming message and sets its system flag.
 *
 * <p>When the request topic starts with {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}, the consumer
 * group name is taken from the rest of the topic and its subscription group config is looked up.
 * If the message has already been reconsumed at least the allowed number of times, it is
 * redirected to the group's dead-letter topic on a randomly chosen queue.
 *
 * @param requestHeader decoded send-message header (topic, reconsume counters, sys flag)
 * @param response response that receives the error code and remark when this method fails
 * @param request raw command; only its version is read, to decide where the max reconsume count comes from
 * @param msg message being built; its topic and queue id are rewritten on a dead-letter redirect
 * @param topicConfig config of the request topic; replaced locally by the dead-letter topic config on redirect
 * @return {@code true} when processing may continue, {@code false} when {@code response} now carries an error
 */
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader,RemotingCommand response,RemotingCommand request,MessageExt msg,TopicConfig topicConfig) {
    String newTopic = requestHeader.getTopic();
    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        // Look up the subscription group config; the group name follows the retry prefix.
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(
                "subscription group not exist," + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return false;
        }
        // Work out the maximum number of consume attempts: the group's setting by default,
        // the value from the request header for requests whose version is at least V3_4_9.
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
        // Attempts exhausted: redirect the message to the dead-letter topic.
        if (reconsumeTimes >= maxReconsumeTimes) {
            newTopic = MixAll.getDLQTopic(groupName);
            int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            // Create the dead-letter topic with the same queue count and write-only permission
            // that the send-back path uses for dead-letter topics.
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0);
            msg.setTopic(newTopic);
            msg.setQueueId(queueIdInt);
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return false;
            }
        }
    }
    // Carry the request's sys flag over, adding the multi-tag bit when the topic filters by multiple tags.
    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
    }
    msg.setSysFlag(sysFlag);
    return true;
}
/**
 * Handles a single-message send request on the broker.
 *
 * <p>Steps, in order: build the response; reject the request while the broker's configured
 * start-accepting timestamp lies in the future; run {@code msgCheck}; pick a random write queue
 * when the request carries a negative queue id; apply retry / dead-letter handling; fill a
 * {@link MessageExtBrokerInner} from the request; store it, going through the transactional
 * message service when the message is flagged as a prepared transaction message.
 *
 * @param ctx channel the request arrived on; its remote address becomes the message's born host
 * @param request raw command; supplies the opaque id, the version and the message body
 * @param sendMessageContext hook context forwarded to {@code handlePutMessageResult}
 * @param requestHeader decoded send-message header
 * @return the response for an early rejection, otherwise whatever {@code handlePutMessageResult} returns
 * @throws RemotingCommandException declared for the remoting calls made here
 *         (presumably {@code readCustomHeader} - confirm)
 */
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    // Initialise the response.
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
    response.setOpaque(request.getOpaque());
    response.addExtField(MessageConst.PROPERTY_MSG_REGION,this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH,String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    log.debug("receive SendMessage request command,{}",request);
    // The broker is not accepting sends yet: answer with a system error.
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service,until %s",UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }
    // Topic-configuration checks. The code is preset to -1 so that any code written by
    // msgCheck can be recognised as a rejection.
    response.setCode(-1);
    super.msgCheck(ctx,requestHeader,response);
    if (response.getCode() != -1) {
        return response;
    }
    final byte[] body = request.getBody();
    // A negative queue id means "pick one": choose randomly among the topic's write queues.
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);
    // Retry-topic handling: once the maximum consume count is exceeded the topic is changed to
    // the group's dead-letter topic, i.e. the message joins the Dead Letter Queue.
    if (!handleRetryAndDLQ(requestHeader,response,request,msgInner,topicConfig)) {
        return response;
    }
    // Fill in the MessageExtBrokerInner.
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner,MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner,MessageConst.PROPERTY_CLUSTER,clusterName);
    // Serialise the properties only after the cluster property has been added.
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    PutMessageResult putMessageResult = null;
    Map<String,String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // Prepared transaction messages are refused when the broker is configured to reject them.
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
    // NOTE(review): the callee's body also reads `response`, `request` and `msg`, which are not
    // in its declared parameter list; the call is kept in step with that declaration - confirm.
    return handlePutMessageResult(putMessageResult,responseHeader,sendMessageContext,ctx,queueIdInt);
}
/**
 * Translates a store result into the send response and updates statistics and hook context.
 *
 * <p>PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE all count as a
 * successful send (the latter three still set their own response code). On success the broker
 * statistics are incremented, the response header is filled in, the response is written through
 * {@code doResponse(ctx, response)} and {@code null} is returned. On any other status the
 * response carrying the error code and remark is returned instead.
 *
 * <p>NOTE(review): the body reads {@code response}, {@code request} and {@code msg}, none of
 * which appear in the parameter list below - presumably parameters that were lost from this
 * listing; confirm against the upstream signature before relying on it.
 *
 * @param putMessageResult result returned by the message store; {@code null} yields SYSTEM_ERROR
 * @param responseHeader header that receives msgId, queueId and queue offset on success
 * @param sendMessageContext hook context; only written when {@code hasSendMessageHook()} is true
 * @param ctx channel context passed to {@code doResponse}
 * @param queueIdInt queue id reported back in the response header
 * @return {@code null} on a successful send, otherwise the error response
 */
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult,SendMessageResponseHeader responseHeader,SendMessageContext sendMessageContext,ChannelHandlerContext ctx,int queueIdInt) {
// The store gave no result at all: report a system error.
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
return response;
}
boolean sendOK = false;
switch (putMessageResult.getPutMessageStatus()) {
// Success
case PUT_OK:
sendOK = true;
response.setCode(ResponseCode.SUCCESS);
break;
// The next three statuses keep their own response code but still take the success path.
case FLUSH_DISK_TIMEOUT:
response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
sendOK = true;
break;
case FLUSH_SLAVE_TIMEOUT:
response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
sendOK = true;
break;
case SLAVE_NOT_AVAILABLE:
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
sendOK = true;
break;
// Failed
case CREATE_MAPEDFILE_FAILED:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("create mapped file failed,server is busy or broken.");
break;
// Both statuses share one MESSAGE_ILLEGAL response.
case MESSAGE_ILLEGAL:
case PROPERTIES_SIZE_EXCEEDED:
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(
"the message is illegal,maybe msg body or properties length not matched. msg body length limit 128k,msg properties length limit 32k.");
break;
case SERVICE_NOT_AVAILABLE:
response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
// The remark embeds the disk-usage summary produced by diskUtil().
response.setRemark(
"service not available now,maybe disk full," + diskUtil() + ",maybe your broker machine memory too small.");
break;
case OS_PAGECACHE_BUSY:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("[PC_SYNCHRONIZED]broker busy,start flow control for a while");
break;
case UNKNOWN_ERROR:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR");
break;
default:
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("UNKNOWN_ERROR DEFAULT");
break;
}
// Owner value read from the request's ext fields; only used for the hook context below.
String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
if (sendOK) {
// Statistics
this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(),putMessageResult.getAppendMessageResult().getMsgNum(),1);
this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),putMessageResult.getAppendMessageResult().getWroteBytes());
this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
// Response
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
// The response is written here, before the hook bookkeeping; the method then returns null.
doResponse(ctx,response);
// Hook: record the successful send on the context.
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
sendMessageContext.setQueueId(responseHeader.getQueueId());
sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
// NOTE(review): the ceil only rounds up if SIZE_PER_COUNT is a floating-point constant - confirm its type.
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
return null;
} else {
// Hook: record the failed send on the context, sized by the request body.
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
}
return response;
}
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION,String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command {}",request);
final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service,UtilAll.timeMillisToHumanString2(startTimstamp)));
return response;
}
response.setCode(-1);
super.msgCheck(ctx,response);
if (response.getCode() != -1) {
return response;
}
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
}
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("message topic length too long " + requestHeader.getTopic().length());
return response;
}
if (requestHeader.getTopic() != null && requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("batch request does not support retry group " + requestHeader.getTopic());
return response;
}
MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt);
int sysFlag = requestHeader.getSysFlag();
if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
}
messageExtBatch.setSysFlag(sysFlag);
messageExtBatch.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(messageExtBatch,MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(messageExtBatch,clusterName);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
return handlePutMessageResult(putMessageResult,messageExtBatch,queueIdInt);
}
/**
 * Tells whether at least one consume-message hook is registered.
 *
 * @return {@code true} when the hook list is non-null and non-empty
 */
public boolean hasConsumeMessageHook() {
    if (consumeMessageHookList == null) {
        return false;
    }
    return !this.consumeMessageHookList.isEmpty();
}
/**
 * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
 *
 * <p>Hooks are best-effort: a hook that throws does not stop the remaining hooks and nothing is
 * propagated to the caller. The failure is logged so a broken hook does not go unnoticed.
 *
 * @param context context handed to each hook
 */
public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
    if (hasConsumeMessageHook()) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(context);
            } catch (Throwable e) {
                // Deliberately not rethrown: one faulty hook must not affect the others.
                log.warn("consumeMessageAfter hook threw an exception", e);
            }
        }
    }
}
/**
 * Returns the store host address held by this processor.
 *
 * @return the {@code storeHost} field
 */
public SocketAddress getStoreHost() {
    return this.storeHost;
}
/**
 * Builds a one-line summary of disk-partition usage for the commit log, consume queue and index
 * store paths, in that order.
 *
 * @return text of the form {@code "CL: x CQ: y INDEX: z"}, each value formatted with {@code %5.2f}
 */
private String diskUtil() {
    // Commit log path comes straight from the store config.
    final double commitLogUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
        this.brokerController.getMessageStoreConfig().getStorePathCommitLog());

    // Consume queue and index paths are both derived from the store root directory.
    final double consumeQueueUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathConsumeQueue(
            this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

    final double indexUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
        StorePathConfigHelper.getStorePathIndex(
            this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

    return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsed, consumeQueueUsed, indexUsed);
}
/**
 * Replaces the list of consume-message hooks used by this processor.
 *
 * @param consumeMessageHookList hooks to use from now on; stored as given, without copying
 */
public void registerConsumeMessageHook(final List<ConsumeMessageHook> consumeMessageHookList) {
    this.consumeMessageHookList = consumeMessageHookList;
}
}
複製程式碼
AbstractSendMessageProcessor#msgCheck
說明:校驗訊息是否正確,主要是Topic配置方面,例如:Broker 是否有寫入許可權,topic配置是否存在,佇列編號是否正確。
/**
 * Validates a send request against broker and topic configuration, creating the topic
 * configuration when it does not exist yet.
 *
 * <p>The method always returns the {@code response} it was given. A rejection is signalled only
 * by writing a code and remark into it, so callers must set a sentinel code beforehand and
 * compare afterwards.
 *
 * @param ctx channel the request arrived on; its remote address is used for logging and topic creation
 * @param requestHeader decoded send-message header (topic, default topic, queue id, unit mode)
 * @param response response that receives the error code and remark on rejection
 * @return {@code response}, modified only when the request is rejected
 */
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader,final RemotingCommand response) {
    // Reject only when the broker is not writeable AND the topic is an order topic.
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
            + "] sending message is forbidden");
        return response;
    }
    // Reject topics the topic config manager does not allow sending to; the remark reports
    // them as conflicting with system reserved words.
    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
        log.warn(errorMsg);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorMsg);
        return response;
    }
    TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // No topic config yet: try to create one.
    if (null == topicConfig) {
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false,true);
            } else {
                topicSysFlag = TopicSysFlag.buildSysFlag(true,false);
            }
        }
        // Create the topic config from the request's default topic.
        log.warn("the topic {} not exist,producer: {}",requestHeader.getTopic(),ctx.channel().remoteAddress());
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
            requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(),topicSysFlag);
        // Still nothing: retry-group topics get a single-queue, read/write topic as a fallback.
        if (null == topicConfig) {
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                topicConfig =
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }
        }
        if (null == topicConfig) {
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist,apply first please!"
                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        }
    }
    // Queue id must be below the larger of the write and read queue counts; negative ids pass
    // this check.
    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(),topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) {
        String errorInfo = String.format("request queueId[%d] is illegal,%s Producer: %s",queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    }
    return response;
}
複製程式碼
DefaultMessageStore#putMessage
說明:儲存訊息封裝,最終儲存需要 CommitLog 實現。
/**
 * Runs the store-level admission checks and, when they pass, appends the message through the
 * commit log.
 *
 * <p>Checks, in order: store shut down, broker role is SLAVE, store not writeable, topic longer
 * than {@code Byte.MAX_VALUE}, properties string longer than {@code Short.MAX_VALUE}, OS page
 * cache busy. Each failed check returns a {@link PutMessageResult} with a {@code null} append
 * result.
 *
 * @param msg message to store
 * @return the rejection result of the first failed check, otherwise the commit log's result
 */
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // A store that has been shut down accepts nothing.
    if (this.shutdown) {
        log.warn("message store has shutdown,so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE,null);
    }

    // Slaves do not accept writes; the warning is emitted once every 50000 rejections.
    if (this.messageStoreConfig.getBrokerRole() == BrokerRole.SLAVE) {
        final long rejected = this.printTimes.getAndIncrement();
        if (rejected % 50000 == 0) {
            log.warn("message store is slave mode,so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE,null);
    }

    // Store currently not writeable; same throttled warning, with the flag bits appended.
    if (!this.runningFlags.isWriteable()) {
        final long rejected = this.printTimes.getAndIncrement();
        if (rejected % 50000 == 0) {
            log.warn("message store is not writeable,so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE,null);
    }
    // Writeable again: restart the throttling counter.
    this.printTimes.set(0);

    // Topic name too long.
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,null);
    }

    // Serialised properties too long.
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED,null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY,null);
    }

    // Append to the commit log and time the call.
    final long startedAt = this.getSystemClock().now();
    final PutMessageResult outcome = this.commitLog.putMessage(msg);
    final long costMillis = this.getSystemClock().now() - startedAt;
    if (costMillis > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={},bodyLength={}",costMillis,msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(costMillis);

    // A missing or non-OK result counts as a failed put.
    final boolean failed = outcome == null || !outcome.isOk();
    if (failed) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return outcome;
}
複製程式碼