1. 程式人生 > >《Apache RocketMQ使用者指南》之過濾訊息示例

《Apache RocketMQ使用者指南》之過濾訊息示例

訊息過濾示例

原文連結        譯者:小村長

在大多數情況下,tag是一種簡單而有用的設計,用於選擇所需的資訊。 例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消費者將收到包含TAGA或TAGB或TAGB的訊息. 但限制是一條訊息只能有一個標籤,而這對於複雜的情況可能無效。 在這種情況下,您可以使用SQL表示式篩選出訊息.

原理

SQL功能可以通過您在傳送訊息時放入的屬性進行一些計算。 在RocketMQ定義的語法下,您可以實現一些有趣的邏輯。 這是一個例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------


語法

RocketMQ只定義了一些基本的語法來支援這個功能。 你也可以很容易地擴充套件它.

  1. 數字比較, 像 >, >=, <, <=, BETWEEN, =;
  2. 字元比較, 像 =, <>, IN;
  3. IS NULL 或者 IS NOT NULL;
  4. 邏輯運算AND, OR, NOT;

常量型別是:

  1. 數字, 像123, 3.1415;
  2. 字串, 像‘abc’,必須使用單引號;
  3. NULL, 特殊常數;
  4. 布林常量, TRUEFALSE;

使用限制

只有消費者可以通過SQL92選擇訊息。 示例:

public void subscribe(final String topic, final MessageSelector messageSelector)

生產者示例

傳送時,您可以通過putUserProperty方法在訊息中放置屬性.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown()

消費者示例

消費時,使用Message Selector.by Sql通過SQL92選擇訊息.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();