RocketMQ(六)發訊息的時候選擇queue的演算法有哪些?
一、說明
分為兩種,一種是直接發訊息,client內部有選擇queue的演算法,不允許外界改變。還有一種是可以自定義queue的選擇演算法(內建了三種演算法,不喜歡的話可以自定義演算法實現)。
public class org.apache.rocketmq.client.producer.DefaultMQProducer { // 只發送訊息,queue的選擇由預設的演算法來實現 @Override public SendResult send(Collection<Message> msgs) {} // 自定義選擇queue的演算法進行發訊息@Override public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {} }
二、原始碼
1、send(msg, mq)
1.1、使用場景
有時候我們不希望預設的queue選擇演算法,而是需要自定義,一般最常用的場景在順序訊息,順序訊息的傳送一般都會指定某組特徵的訊息都發當同一個queue裡,這樣才能保證順序,因為單queue是有序的。
1.2、原理剖析
內建了三種演算法,三種演算法都實現了一個共同的介面:
org.apache.rocketmq.client.producer.MessageQueueSelector
-
SelectMessageQueueByRandom
-
SelectMessageQueueByHash
-
SelectMessageQueueByMachineRoom
-
要想自定義邏輯的話,直接實現介面重寫select方法即可。
很典型的策略模式,不同演算法不同實現類,有個頂層介面。
1.2.1、SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); @Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // mqs.size():佇列的個數。假設佇列個數是4,那麼這個value就是0-3之間隨機。 int value = random.nextInt(mqs.size()); return mqs.get(value); } }
so easy,就是純隨機。
mqs.size():佇列的個數。假設佇列個數是4,那麼這個value就是0-3之間隨機。
1.2.2、SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); // 防止出現負數,取個絕對值,這也是我們平時開發中需要注意到的點 if (value < 0) { value = Math.abs(value); } // 直接取餘佇列個數。 value = value % mqs.size(); return mqs.get(value); } }
so easy,就是純取餘。
mqs.size():佇列的個數。假設佇列個數是4,且value的hashcode是3,那麼3 % 4 = 3,那麼就是最後一個佇列,也就是四號佇列,因為下標從0開始。
1.2.3、SelectMessageQueueByMachineRoom
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } public Set<String> getConsumeridcs() { return consumeridcs; } public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } }
沒看懂有啥鳥用,直接return null; 所以如果有自定義需求的話直接自定義就好了,這玩意沒看出有啥卵用。
1.2.4、自定義演算法
public class MySelectMessageQueue implements MessageQueueSelector { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); } }
永遠都選擇0號佇列,也就是第一個佇列。只是舉個例子,實際看你業務需求。
1.3、呼叫鏈
org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg) -> org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg) -> org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message msg, MessageQueueSelector selector, Object arg, long timeout) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl(xxx) -> mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); -> selector.select(messageQueueList, userMessage, arg) -> org.apache.rocketmq.client.producer.MessageQueueSelector#select(final List<MessageQueue> mqs, final Message msg, final Object arg)
2、send(msg)
2.1、使用場景
一般沒特殊需求的場景都用這個。因為他預設的queue選擇演算法很不錯,各種優化場景都替我們想到了。
2.2、原理剖析
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl} // 這是傳送訊息核心原理,不清楚的看我之前發訊息原始碼分析的文章 // 選擇訊息要傳送的佇列 MessageQueue mq = null; for (int times = 0; times < 3; times++) { // 首次肯定是null String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 呼叫下面的方法進行選擇queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { // 給mq賦值,如果首次失敗了,那麼下次重試的時候(也就是下次for的時候),mq就有值了。 mq = mqSelected; ...... // 很關鍵,能解答下面會提到的兩個問題: // 1.faultItemTable是什麼時候放進去的? // 2.isAvailable() 為什麼只是判斷一個時間就可以知道Broker是否可用? this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); } }
選擇queue的主入口
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 預設為false,代表不啟用broker故障延遲 if (this.sendLatencyFaultEnable) { try { // 隨機數且+1 int index = tpInfo.getSendWhichQueue().getAndIncrement(); // 遍歷 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { // 先(隨機數 +1) % queue.size() int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) { pos = 0; } MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 看找到的這個queue所屬的broker是不是可用的 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 非失敗重試,直接返回到的佇列 // 失敗重試的情況,如果和選擇的佇列是上次重試是一樣的,則返回 // 也就是說如果你這個queue所在的broker可用, // 且不是重試進來的或失敗重試的情況,如果和選擇的佇列是上次重試是一樣的,那你就是天選之子了。 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 如果所有佇列都不可用,那麼選擇一個相對好的broker,不考慮可用性的訊息佇列 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 隨機選擇一個queue return tpInfo.selectOneMessageQueue(); } // 當sendLatencyFaultEnable=false的時候選擇queue的方法,預設就是false。 return tpInfo.selectOneMessageQueue(lastBrokerName); }
2.2.1、不啟用broker故障延遲
既然sendLatencyFaultEnable預設是false,那就先看當sendLatencyFaultEnable=false時候的邏輯
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 第一次就是null,第二次(也就是重試的時候)就不是null了。 if (lastBrokerName == null) { // 第一次選擇佇列的邏輯 return selectOneMessageQueue(); } else { // 第一次選擇佇列傳送訊息失敗了,第二次重試的時候選擇佇列的邏輯 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 過濾掉上次傳送訊息失敗的佇列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
那就繼續看第一次選擇佇列的邏輯:
public MessageQueue selectOneMessageQueue() { // 當前執行緒有個ThreadLocal變數,存放了一個隨機數 {@link org.apache.rocketmq.client.common.ThreadLocalIndex#getAndIncrement} // 然後取出隨機數根據佇列長度取模且將隨機數+1 int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) { pos = 0; } return this.messageQueueList.get(pos); }
好吧,其實也有點隨機一個的意思。但是亮點在於取出隨機數根據佇列長度取模且將隨機數+1,這個+1亮了(getAndIncrement cas +1)。
當訊息第一次傳送失敗時,lastBrokerName會存放當前選擇失敗的broker(mq = mqSelected),通過重試,此時lastBrokerName有值,代表上次選擇的boker傳送失敗,則重新對sendWhichQueue本地執行緒變數+1,遍歷選擇訊息佇列,直到不是上次的broker,也就是為了規避上次傳送失敗的broker的邏輯所在。
舉個例子:你這次隨機數是1,佇列長度是4,1%4=1,這時候失敗了,進入重試,那麼重試之前,也就是在上一步1%4之後,他把1進行了++操作,變成了2,那麼你這次重試的時候就是2%4=2,直接過濾掉了剛才失敗的broker。
那就繼續看第二次重試選擇佇列的邏輯:
// +1 int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { // 取模 int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 過濾掉上次傳送訊息失敗的佇列 if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // 沒找到能用的queue的話繼續走預設的那個 return selectOneMessageQueue();
so easy,你上次不是失敗了,進入我這裡重試來了嗎?我也很簡單,我就還是取出隨機數+1然後取模佇列長度,我看這個broker是不是上次失敗的那個,是他小子的話就過濾掉,繼續遍歷queue找下一個能用的。
2.2.2、啟用broker故障延遲
也就是下面if裡的邏輯
if (this.sendLatencyFaultEnable) { .... }
看上面的註釋就行了,很清晰了,就是我先(隨機數 +1) % queue.size()
,然後看你這個queue所屬的broker是否可用,可用的話且不是重試進來的或失敗重試的情況,如果和選擇的佇列是上次重試是一樣的,那直接return你就完事了。那麼怎麼看broker是否可用的呢?
// {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable(String)} public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } // {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable()} public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
疑問:
- faultItemTable是什麼時候放進去的?
- isAvailable() 為什麼只是判斷一個時間就可以知道Broker是否可用?
這就需要上面傳送訊息完成後所呼叫的這個方法了:
// {@link org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem} // 傳送開始時間 beginTimestampPrev = System.currentTimeMillis(); // 進行傳送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); // 傳送結束時間 endTimestamp = System.currentTimeMillis(); // 更新broker的延遲情況 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
細節邏輯如下:
// {@link org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem} public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 首次isolation傳入的是false,currentLatency是傳送訊息所耗費的時間,如下 // this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; // 根據延遲時間對比MQFaultStrategy中的延遲級別陣列latencyMax 不可用時長陣列notAvailableDuration 來將該broker加進faultItemTable中。 private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { // 假設currentLatency花費了10ms,那麼latencyMax裡的資料顯然不符合下面的所有判斷,所以直接return 0; if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } // {@link org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem()} @Override // 其實主要就是給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 // 也就是說只有notAvailableDuration == 0的時候,isAvailable()才會返回true。 public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); // 給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); // 給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); // 給startTimestamp賦值為當前時間+computeNotAvailableDuration(isolation ? 30000 : currentLatency);的結果,給isAvailable()所用 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
下面這兩句程式碼詳細解釋下:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyMax | notAvailableDuration |
---|---|
50L | 0L |
100L | 0L |
550L | 30000L |
1000L | 60000L |
2000L | 120000L |
3000L | 180000L |
15000L | 600000L |
即
- currentLatency大於等於50小於100,則notAvailableDuration為0
- currentLatency大於等於100小於550,則notAvailableDuration為0
- currentLatency大於等於550小於1000,則notAvailableDuration為300000
- …等等
再來舉個例子:
假設isolation傳入true,
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
那麼notAvailableDuration將傳入600000L。結合isAvailable方法,大概流程如下:
RocketMQ為每個Broker預測了個可用時間(當前時間+notAvailableDuration),噹噹前時間大於該時間,才代表Broker可用,而notAvailableDuration有6個級別和latencyMax的區間一一對應,根據傳入的currentLatency去預測該Broker在什麼時候可用。
所以再來看這個
public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
根據執行時間來看落入哪個區間,在0~100的時間內notAvailableDuration都是0,都是可用的,大於該值後,可用的時間就會開始變大了,就認為不是最優解,直接捨棄。
2.3、呼叫鏈
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long) -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx) -> MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); -> org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx) org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName)
2.4、總結
- 在不開啟容錯的情況下,輪詢佇列進行傳送,如果失敗了,重試的時候過濾失敗的Broker
- 如果開啟了容錯策略,會通過RocketMQ的預測機制來預測一個Broker是否可用
- 如果上次失敗的Broker可用那麼還是會選擇該Broker的佇列
- 如果上述情況失敗,則隨機選擇一個進行傳送
- 在傳送訊息的時候會記錄一下呼叫的時間與是否報錯,根據該時間去預測broker的可用時間
三、總結
1、疑問
他搞了兩個過載send()方法,一個支援演算法選擇器,一個不支援演算法選擇,queue的演算法選擇是個典型的策略模式。為什麼send(message)
方法內建的queue選擇演算法不抽出到單獨的類中,然後此類實現org.apache.rocketmq.client.producer.MessageQueueSelector
介面呢?比如叫:SelectMessageQueueByBest
,比如如下:
public class org.apache.rocketmq.client.producer.DefaultMQProducer { // 只發送訊息,queue的選擇由預設的演算法來實現 @Override public SendResult send(Collection<Message> msgs) { this.send(msgs, new SelectMessageQueueByBest().select(xxx)); } // 自定義選擇queue的演算法進行發訊息 @Override public SendResult send(Collection<Message> msgs, MessageQueue messageQueue) {} }
我猜測可能是這個演算法過於複雜,與其它類的互動也過於多,引數也可能和內建的其他三個不同,所以沒搞到一起,但是還是搞到一起規範呀,乾的同一件事,只是演算法不同,很典型的策略模式。
2、面試
問:發訊息的時候選擇queue的演算法有哪些?
答:分為兩種,一種是直接發訊息,不能選擇queue,這種的queue選擇演算法如下:
- 在不開啟容錯的情況下,輪詢佇列進行傳送,如果失敗了,重試的時候過濾失敗的Broker
- 如果開啟了容錯策略,會通過RocketMQ的預測機制來預測一個Broker是否可用
- 如果上次失敗的Broker可用那麼還是會選擇該Broker的佇列
- 如果上述情況失敗,則隨機選擇一個進行傳送
- 在傳送訊息的時候會記錄一下呼叫的時間與是否報錯,根據該時間去預測broker的可用時間
另外一種是發訊息的時候可以選擇演算法甚至還可以實現介面自定義演算法:
SelectMessageQueueByRandom
:隨機SelectMessageQueueByHash
:hashSelectMessageQueueByMachineRoom
- 實現介面自定義
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message,long)
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl(xxx)
->
MessageQueuemqSelected=this.selectOneMessageQueue(topicPublishInfo,lastBrokerName);
->
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue(xxx)
org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue(finalTopicPublishInfotpInfo,finalStringlastBrokerName)