RocketMQ讀書筆記2——生產者
【生產者的不同寫入策略】
生產者向消息隊列裏寫入數據,不同的業務需要生產者采用不同的寫入策略:
同步發送、異步發送、延遲發送、發送事務消息等。
【DefaultMQProduce示例】
public class ProducerQuickStart { public static void main(String[] args) throws MQClientException,InterruptedException { /**1.設置Producer的GroupName**/ DefaultMQProducer producer= new DefaultMQProducer("GROUP_B"); /**2.設置Instance**/ producer.setInstanceName("instanceB"); /**3.設置發送失敗的重試次數**/ producer.setRetryTimesWhenSendFailed(3); /**4.設置NameServer的地址**/ producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876"); /**5.啟動Producer**/ producer.start(); for (int i = 0; i < 10; i++) { try{ /**6.組裝消息並發送**/ Message msg = new Message("TopicTest","TagA", ("Hello HigginCui:"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg,new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("sendResultStatus:" + sendResult.getSendStatus()); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); }catch (Exception e){ e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
[ 提示:設置Instance ]
當一個JVM需要啟動多個Producer的時候,通過設置不同的InstanceName來區分,不設置的話系統使用默認名稱“DEFAULT”。
[ 提示:設置發送失敗的重試次數 ]
當網絡出現異常的時候,這個次數影響消息的重復投遞次數。想保證消息不丟失,可以設置多重試幾次。
【消息發送的返回值】
FLUSH_DISK_TIMEOUT
刷盤超時(需要Broker設置為SYNC_FLUSH同步刷盤才會報這個錯)
FLUSH_SLAVE_TIMEOUT
主從同步超時(在主備方式,且Broker設置為SYNC_MASTER情況下)
SLAVE_NOT_AVALIABLE
沒有找到被設置成SLAVE的Broker。(在主備方式,且Broker設置成SYNC_MASTER的情況下)
SEND_OK
發送成功(需要結合所配置的 刷盤策略、主從策略來定)
【延遲消息】
RocketMQ支持延遲消息,Broker收到這類消息後,延遲一段時間再處理,使消息在規定的一段時間內生效。
延遲消息使用方法:
在創建Message對象時,調用setDelayTimeLevel(int level)方法設置延遲時間,然後再把這個新消息發送出去。
目前延遲消息不支持任意設置,僅支持預設值的時間長度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。如setDelayTimeLevel(3)表示延遲10s。
【自定義消息發送規則】
一個Topic下會有多個MessageQueue,如果使用Producer的默認配置,這個Producer會輪流向各個MessageQueue發送消息。Consumer消費的時候,會根據負載均衡策略,消費分配到的MessageQueue。不經過特定設置,某條消息發往哪個MessageQueue,被哪個Consumer消費都是未知的,
[ 如果把同一類型的消息發往相同的MessageQueue? ]
想把同一類型的消息發往相同的MessageQueue,可以用MessageQueueSelector。
代碼示例:
public class OrderMessageQueueSelector implements MessageQueueSelector { /** * 根據訂單的id值平均分配對應的MessageQueue * @param mqs 消息隊列 * @param msg 消息 * @param orderKey * @return */ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object orderKey) { int id = Integer.parseInt(orderKey.toString()); int idMainIndex = id/100; int size = mqs.size(); //MessageQueue的總數 int index= idMainIndex /size ; return mqs.get(index); //返回選中的MessageQueue } }
在發送消息的時候,把MessageQueueSelector的對象作為參數,使用
MQProducer接口的自定義發送方法:
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
在MessageQueueSelector的實現中,根據傳入的Object參數,或者根據Message消息內容確定把消息發往哪個MessageQueue,返回被選中的MessageQueue。
【事務消息】
RocketMQ的事務消息:發送消息事件和其他事件需要同時成功或失敗。
如轉賬操作:A賬戶轉賬1W到B賬戶,A發送"B賬戶加1W"的消息,要和"A賬戶扣除1W"的這個操作同時成功或失敗。
[ 關鍵詞 ]
兩階段提交。
[ 大致流程 ]
RocketMQ采用兩階段提交的方式實現事務消息。
TransactionMQProducer處理流程如下:
1.先發一個"B賬戶增加1W"的待確認消息。
2.發送成功後做"A賬戶扣除1W"的操作
3.根據"A賬戶扣除1W"的操作成功與否,決定之前"B賬戶增加1W"的消息是commit還是rollback。
[ 具體流程 ]
1.Producer向MQ發送"B賬戶增加1W"的待確認消息。
2.RocketMQ將這個待確認消息持久化成功後,向Producer回復消息發送成功,此時第一階段消息發送完成。
3.執行本地事件邏輯,即"A賬戶扣除1W"的操作。
4.Producer根據本地事件執行結果向RocketMQ發送二次確認(Commit或RollBack)消息:
如果收到Commit狀態則將第一階段的待確認消息標記為“可投遞”,Consumer將收到該消息;
如果收到RocketBack狀態則刪除第一階段的待確認消息,Consumer無法收到該消息。
5.若中途出現異常,步驟4提交的二次確認最終未到達RocketMQ,服務器在經過固定的時間會對“待確認”消息發起回查請求。
6.Producer收到回查請求後,通過檢查本地對應消息的本地事件執行結果返回Commit或RockBack狀態(如果發送第一階段待確認消息的Producer不能工作,回查請求將被發送到和Producer在同一個Group裏的其他Producer)。
【為什麽RocketMQ4.x版本刪除事務消息】
雖然上述的方案很好地實現了事務消息功能,也是RocketMQ之前的版本實現事務消息的邏輯,因為RocketMQ依賴將數據順序寫到磁盤的這個特征來提高性能,步驟4需要更改第一階段待確認消息的狀態,這樣會導致磁盤Catch的臟頁過多,降低了系統性能,所以RocketMQ在4.x版本將這部分功能去除了。
RocketMQ讀書筆記2——生產者