MQ叢集方式訂閱訊息
阿新 • • 發佈:2019-01-28
叢集訂閱即某個消費者叢集只消費指定的 Topic,而不是消費所有 Topic。
請確保同一個 Consumer ID 下所有 Consumer 例項的訂閱關係保持一致,具體請參考訂閱關係一致文件。
publicclassConsumerTest{
publicstaticvoid main(String[] args){
Properties properties =newProperties();
properties.put(PropertyKeyConst.ConsumerId,"XXX");// 您在控制檯建立的 Consumer ID
properties.put
properties.put(PropertyKeyConst.SecretKey,"XXX");// SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
//PropertyKeyConst.ONSAddr地址請根據實際情況對應以下幾類進行輸入:
//公共雲生產環境:http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//公共雲公測環境:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
//杭州金融雲環境:http://jbponsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
//杭州深圳雲環境:http://mq4finance-sz.addr.aliyun.com:8080/rocketmq/nsaddr4client-internal
//亞太東南1公共雲環境(只適用於新加坡ECS):http://ap-southeastaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal"
Consumer consumer =ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ","TagA||TagB",newMessageListener(){//訂閱多個Tag
publicAction consume(Message message,ConsumeContext context){
System.out.println("Receive: "+ message);
returnAction.CommitMessage;
}
});
//訂閱另外一個Topic
consumer.subscribe("TopicTestMQ-Other","*",newMessageListener(){//訂閱全部Tag
publicAction consume(Message message,ConsumeContext context){
System.out.println("Receive: "+ message);
returnAction.CommitMessage;
}
});
consumer.start(); //訂閱主題後才開啟消費叢集
System.out.println("Consumer Started");
}
}