《Apache RocketMQ使用者指南》之過濾訊息示例
阿新 • • 發佈:2018-12-22
訊息過濾示例
原文連結 譯者:小村長
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消費者將收到包含TAGA或TAGB或TAGB的訊息. 但限制是一條訊息只能有一個標籤,而這對於複雜的情況可能無效。 在這種情況下,您可以使用SQL表示式篩選出訊息.
原理
SQL功能可以通過您在傳送訊息時放入的屬性進行一些計算。 在RocketMQ定義的語法下,您可以實現一些有趣的邏輯。 這是一個例子:
------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------
語法
RocketMQ只定義了一些基本的語法來支援這個功能。 你也可以很容易地擴充套件它.
- 數字比較, 像
>
,>=
,<
,<=
,BETWEEN
,=
; - 字元比較, 像
=
,<>
,IN
; IS NULL
或者IS NOT NULL
;- 邏輯運算
AND
,OR
,NOT
;
常量型別是:
- 數字, 像123, 3.1415;
- 字串, 像‘abc’,必須使用單引號;
NULL
, 特殊常數;- 布林常量,
TRUE
或FALSE
;
使用限制
只有消費者可以通過SQL92選擇訊息。 示例:
public void subscribe(final String topic, final MessageSelector messageSelector)
生產者示例
傳送時,您可以通過putUserProperty方法在訊息中放置屬性.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // Set some properties. msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown()
消費者示例
消費時,使用Message Selector.by Sql通過SQL92選擇訊息.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // only subsribe messages have property a, also a >=0 and a <= 3 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();