1. 程式人生 > >rocketmq問題彙總-如何將特定訊息傳送至特定queue,消費者從特定queue消費

rocketmq問題彙總-如何將特定訊息傳送至特定queue,消費者從特定queue消費

  1. 業務描述

    由於業務需要這樣一種場景,將訊息按照id(業務id)尾號傳送到對應的queue中,並啟動10個消費者(單jvm,10個消費者組),從對應的queue中叢集消費,如下圖1所示(假設有兩個broker組成的叢集):
    圖1

  2. producer如何實現

    producer只需傳送訊息時呼叫如下方法即可

    /**
     * 傳送有序訊息
     *
     * @param messageMap 訊息資料
     * @param selector   佇列選擇器,傳送時會回撥
     * @param order      回撥佇列選擇器時,此引數會傳入佇列選擇方法,提供配需規則
     * @return
    傳送結果 */
    public Result<SendResult> send(Message msg, MessageQueueSelector selector, Object arg)

    關鍵是如何實現MessageQueueSelector:

    class IDHashMessageQueueSelector implements MessageQueueSelector{
        public MessageQueue select(List<MessageQueue> mqs, Message msg,
                Object arg) {
            int
    id = Integer.parseInt(arg.toString()); int size = mqs.size(); int index = id%size; return mqs.get(index); } }

    這樣,所有的訊息會根據訊息的尾號,輪詢的落到相應的queue上。參考圖2,假設id=10001231,由於一共有20個queue,所以10001231%20=11,故訊息會落到broker-b queue-1上。
    圖2

  3. consumer端如何實現

    針對consumer由於沒有限制是順序消費,故可以採用叢集消費模式的DefaultMQPushConsumer,由於一個消費者消費一類queue,故需要10個consumer group,比如consumer group0需要消費的queue為broker-a queue-0和broker-b queue-0,如下圖的概示:
    這裡寫圖片描述


    那麼需要自己實現一個AllocateMessageQueueStrategy進行queue的分配,我們假設consumer group的名字格式需要提前定好,如xxx{queueid}ConsumerGroup,那麼實現如下:

    public class AllocateMessageQueueByHashAveragely extends AllocateMessageQueueAveragely{
    private final Logger log = ClientLogger.getLog();
    @Override
    public String getName() {
        return super.getName()+"ByIDHash";
    }
    
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID,
            List<MessageQueue> mqAll, List<String> cidAll) {
        //解析queue id
        char idChar = consumerGroup.charAt(consumerGroup.length() - "ConsumerGroup".length() - 1);
        int id = Integer.parseInt(idChar+"");
        List<MessageQueue> submq = new ArrayList<MessageQueue>();
        //根據queue id分配相應的MessageQueue
        for(MessageQueue mq : mqAll) {
            if(mq.getQueueId() == idChar || mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                submq.add(mq);
            }
        }
        if(submq.size() == 0) {
            log.warn("allocate err:"+consumerGroup+","+currentCID+","+cidAll+","+mqAll);
        }
        return super.allocate(consumerGroup, currentCID, submq, cidAll);
    }
    }

    藉助AllocateMessageQueueAveragely來實現,以便有多個jvm的消費者時,能夠進行叢集消費,但是針對上面這個例子,消費者jvm例項不能超過2個,至於為什麼,參照下圖:
    這裡寫圖片描述

  4. over