rocketmq問題彙總-如何將特定訊息傳送至特定queue,消費者從特定queue消費
阿新 • • 發佈:2019-02-12
業務描述
由於業務需要這樣一種場景,將訊息按照id(業務id)尾號傳送到對應的queue中,並啟動10個消費者(單jvm,10個消費者組),從對應的queue中叢集消費,如下圖1所示(假設有兩個broker組成的叢集):
producer如何實現
producer只需傳送訊息時呼叫如下方法即可
/** * 傳送有序訊息 * * @param messageMap 訊息資料 * @param selector 佇列選擇器,傳送時會回撥 * @param order 回撥佇列選擇器時,此引數會傳入佇列選擇方法,提供配需規則 * @return
關鍵是如何實現MessageQueueSelector:
class IDHashMessageQueueSelector implements MessageQueueSelector{ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int
這樣,所有的訊息會根據訊息的尾號,輪詢的落到相應的queue上。參考圖2,假設id=10001231,由於一共有20個queue,所以10001231%20=11,故訊息會落到broker-b queue-1上。
consumer端如何實現
針對consumer由於沒有限制是順序消費,故可以採用叢集消費模式的DefaultMQPushConsumer,由於一個消費者消費一類queue,故需要10個consumer group,比如consumer group0需要消費的queue為broker-a queue-0和broker-b queue-0,如下圖的概示:
那麼需要自己實現一個AllocateMessageQueueStrategy進行queue的分配,我們假設consumer group的名字格式需要提前定好,如xxx{queueid}ConsumerGroup,那麼實現如下:public class AllocateMessageQueueByHashAveragely extends AllocateMessageQueueAveragely{ private final Logger log = ClientLogger.getLog(); @Override public String getName() { return super.getName()+"ByIDHash"; } @Override public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { //解析queue id char idChar = consumerGroup.charAt(consumerGroup.length() - "ConsumerGroup".length() - 1); int id = Integer.parseInt(idChar+""); List<MessageQueue> submq = new ArrayList<MessageQueue>(); //根據queue id分配相應的MessageQueue for(MessageQueue mq : mqAll) { if(mq.getQueueId() == idChar || mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { submq.add(mq); } } if(submq.size() == 0) { log.warn("allocate err:"+consumerGroup+","+currentCID+","+cidAll+","+mqAll); } return super.allocate(consumerGroup, currentCID, submq, cidAll); } }
藉助AllocateMessageQueueAveragely來實現,以便有多個jvm的消費者時,能夠進行叢集消費,但是針對上面這個例子,消費者jvm例項不能超過2個,至於為什麼,參照下圖:
- over