1. 程式人生 > 實用技巧 >RocketMQ原生API收發訊息

RocketMQ原生API收發訊息

RocketMQ原生API收發訊息

pom檔案

新建 maven 專案或 module,新增 rocketmq-client 依賴。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.tedu</groupId>
    <artifactId>demo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-store</artifactId>
            <version>4.7.1</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

同步訊息

同步訊息傳送要保證強一致性,發到master的訊息向slave複製後,才會向生產者傳送反饋資訊。

這種可靠性同步地傳送方式使用的比較廣泛,比如:重要的訊息通知,簡訊通知。

生產者

package demo1;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.Scanner;
/*
傳送同步訊息
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        /*
        group 相同的生產者成為一個生產者組

        標識傳送同一類訊息的Producer,通常傳送邏輯一致。
        傳送普通訊息的時候,僅標識使用,並無特別用處。

        若傳送事務訊息,傳送某條訊息的producer-A宕機,
        使得事務訊息一直處於PREPARED狀態並超時,
        則broker會回查同一個group的其他producer,
        確認這條訊息應該commit還是rollback。

        但開源版本並不完全支援事務訊息(閹割了事務回查的程式碼)。?????
         */
        DefaultMQProducer p = new DefaultMQProducer("producer-demo1");

        /*
        連線nameserver叢集, 獲得註冊的broker資訊
         */
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        /*
        主題相當於是訊息的分類, 一類訊息使用一個主題
         */
        String topic = "Topic1";

        /*
        tag 相當於是訊息的二級分類, 在一個主題下, 可以通過 tag 再對訊息進行分類
         */
        String tag = "TagA";

        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {
                Message msg = new Message(topic, tag, s.getBytes()); //一級分類, 二級分類, 訊息內容
                SendResult r = p.send(msg);// 傳送訊息後會得到伺服器反饋, 包含: smsgId, sendStatus, queue, queueOffset, offsetMsgId
                System.out.println(r);
            }
        }
    }
}

消費者

消費者的要點:

1. push 和 pull

消費者有兩種模式:push 和 pull。

push 模式由伺服器主動向消費者傳送訊息;pull 模式由消費者主動向伺服器請求訊息。

在消費者處理能力有限時,為了減輕消費者的壓力,可以採用pull模式。多數情況下都採用 pull 模式。

2. NameServer

消費者需要向 NameServer 詢問 Topic 的路由資訊。

3. Topic

從指定的Topic接收訊息。Topic相當於是一級分類。

4. Tag

Topic 相當於是一級分類,Tag 相當於是2級分類。

  • 多個 Tag 可以這樣寫: TagA || TagB || TagC
  • 不指定 Tag,或者說接收所有的 Tag,可以寫星號: *
package demo1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws Exception {
        /*
        標識一類Consumer的集合名稱,

        這類Consumer通常消費一類訊息,且消費邏輯一致。
        同一個Consumer Group下的各個例項將共同消費
        topic的訊息,起到負載均衡的作用。

        消費進度以Consumer Group為粒度管理,不同
        Consumer Group之間消費進度彼此不受影響,
        即訊息A被Consumer Group1消費過,也會再
        給Consumer Group2消費。

        注: RocketMQ要求同一個Consumer Group的
        消費者必須要擁有相同的註冊資訊,即必須要聽一樣
        的topic(並且tag也一樣)。
         */
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo1");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic1", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("開始消費資料");
    }
}

非同步訊息


master 收到訊息後立即向生產者進行反饋。之後再以非同步方式向 slave 複製訊息。

非同步訊息通常用在對響應時間敏感的業務場景,即傳送端不能容忍長時間地等待Broker的響應。

生產者

package demo2;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
非同步傳送訊息

一條訊息送出後, 不必暫停等待伺服器針對這條訊息的反饋, 而是可以立即傳送後續訊息.
使用監聽器, 以非同步的方式接收伺服器的反饋
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo2");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        p.setRetryTimesWhenSendAsyncFailed(0);

        String topic = "Topic2";
        String tag = "TagA";
        String key = "Key-demo2";


        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {
                Message msg = new Message(topic, tag, key, s.getBytes());

                p.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("\n\n訊息傳送成功 : "+sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("\n\n訊息傳送失敗");
                    }
                });

                System.out.println("--------------------訊息已送出-----------------------");
            }

        }
    }
}

消費者

package demo2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
/*
與 demo1.Consumer 完全相同
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic2", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });


        c.start();
        System.out.println("開始消費資料");
    }
}

單向訊息

這種方式主要用在不特別關心傳送結果的場景,例如日誌傳送。

生產者

package demo3;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
單向訊息

訊息發出後, 伺服器不會返回結果
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo3");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic3";
        String tag = "TagA";

        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");
            for (String s : a) {
                Message msg = new Message(topic, tag, s.getBytes());
                p.sendOneway(msg);
            }
            System.out.println("--------------------訊息已送出-----------------------");
        }

    }
}

消費者

package demo3;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*
與 demo1.Consumer 完全相同
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic3", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });


        c.start();
        System.out.println("開始消費資料");
    }
}

順序訊息

上圖演示了 Rocketmq 順序訊息的基本原理:

  • 同一組有序的訊息序列,會被髮送到同一個佇列,按照 FIFO 的方式進行處理
  • 一個佇列只允許一個消費者執行緒接收訊息,這樣就保證訊息按順序被接收

下面以訂單為例:

一個訂單的順序流程是:建立、付款、推送、完成。訂單號相同的訊息會被先後傳送到同一個佇列中。消費時,從同一個佇列接收同一個訂單的訊息。

生產者

package demo4;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;
import java.util.Scanner;
/*
以下訊息, 相同id的訊息按順序傳送到同一個佇列,
消費時也從同一個佇列按順序消費
                                              topic

                                        =======================  queue1
                                        =======================  queue2
111,訊息1  111,訊息2  111,訊息3   ------->=======================  queue3
                                        =======================  queue4
222,訊息1  222,訊息2  222,訊息3   ------->=======================  queue5
                                        =======================  queue6
333,訊息1  333,訊息2  333,訊息3   ------->=======================  queue7
                                        =======================  queue8
                                                    ......
 */
public class Producer {
    static String[] msgs = {
            "15103111039,建立",
                                "15103111065,建立",
            "15103111039,付款",
                                                    "15103117235,建立",
                                "15103111065,付款",
                                                    "15103117235,付款",
                                "15103111065,完成",
            "15103111039,推送",
                                                    "15103117235,完成",
            "15103111039,完成"
    };

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo4");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic4";
        String tag = "TagA";

        for (String s : msgs) {
            System.out.println("按回車傳送此訊息: "+s);
            new Scanner(System.in).nextLine();

            Message msg = new Message(topic, tag, s.getBytes());

            String[] a = s.split(",");
            long orderId = Long.parseLong(a[0]);

            /*
            MessageQueueSelector用來選擇傳送的佇列,
            這裡用訂單的id對佇列數量取餘來計算佇列索引

            send(msg, queueSelector, obj)
            第三個引數會傳遞到queueSelector, 作為它的第三個引數
             */
            SendResult r = p.send(msg, new MessageQueueSelector() {
                /*
                三個引數的含義:
                queueList: 當前Topic中所有佇列的列表
                message: 訊息
                o: send()方法傳入的orderId
                 */
                @Override
                public MessageQueue select(List<MessageQueue> queueList, Message message, Object o) {
                    Long orderId = (Long) o;
                    //訂單id對佇列數量取餘, 相同訂單id得到相同的佇列索引
                    long index = orderId % queueList.size();
                    System.out.println("訊息已傳送到: "+queueList.get((int) index));
                    return queueList.get((int) index);
                }
            }, orderId);

            System.out.println(r+"\n\n");
        }
    }
}

消費者

package demo4;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo4");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        
        c.subscribe("Topic4", "*");

        c.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                String t = Thread.currentThread().getName();

                for (MessageExt msg : list) {
                    System.out.println(t+" - "+ msg.getQueueId() + " - " +new String(msg.getBody()));
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        c.start();
        System.out.println("開始消費資料");
    }
}

延時訊息

訊息傳送到 Rocketmq 伺服器後, 延遲一定時間再向消費者進行投遞。

延時訊息的使用場景:

比如電商裡,提交了一個訂單就可以傳送一個延時訊息,1h後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

生產者傳送訊息時,對訊息進行延時設定:

msg.setDelayTimeLevel(3);

其中 3 代表級別而不是一個具體的時間值,級別和延時時長對應關係是在 MessageStoreConfig 類種進行定義的:

this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

對應關係表:

級別 延時時長
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

生產者

package demo5;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
延時訊息

延時訊息的使用場景
比如電商裡,提交了一個訂單就可以傳送一個延時訊息,1h後去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo5");
        p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
        p.start();

        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {
                Message msg = new Message("Topic5", s.getBytes());

                /*
                設定訊息的延遲時間,這裡不支援任意的時間,只支援18個固定的延遲時長,
                分別用Leven 1到18 來表示:

                org/apache/rocketmq/store/config/MessageStoreConfig.java
                this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
                 */
                msg.setDelayTimeLevel(3);

                p.send(msg);
            }
        }
    }
}

消費者

package demo5;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo5");
        c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
        c.subscribe("Topic5", "*");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
                System.out.println("------------------------------");
                for (MessageExt msg : list) {
                    long t = System.currentTimeMillis() - msg.getBornTimestamp();
                    System.out.println(new String(msg.getBody()) + " - 延遲: "+t);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("開始消費資料");
    }
}

批量訊息

批量傳送訊息能顯著提高傳遞小訊息的效能。限制是這些批量訊息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時訊息。此外,這一批訊息的總大小不應超過4MB。

生產者

package demo6;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.ArrayList;
import java.util.Scanner;
/*
批量傳送訊息能顯著提高傳遞小訊息的效能。限制是:
- 這些批量訊息應該有相同的topic,
- 相同的waitStoreMsgOK,
- 而且不能是延時訊息。
- 這一批訊息的總大小不應超過4MB。

如果超出4M需要進行資料分割, 請參考官方程式碼樣例https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo6");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic6";

        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");

            ArrayList<Message> messages = new ArrayList<>();
            for (String s : a) {
                messages.add(new Message(topic, s.getBytes()));
            }

            p.send(messages);
            System.out.println("批量訊息已傳送");
        }
    }
}

消費者

package demo6;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo6");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        c.subscribe("Topic6", "*");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg :
                        list) {
                    System.out.println("收到: "+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("開始消費資料");
    }
}

訊息過濾

Tag 過濾

Tag 可以滿足大多數訊息過濾的需求。使用 Tag 過濾非常簡單,例如:

consumer.subscribe("Topic1", "TagA || TagB || TagC");

對自定義屬性過濾

生產者可以在訊息中新增自定義的屬性:

msg.putUserProperty("prop1", "1");
msg.putUserProperty("prop2", "2");

消費者接收資料時,可以根據屬性來過濾訊息:

consumer.subscribe("Topic7", MessageSelector.bySql("prop1=1 or prop2=2"));
1

可以看到,自定義屬性的過濾語法是 Sql 語法,RocketMQ只定義了一些基本語法來支援這個特性,支援的 Sql 過濾語法如下:

  • 數值比較,比如:>,>=,<,<=,BETWEEN,=;
  • 字元比較,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 邏輯符號 AND,OR,NOT;

生產者

package demo7;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Random;
import java.util.Scanner;
/*
傳送的訊息中包含 tag 和 userProperty

消費者接收時,可以選擇用 tag 或 userProperty 進行過濾
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo7");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic7";

        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");
            System.out.print("輸入Tag: ");
            String tag = new Scanner(System.in).nextLine();

            for (String s : a) {
                Message msg = new Message(topic, tag, s.getBytes());
                msg.putUserProperty("rnd", ""+new Random().nextInt(4));
                p.send(msg);
            }

        }
    }
}

消費者

package demo7;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Scanner;

/*
如果使用sql過濾,需要在 broker.properties 中新增配置來啟用 sql 過濾:

    enablePropertyFilter=true
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        System.out.print("使用Tag過濾還是使用Sql過濾(tag/sql): ");
        String ts = new Scanner(System.in).nextLine();

        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo7");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        if (ts.equalsIgnoreCase("tag")) {
            System.out.println("使用Tag過濾: TagA || TagB || TagC");
            c.subscribe("Topic7", "TagA || TagB || TagC");
        } else {
            System.out.println("使用Sql過濾: rnd=1 or rnd > 2");
            c.subscribe("Topic7", MessageSelector.bySql("rnd=1 or rnd > 2"));
        }

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg.getUserProperty("rnd"));

                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("開始消費資料");
    }
}

事務訊息

RocketMQ 提供了可靠性訊息,也叫事務訊息。下面分析一下其原理。

事務訊息的原理

下面來看 RocketMQ 的事務訊息是如何來發送“可靠訊息”的,只需要以下三步:

  1. 傳送半訊息(半訊息不會發送給消費者)
  2. 執行本地事務
  3. 提交訊息

完成事務訊息傳送後,消費者就可以以正常的方式來消費資料。

RocketMQ 的自動重發機制在絕大多數情況下,都可以保證訊息被正確消費。

假如訊息最終消費失敗了,還可以由人工處理進行託底。

上面分析的是正常情況下的執行流程。下面再來看兩種錯誤情況:

  1. 事務執行失敗時回滾訊息
  2. 伺服器無法得知訊息狀態時,需要主動回查訊息狀態

回滾:

訊息回查:

生產者

package demo8;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer p = new TransactionMQProducer("producer-demo8");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        p.setExecutorService(Executors.newFixedThreadPool(5));

        p.setTransactionListener(new TransactionListener() {

            ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>();

            /*
            在這裡執行本地事務
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("執行本地事務");
                if (Math.random()<0.333) {
                    System.out.println("本地事務執行成功, 按回車提交事務訊息");
                    new Scanner(System.in).nextLine();

                    localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (Math.random()<0.666) {
                    System.out.println("本地事務執行失敗, 按回車回滾事務訊息");
                    new Scanner(System.in).nextLine();

                    localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    System.out.println("本地事務執行情況未知, 按回車繼續");
                    new Scanner(System.in).nextLine();

                    localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
                    return LocalTransactionState.UNKNOW;
                }
            }

            /*
            回查方法
            檢測頻率預設1分鐘,可通過在broker.conf檔案中設定transactionCheckInterval的值來改變預設值,單位為毫秒。
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("伺服器正在回查訊息狀態");

                LocalTransactionState s = localTx.get(messageExt.getTransactionId());
                if (s == null || s == LocalTransactionState.UNKNOW) {
                    s = LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return s;
            }
        });

        p.start();

        String topic = "Topic8";

        while (true) {
            System.out.print("輸入訊息,用逗號分隔多條訊息: ");
            String[] a = new Scanner(System.in).nextLine().split(",");

            for (String s : a) {
                Message msg = new Message(topic, s.getBytes());
                System.out.println("---------傳送半訊息-----------");
                TransactionSendResult r = p.sendMessageInTransaction(msg, null);
                System.out.println("事務訊息傳送結果: "+ r.getLocalTransactionState().name());
            }
        }
    }
}

消費者

package demo8;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*

如果返回 RECONSUME_LATER, 伺服器會等待一會再重試傳送訊息

訊息屬性預設設定 DELAY=6, 等待時間為 2 分鐘,

                org/apache/rocketmq/store/config/MessageStoreConfig.java
                this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";


 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8");
        c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");

        c.subscribe("Topic8", "*");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                if (Math.random()<0.5) {
                    System.out.println("訊息處理完成");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    System.out.println("訊息處理失敗, 要求伺服器稍後重試傳送訊息");
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        c.start();
        System.out.println("開始消費資料");
    }
}