ActiveMQ的兩種訊息形式。
一、訊息的傳遞型別
- 點對點:即一個生產者和一個消費者一一對應
- 釋出/訂閱:即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。
二、訊息的正文格式
JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。- StreamMessage Java原始值的資料流
- MapMessage 一套名稱-值對
- TextMessage 一個字串物件
- ObjectMessage 一個序列化的 Java物件
- BytesMessage 一個位元組的資料流
-----------------------------------------------------------------------------------------------------------------------------
一、建立Maven測試專案
二、pom.xml
<dependencies>
<!-- 訊息佇列 -->
<dependency>
</dependencies> |
三、生產者(訊息傳送者)
第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
第二步:使用ConnectionFactory物件建立一個Connection物件。
第三步:開啟連線,呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
第六步:使用Session物件建立一個Producer物件。
第七步:建立一個Message物件,建立一個TextMessage物件。
第八步:使用Producer物件傳送訊息。
第九步:關閉資源。
四、消費者(訊息接收者)
消費者:接收訊息。
第一步:建立一個ConnectionFactory物件。
第二步:從ConnectionFactory物件中獲得一個Connection物件。
第三步:開啟連線。呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
第六步:使用Session物件建立一個Consumer物件。
第七步:接收訊息。
第八步:列印訊息。
第九步:關閉資源
測試1:先執行生產者,在執行消費者消費訊息
測試2:先測試消費者等待訊息到來,再執行生產者生產訊息
注意:消費者執行後會一直等待鍵盤輸入
package name.yaohuan.activemq; public class QueueTest { @Test public void testProducer() throws Exception { // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。 //brokerURL伺服器的ip及埠號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616"); // 第二步:使用ConnectionFactory物件建立一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線,呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 //第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。 //第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。 //引數:佇列的名稱。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session物件建立一個Producer物件。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一個Message物件,建立一個TextMessage物件。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer物件傳送訊息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); } @Test public void testConsumer() throws Exception { // 第一步:建立一個ConnectionFactory物件。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616"); // 第二步:從ConnectionFactory物件中獲得一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線。呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session物件建立一個Consumer物件。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收訊息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取訊息的內容 String text = textMessage.getText(); // 第八步:列印訊息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入 System.in.read();//這句話會讓消費者一直線上 // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } } |
注意: 訊息釋出後,點選檢視訊息時,在activemq中預設會報告錯誤,如下 原因: 此版本的activemq不支援執行在jdk1.8上,可以切換成jdk1.7 ActiveMQ的5.15版本可以支援jdk1.8
解決方案: 配置內建的jetty服務使用jdk1.7啟動 在activemq的 bin路徑下找到 env 檔案 修改其中的配置,將 #JAVA_HOME=""
改為 JAVA_HOME="/usr/local/jdk1.7.0_76/"
重啟activemq ./ activemq restart
------------------------------------------------------------------------------------------------------------
一、生產者(訊息傳送者)
第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
第二步:使用ConnectionFactory物件建立一個Connection物件。
第三步:開啟連線,呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Topic物件。
第六步:使用 Session 物件建立一個 Producer 物件。第七步:建立一個Message物件,建立一個TextMessage物件。
第八步:使用Producer物件傳送訊息。
第九步:關閉資源。
二、消費者(訊息接收者)
消費者:接收訊息。
第一步:建立一個ConnectionFactory物件。
第二步:從ConnectionFactory物件中獲得一個Connection物件。
第三步:開啟連線。呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且topic的名稱一致。
第六步:使用 Session 物件建立一個 Consumer 物件。第七步:接收訊息。
第八步:列印訊息。
第九步:關閉資源
package name.yaohuan.activemq; public class TopicTest { @Test public void testProducer() throws Exception { // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。 // brokerURL伺服器的ip及埠號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616"); // 第二步:使用ConnectionFactory物件建立一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線,呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 // 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。 // 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。 // 引數:話題的名稱。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session物件建立一個Producer物件。 MessageProducer producer = session.createProducer(topic); // 第七步:建立一個Message物件,建立一個TextMessage物件。 /* * TextMessage message = new ActiveMQTextMessage(); message.setText( * "hello activeMq,this is my first test."); */ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test"); // 第八步:使用Producer物件傳送訊息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); } @Test public void testConsumer() throws Exception { // 第一步:建立一個ConnectionFactory物件。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616"); // 第二步:從ConnectionFactory物件中獲得一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線。呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session物件建立一個Consumer物件。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收訊息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; // 取訊息的內容 String text = textMessage.getText(); // 第八步:列印訊息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } } |