1. 程式人生 > >MQ叢集方式訂閱訊息

MQ叢集方式訂閱訊息

叢集訂閱即某個消費者叢集只消費指定的 Topic,而不是消費所有 Topic。

請確保同一個 Consumer ID 下所有 Consumer 例項的訂閱關係保持一致,具體請參考訂閱關係一致文件。

  1. publicclassConsumerTest{
  2. publicstaticvoid main(String[] args){
  3. Properties properties =newProperties();
  4. properties.put(PropertyKeyConst.ConsumerId,"XXX");// 您在控制檯建立的 Consumer ID
  5. properties.put
    (PropertyKeyConst.AccessKey,"XXX");// AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
  6. properties.put(PropertyKeyConst.SecretKey,"XXX");// SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
  7. //PropertyKeyConst.ONSAddr地址請根據實際情況對應以下幾類進行輸入:
  8. //公共雲生產環境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  9. //公共雲公測環境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
  10. //杭州金融雲環境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  11. //杭州深圳雲環境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
  12. //亞太東南1公共雲環境(只適用於新加坡ECS):http://ap-southeastaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
  13. properties.put(PropertyKeyConst.ONSAddr,
  14. "http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"
    );//此處以公共雲生產環境為例
  15. Consumer consumer =ONSFactory.createConsumer(properties);
  16. consumer.subscribe("TopicTestMQ","TagA||TagB",newMessageListener(){//訂閱多個Tag
  17. publicAction consume(Message message,ConsumeContext context){
  18. System.out.println("Receive: "+ message);
  19. returnAction.CommitMessage;
  20. }
  21. });
  22. //訂閱另外一個Topic
  23. consumer.subscribe("TopicTestMQ-Other","*",newMessageListener(){//訂閱全部Tag
  24. publicAction consume(Message message,ConsumeContext context){
  25. System.out.println("Receive: "+ message);
  26. returnAction.CommitMessage;
  27. }
  28. });
  29. consumer.start(); //訂閱主題後才開啟消費叢集
  30. System.out.println("Consumer Started");
  31. }
  32. }