說說MQ之RocketMQ(二)
RocketMQ 的 Java API
RocketMQ 是用 Java 語言開發的,因此,其 Java API 相對是比較豐富的,當然也有部分原因是 RocketMQ 本身提供的功能就比較多。RocketMQ API 提供的功能包括,
- 廣播消費,這個在之前已經提到過;
- 訊息過濾,支援簡單的 Message Tag 過濾,也支援按 Message Header、body 過濾;
- 順序消費和亂序消費,之前也提到過,這裡的順序消費應該指的是普通順序性,這一點與 Kafka 相同;
- Pull 模式消費,這個是相對 Push 模式來說的,Kafka 就是 Pull 模式消費;
- 事務訊息,這個好像沒有開源,但是 example 程式碼中有示例,總之,不推薦用;
- Tag,RocketMQ 在 Topic 下面又分了一層 Tag,用於表示訊息類別,可以用來過濾,但是順序性還是以 Topic 來看;
單看功能的話,即使不算事務訊息,也不算 Tag,RocketMQ 也遠超 Kafka,Kafka 應該只實現了 Pull 模式消費 + 順序消費這2個功能。RocketMQ 的程式碼示例在 rocketmq-example 中,注意,程式碼是不能直接執行的,因為所有的程式碼都少了設定 name server 的部分,需要自己手動加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");
。
先來看一下生產者的 API,比較簡單,只有一種,如下,
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import java.util.List; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.232.23:9876"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID188",// key ("RocketMQ "+String.format("%05d", i)).getBytes());// body SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, i)); System.out.println(String.format("%05d", i)+sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
可以發現,相比 Kafka 的 API,只多了 Tag,但實際上行為有很大不同。Kafka 的生產者客戶端,有同步和非同步兩種模式,但都是阻塞模式,send
方法返回傳送狀態的 Future
,可以通過 Future
的 get
方法阻塞獲得傳送狀態。而 RocketMQ 採用的是同步非阻塞模式,傳送之後立刻返回傳送狀態(而不是 Future
)。正常情況下,兩者使用上差別不大,但是在高可用場景中發生主備切換的時候,Kafka 的同步可以等待切換完成並重連,最後返回;而 RocketMQ 只能立刻報錯,由生產者選擇是否重發。所以,在生產者的 API 上,其實 Kafka 是要強一些的。
另外,RocketMQ 可以通過指定 MessageQueueSelector
類的實現來指定將訊息傳送到哪個分割槽去,Kafka 是通過指定生產者的 partitioner.class
引數來實現的,靈活性上 RocketMQ 略勝一籌。
再來看消費者的API,由於 RocketMQ 的功能比較多,我們先看 Pull 模式消費的API,如下,
import java.util.HashMap; import java.util.Map; import java.util.Set; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.consumer.store.OffsetStore; import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ: while (true) { try { long offset = consumer.fetchConsumeOffset(mq, true); PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (null != pullResult.getMsgFoundList()) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { System.out.print(new String(messageExt.getBody())); System.out.print(pullResult); System.out.println(messageExt); } } putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: // TODO break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } }
這部分的 API 其實是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分割槽,而 Kafka 可以自動管理(當然也可以手動管理),並且不需要指定分割槽(分割槽是在 Kafka 訂閱的時候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore
介面,提供了兩種管理方式,本地檔案和遠端 Broker。這部分感覺兩者差不多。
下面再看看 Push 模式順序消費,程式碼如下,
import java.util.List; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
雖然提供了 Push 模式,RocketMQ 內部實際上還是 Pull 模式的 MQ,Push 模式的實現應該採用的是長輪詢,這點與 Kafka 一樣。使用該方式有幾個注意的地方,
- 接收訊息的監聽類要使用
MessageListenerOrderly
; ConsumeFromWhere
有幾個引數,表示從頭開始消費,從尾開始消費,還是從某個 TimeStamp 開始消費;- 可以控制 offset 的提交,應該就是
context.setAutoCommit(false);
的作用;
控制 offset 提交這個特性非常有用,某種程度上擴充套件一下,就可以當做事務來用了,看程式碼 ConsumeMessageOrderlyService
的實現,其實並沒有那麼複雜,在不啟用 AutoCommit 的時候,只有返回 COMMIT
才 commit offset;啟用 AutoCommit 的時候,返回 COMMIT
、ROLLBACK
(這個比較扯)、SUCCESS
的時候,都 commit offset。
後來發現,commit offset 功能在 Kafka 裡面也有提供,使用新的 API,呼叫 consumer.commitSync
。
再看一個 Push 模式亂序消費 + 訊息過濾的例子,消費者的程式碼如下,
import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName()); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
這個例子與之前順序消費不同的地方在於,
- 接收訊息的監聽類使用的是
MessageListenerConcurrently
; - 回撥方法中,使用的是自動 offset commit;
- 訂閱的時候增加了訊息過濾類
MessageFilterImpl
;
訊息過濾類 MessageFilterImpl
的程式碼如下,
import com.alibaba.rocketmq.common.filter.MessageFilter; import com.alibaba.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { String property = msg.getUserProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); if ((id % 3) == 0 && (id > 10)) { return true; } } return false; } }
RocketMQ 執行過濾是在 Broker 端,Broker 所在的機器會啟動多個 FilterServer 過濾程序;Consumer 啟動後,會向 FilterServer 上傳一個過濾的 Java 類;Consumer 從 FilterServer 拉訊息,FilterServer 將請求轉發給 Broker,FilterServer 從 Broker 收到訊息後,按照 Consumer 上傳的 Java 過濾程式做過濾,過濾完成後返回給 Consumer。這種過濾方法可以節省網路流量,但是增加了 Broker 的負擔。可惜我沒有實驗出來使用過濾的效果,即使是用 github wiki 上的例子8也沒成功,不糾結了。RocketMQ 的按 Tag 過濾的功能也是在 Broker 上做的過濾,能用,是個很方便的功能。
還有一種廣播消費模式,比較簡單,可以去看程式碼,不再列出。
總之,RocketMQ 提供的功能比較多,比 Kafka 多很多易用的 API。