ActiveMQ學習總結------原生實戰操作(下)03
本篇將繼續延續上一篇的內容,作為知識補充篇,為接下來我們學習spring整合ActiveMQ打好基礎
本篇主要學習內容:
1.ActiveMQ 佇列服務監聽
2.ActiveMQ Topic模型
回顧下上一篇ActiveMQ學習總結我們學習到了:
1.ActiveMQ術語及API介紹
2.ActiveMQ 文字訊息處理
3.ActiveMQ 物件訊息處理
相信大現在對ActiveMQ的一些簡單操作已經很輕鬆掌握了
上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.html
一 ActiveMQ實現佇列服務監聽
在我們上一篇的練習中,所有的消費者都是接收一次訊息即斷開連線,這樣是不是很不方便。
試想一下,如果我們的provider在consumer接收完第一條訊息後又繼續傳送了一條訊息,那麼consumer已經斷開連線了,是不是就不能連線不間斷的實時獲取訊息?
解決方案:
很容易,用我們的佇列服務監聽即可
注*:根據上一章的學習,大家對環境搭建使用配置,肯定都已經相當清楚了,這裡就不過多闡述,直接進行程式碼實戰
1 訊息生產者
相比之下,我麼你的生產者照之前是沒有任何變化的,主要的變化還是在cosumer身上
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerProducer { public static void sendTextActiveMq(String txt) { //定義連結工廠 ConnectionFactory connectionFactory = null; //定義連結物件 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義訊息的傳送者 MessageProducer producer = null; //定義訊息 Message message = null; try { //建立連結工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連結誒物件 connection = connectionFactory.createConnection(); //啟動連結 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createQueue("queue-listener"); //建立訊息生產者 producer = session.createProducer(destination); //建立訊息物件 message = session.createTextMessage(txt); //傳送訊息 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收資源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
2 訊息消費者
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQQueueListenerConsumer { public static void receiveTextActiveMq() { // 定義連結工廠 ConnectionFactory connectionFactory = null; // 定義連結物件 Connection connection = null; // 定義會話 Session session = null; // 目的地 Destination destination = null; // 定義訊息的傳送者 MessageConsumer consumer = null; // 定義訊息 Message message = null; try { //建立連結工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連結物件 connection = connectionFactory.createConnection(); //啟動連結 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createQueue("queue-listener"); //建立訊息消費者 consumer = session.createConsumer(destination); //佇列服務監聽 consumer.setMessageListener(new MessageListener() { //ActiveMQ回撥方法。通過該方法將訊息傳遞到consumer @Override public void onMessage(Message message) { //處理訊息 String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println("Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } }
3 測試
3.1 provider測試
package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!"); } }觀察我們的控制檯可以發現已經成功釋出到佇列
3.2 consumer測試
package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQQueueListenerConsumer.receiveTextActiveMq(); } }我們執行後可以發現,它接收到了訊息,但是它的程序並沒有關閉,
我們用provider繼續釋出一條訊息,看看consumer能不能接收到
可以看到,consumer持續在後臺監聽我們釋出的訊息,
通過上面程式碼,不難發現,provider沒有任何改動,只是consumer修改了一部分
通過呼叫匿名內部類的方法來實現持續監聽
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { } }
注意:因為涉及到佇列持續監聽,所以我們不能在finally處給資源回收,否則還在監聽狀態,資源都回收沒了,也就無從監聽啦。
二 Topic模型
在本系列文章第一篇也有介紹過一些Topic模型的概念,那麼這裡我們將以原理+實戰的方式來帶領大家掌握
1 Publish/Subscribe處理模式(Topic)
訊息生產者(釋出)訊息到topic中,同時有多個訊息消費者(訂閱)消費該訊息。
和點對點方式不同,釋出到Topic的訊息會被所有的訂閱者消費,而點對點的只能是指定的消費者去消費
當生產者釋出訊息,不管是否有消費者,都不會儲存訊息,也就是說它是發完就啥也不管了那種,
所以要注意:一定要有消費者,然後在有生產者,否則生產者不發完訊息什麼也不管了,你消費者在生產者之後才有,那麼你是接收不到訊息的。
接下來我們就以實戰的方式鼓搗下。
2 建立生產者
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicProducer { public static void sendTextActiveMQ(String txt){ //定義連結工廠 ConnectionFactory connectionFactory = null; //定義連結物件 Connection connection = null; //定義會話 Session session = null; //目的地 Destination destination = null; //定義訊息的傳送者 MessageProducer producer = null; //定義訊息 Message message = null; try { //建立連結工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連結誒物件 connection = connectionFactory.createConnection(); //啟動連結 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createTopic("topic-test"); //建立訊息生產者 producer = session.createProducer(destination); //建立訊息物件 message = session.createTextMessage(txt); //傳送訊息 producer.send(message); } catch (Exception ex) { ex.printStackTrace(); } finally { //回收資源 if (producer != null) { try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
我們可以發現,在建立目的地destination的時候程式碼有了變動
destination = session.createTopic("topic-test");
變成了createTopic,對這就是topic模式了。
3 建立消費者
package cn.arebirth.mq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class ActiveMQTopicConsumer implements Runnable { public static void receiveTextActiveMQ(String threadName) { // 定義連結工廠 ConnectionFactory connectionFactory = null; // 定義連結物件 Connection connection = null; // 定義會話 Session session = null; // 目的地 Destination destination = null; // 定義訊息的傳送者 MessageConsumer consumer = null; // 定義訊息 Message message = null; try { //建立連結工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616"); //建立連結物件 connection = connectionFactory.createConnection(); //啟動連結 connection.start(); //建立會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目的地 destination = session.createTopic("topic-test"); //建立訊息的消費者 consumer = session.createConsumer(destination); //服務監聽 consumer.setMessageListener(new MessageListener() { //ActiveMQ回撥方法。通過該方法將訊息傳遞到consumer @Override public void onMessage(Message message) { //處理訊息 String msg = null; try { msg = ((TextMessage) message).getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println(threadName + "--Producer say:" + msg); } }); } catch (Exception ex) { ex.printStackTrace(); } } @Override public void run() { receiveTextActiveMQ(Thread.currentThread().getName()); } }
我們可以發現,在建立目的地destination的時候程式碼有了變動
destination = session.createTopic("topic-test");
還有實現了Runnable這個是為了一會測試的時候,多執行緒啟動,看效果,是否多個都會接受到,(如果看著糊塗的話,你也可以去掉執行緒的部分,單獨複製多個物件,並啟動,效果也是一樣的)
4 測試(要先啟動消費者,否則消費者是接收不到訊息的!當然,你自己可以試一下)
4.1 測試消費者
package cn.arebirth.mq; public class ConsumerTest { public static void main(String[] args) { ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer(); Thread t1 = new Thread(a1,"a1"); ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer(); Thread t2 = new Thread(a2,"a2"); ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer(); Thread t3 = new Thread(a3,"a3"); t1.start(); t2.start(); t3.start(); } }
可以看到,我們的消費者已經啟動了,三個執行緒。並以監聽服務的方式啟動
4.2 測試生產者
package cn.arebirth.mq; public class ProducerTest { public static void main(String[] args) { ActiveMQTopicProducer.sendTextActiveMQ("hello,topic"); } }
可以看到,在topics下面,我們釋出的內容已經有記錄了
然後我們在看下,我們的consumer
可以發現,三個consumer都已經接收到了
ps:
如果你對ActiveMQ原理性的東西感到困惑,可以看下我們前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html
&n