1. 程式人生 > >說說MQ之RocketMQ(二)

說說MQ之RocketMQ(二)

RocketMQ 的 Java API

RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,

  1. 廣播消費,這個在之前已經提到過;
  2. 訊息過濾,支援簡單的 Message Tag 過濾,也支援按 Message Header、body 過濾;
  3. 順序消費和亂序消費,之前也提到過,這裡的順序消費應該指的是普通順序性,這一點與 Kafka 相同;
  4. Pull 模式消費,這個是相對 Push 模式來說的,Kafka 就是 Pull 模式消費;
  5. 事務訊息,這個好像沒有開源,但是 example 程式碼中有示例,總之,不推薦用;
  6. Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用於表示訊息類別,可以用來過濾,但是順序性還是以 Topic 來看;

單看功能的話,即使不算事務訊息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 模式消費 + 順序消費這2個功能。RocketMQ 的程式碼示例在 rocketmq-example 中,注意,程式碼是不能直接執行的,因為所有的程式碼都少了設定 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");

先來看一下生產者的 API,比較簡單,只有一種,如下,

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import java.util.List;
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.232.23:9876");
        producer.start();
        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag
                        "OrderID188",// key
                        ("RocketMQ "+String.format("%05d", i)).getBytes());// body
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, i));
                    System.out.println(String.format("%05d", i)+sendResult);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}

可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和非同步兩種模式,但都是阻塞模式,send 方法返回傳送狀態的 Future,可以通過 Future 的 get 方法阻塞獲得傳送狀態。而 RocketMQ 採用的是同步非阻塞模式,傳送之後立刻返回傳送狀態(而不是 Future)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成並重連,最後返回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。

另外,RocketMQ 可以通過指定 MessageQueueSelector 類的實現來指定將訊息傳送到哪個分割槽去,Kafka 是通過指定生產者的 partitioner.class 引數來實現的,靈活性上 RocketMQ 略勝一籌。

再來看消費者的API,由於 RocketMQ 的功能比較多,我們先看 Pull 模式消費的API,如下,

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
public class PullConsumer {
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);
            SINGLE_MQ: while (true) {
                try {
                    long offset = consumer.fetchConsumeOffset(mq, true);
                    PullResult pullResult =
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    if (null != pullResult.getMsgFoundList()) {
                        for (MessageExt messageExt : pullResult.getMsgFoundList()) {
                            System.out.print(new String(messageExt.getBody()));
                            System.out.print(pullResult);
                            System.out.println(messageExt);
                        }
                    }
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // TODO
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}

這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分割槽,而 Kafka 可以自動管理(當然也可以手動管理),並且不需要指定分割槽(分割槽是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 介面,提供了兩種管理方式,本地檔案和遠端 Broker。這部分感覺兩者差不多。

下面再看看 Push 模式順序消費,程式碼如下,

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                this.consumeTimes.incrementAndGet();
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                }
                else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                }
                else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

雖然提供了 Push 模式,RocketMQ 內部實際上還是 Pull 模式的 MQ,Push 模式的實現應該採用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個注意的地方,

  1. 接收訊息的監聽類要使用 MessageListenerOrderly
  2. ConsumeFromWhere 有幾個引數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;
  3. 可以控制 offset 的提交,應該就是 context.setAutoCommit(false); 的作用;

控制 offset 提交這個特性非常有用,某種程度上擴充套件一下,就可以當做事務來用了,看程式碼 ConsumeMessageOrderlyService 的實現,其實並沒有那麼複雜,在不啟用 AutoCommit 的時候,只有返回 COMMIT 才 commit offset;啟用 AutoCommit 的時候,返回 COMMITROLLBACK(這個比較扯)、SUCCESS 的時候,都 commit offset。

後來發現,commit offset 功能在 Kafka 裡面也有提供,使用新的 API,呼叫 consumer.commitSync

再看一個 Push 模式亂序消費 + 訊息過濾的例子,消費者的程式碼如下,

import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
        consumer.setNamesrvAddr("192.168.232.23:9876");
        consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName());
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

這個例子與之前順序消費不同的地方在於,

  1. 接收訊息的監聽類使用的是 MessageListenerConcurrently
  2. 回撥方法中,使用的是自動 offset commit;
  3. 訂閱的時候增加了訊息過濾類 MessageFilterImpl

訊息過濾類 MessageFilterImpl 的程式碼如下,

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;
public class MessageFilterImpl implements MessageFilter {
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }
        return false;
    }
}

RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾程序;Consumer 啟動後,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照 Consumer 上傳的 Java 過濾程式做過濾,過濾完成後返回給 Consumer。這種過濾方法可以節省網路流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。

還有一種廣播消費模式,比較簡單,可以去看程式碼,不再列出。

總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。