ActiveMQ之虛擬主題和映象佇列
轉自:http://blog.csdn.net/zhu_tianwei/article/details/46303419, 略做補充
ActiveMQ支援的虛擬Destinations分為有兩種,分別是
1.虛擬主題(Virtual Topics)2.組合 Destinations(CompositeDestinations)
這兩種虛擬Destinations可以看做對簡單的topic和queue用法的補充,基於它們可以實現一些簡單有用的EIP功能,虛擬主題類似於1對多的分支功能+消費端的cluster+failover,組合Destinations類似於簡單的destinations直接的路由功能。
虛擬主題(Virtual Topics)
ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當於一個持久化的queue的客戶端,它會收取所有訊息。這種情況下存在兩個問題:
1.同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔訊息處理功能。因為每個都會獲取所有訊息。queue模式可以解決這個問題,broker端又不能將訊息傳送到多個應用端。所以,既要釋出訂閱,又要讓消費者分組,這個功能jms規範本身是沒有的。
2.同一應用內consumer端failover的問題:由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理訊息了,系統的健壯性不高。
為了解決這兩個問題,ActiveMQ中實現了虛擬Topic的功能。使用起來非常簡單。
對於訊息釋出者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。
對於訊息接收端來說,是個佇列,不同應用裡使用不同的字首作為佇列的名稱,即可表明自己的身份即可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。可以在同一個應用裡使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(字首不同),所以不同的應用中都可以接收到全部的訊息。每個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。
生產者:
- package cn.slimsmart.activemq.demo.virtualtopic;
- import javax.jms.Connection;
- import javax.jms.DeliveryMode;
- import javax.jms.JMSException;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
- import org.apache.activemq.ActiveMQConnectionFactory;
- publicclass Producer {
- publicstaticvoid main(String[] args) throws JMSException {
- // 連線到ActiveMQ伺服器
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
- // 建立主題
- Topic topic = session.createTopic("VirtualTopic.TEST");
- MessageProducer producer = session.createProducer(topic);
- // NON_PERSISTENT 非持久化 PERSISTENT 持久化,傳送訊息時用使用持久模式
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- TextMessage message = session.createTextMessage();
- message.setText("topic 訊息。");
- message.setStringProperty("property", "訊息Property");
- // 釋出主題訊息
- producer.send(message);
- System.out.println("Sent message: " + message.getText());
- session.close();
- connection.close();
- }
- }
消費者:
- package cn.slimsmart.activemq.demo.virtualtopic;
- import javax.jms.Connection;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Queue;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import org.apache.activemq.ActiveMQConnectionFactory;
- publicclass Consumer {
- publicstaticvoid main(String[] args) throws JMSException, InterruptedException {
- // 連線到ActiveMQ伺服器
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
- // 建立主題
- Queue topicA = session.createQueue("Consumer.A.VirtualTopic.TEST");
- Queue topicB = session.createQueue("Consumer.B.VirtualTopic.TEST");
- // 消費者A組建立訂閱
- MessageConsumer consumerA1 = session.createConsumer(topicA);
- consumerA1.setMessageListener(new MessageListener() {
- // 訂閱接收方法
- publicvoid onMessage(Message message) {
- TextMessage tm = (TextMessage) message;
- try {
- System.out.println("Received message A1: " + tm.getText()+":"+tm.getStringProperty("property"));
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- MessageConsumer consumerA2 = session.createConsumer(topicA);
- consumerA2.setMessageListener(new MessageListener() {
- // 訂閱接收方法
- publicvoid onMessage(Message message) {
- TextMessage tm = (TextMessage) message;
- try {
- System.out.println("Received message A2: " + tm.getText()+":"+tm.getStringProperty("property"));
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- //消費者B組建立訂閱
- MessageConsumer consumerB1 = session.createConsumer(topicB);
- consumerB1.setMessageListener(new MessageListener() {
- // 訂閱接收方法
- publicvoid onMessage(Message message) {
- TextMessage tm = (TextMessage) message;
- try {
- System.out.println("Received message B1: " + tm.getText()+":"+tm.getStringProperty("property"));
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- MessageConsumer consumerB2 = session.createConsumer(topicB);
- consumerB2.setMessageListener(new MessageListener() {
- // 訂閱接收方法
- publicvoid onMessage(Message message) {
- TextMessage tm = (TextMessage) message;
- try {
- System.out.println("Received message B2: " + tm.getText()+":"+tm.getStringProperty("property"));
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- session.close();
- connection.close();
- }
- }
從queue接收到的訊息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。訊息的persistent屬性為true,即每個相當於一個持久訂閱。
A1和A2為一個應用,B1和B2為一個應用,2組應用內部做負載,和failover。
Virtual Topic這個功能特性在broker上有個總開關,useVirtualTopics屬性,預設為true,設定為false即可關閉此功能。當此功能開啟,並且使用了持久化的儲存時,broker啟動的時候會從持久化儲存裡拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們新增到系統的Virtual Topics列表中去。當然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統會自動建立對應的實際topic。當有consumer訪問此VirtualTopics時,系統會自動建立持久化的queue,並在每次Topic收到訊息時,分發到具體的queue。
消費端使用的queue名稱字首的Consumer是可以修改的。示例如下: