1. 程式人生 > >ActiveMQ的兩種訊息形式。

ActiveMQ的兩種訊息形式。

一、訊息的傳遞型別

  • 點對點:即一個生產者和一個消費者一一對應
PTP的過程好比是兩個人打電話,這兩個人獨享這一條通訊鏈路。一方傳送訊息,另外一方接收 訊息 。在實際應用中因為有多個使用者對使用 PTP 的鏈路,它的通訊場景如下圖所示:
  • 釋出/訂閱:即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。
釋出訂閱模式類似聽廣播,一個播音員廣播,可以有很多聽眾同時收聽。這種關係如下圖所示:

二、訊息的正文格式

JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。

  • StreamMessage    Java原始值的資料流
  • MapMessage        一套名稱-值對
  • TextMessage        一個字串物件
  • ObjectMessage    一個序列化的 Java物件

  • BytesMessage     一個位元組的資料流

-----------------------------------------------------------------------------------------------------------------------------

一、建立Maven測試專案

二、pom.xml

    <dependencies>         <!-- 訊息佇列 -->         <dependency>             
<groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>             <version>5.11.3</version>         </dependency>
    </dependencies>

三、生產者(訊息傳送者)

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。

第二步:使用ConnectionFactory物件建立一個Connection物件。

第三步:開啟連線,呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件(topicqueue),此處建立一個Queue物件

第六步:使用Session物件建立一個Producer物件。

第七步:建立一個Message物件,建立一個TextMessage物件。

第八步:使用Producer物件傳送訊息。

第九步:關閉資源。

四、消費者(訊息接收者)

消費者:接收訊息。

第一步:建立一個ConnectionFactory物件。

第二步:從ConnectionFactory物件中獲得一個Connection物件。

第三步:開啟連線。呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致

第六步:使用Session物件建立一個Consumer物件。

第七步:接收訊息。

第八步:列印訊息。

第九步:關閉資源

測試1:先執行生產者,在執行消費者消費訊息

測試2:先測試消費者等待訊息到來,再執行生產者生產訊息

注意:消費者執行後會一直等待鍵盤輸入

package name.yaohuan.activemq; public class QueueTest {     @Test     public void testProducer() throws Exception {         // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。         //brokerURL伺服器的ip及埠號         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616");         // 第二步:使用ConnectionFactory物件建立一個Connection物件。         Connection connection = connectionFactory.createConnection();         // 第三步:開啟連線,呼叫Connection物件的start方法。         connection.start();         // 第四步:使用Connection物件建立一個Session物件。         //第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。         //第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);         // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。         //引數:佇列的名稱。         Queue queue = session.createQueue("test-queue");         // 第六步:使用Session物件建立一個Producer物件。         MessageProducer producer = session.createProducer(queue);         // 第七步:建立一個Message物件,建立一個TextMessage物件。         /*TextMessage message = new ActiveMQTextMessage();         message.setText("hello activeMq,this is my first test.");*/         TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");         // 第八步:使用Producer物件傳送訊息。         producer.send(textMessage);         // 第九步:關閉資源。         producer.close();         session.close();         connection.close();     }          @Test     public void testConsumer() throws Exception {         // 第一步:建立一個ConnectionFactory物件。         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616");         // 第二步:從ConnectionFactory物件中獲得一個Connection物件。         Connection connection = connectionFactory.createConnection();         // 第三步:開啟連線。呼叫Connection物件的start方法。         connection.start();         // 第四步:使用Connection物件建立一個Session物件。         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);         // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。         Queue queue = session.createQueue("test-queue");         // 第六步:使用Session物件建立一個Consumer物件。         MessageConsumer consumer = session.createConsumer(queue);         // 第七步:接收訊息。         consumer.setMessageListener(new MessageListener() {                          @Override             public void onMessage(Message message) {                 try {                     TextMessage textMessage = (TextMessage) message;                     //取訊息的內容                     String text = textMessage.getText();                     // 第八步:列印訊息。                     System.out.println(text);                 } catch (JMSException e) {                     e.printStackTrace();                 }             }         });         //等待鍵盤輸入         System.in.read();//這句話會讓消費者一直線上         // 第九步:關閉資源         consumer.close();         session.close();         connection.close();     } }

注意: 訊息釋出後,點選檢視訊息時,在activemq中預設會報告錯誤,如下   原因: 此版本的activemq不支援執行在jdk1.8上,可以切換成jdk1.7 ActiveMQ的5.15版本可以支援jdk1.8

解決方案: 配置內建的jetty服務使用jdk1.7啟動 在activemq的 bin路徑下找到  env 檔案 修改其中的配置,將 #JAVA_HOME=""
改為 JAVA_HOME="/usr/local/jdk1.7.0_76/"

重啟activemq ./ activemq restart
 

------------------------------------------------------------------------------------------------------------

一、生產者(訊息傳送者)

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。

第二步:使用ConnectionFactory物件建立一個Connection物件。

第三步:開啟連線,呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件(topicqueue),此處建立一個Topic物件。

第六步:使用 Session 物件建立一個 Producer 物件。

第七步:建立一個Message物件,建立一個TextMessage物件。

第八步:使用Producer物件傳送訊息。

第九步:關閉資源。

二、消費者(訊息接收者)

消費者:接收訊息。

第一步:建立一個ConnectionFactory物件。

第二步:從ConnectionFactory物件中獲得一個Connection物件。

第三步:開啟連線。呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic並且topic的名稱一致。

第六步:使用 Session 物件建立一個 Consumer 物件。

第七步:接收訊息。

第八步:列印訊息。

第九步:關閉資源


package name.yaohuan.activemq; public class TopicTest {     @Test     public void testProducer() throws Exception {         // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。         // brokerURL伺服器的ip及埠號         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616");         // 第二步:使用ConnectionFactory物件建立一個Connection物件。         Connection connection = connectionFactory.createConnection();         // 第三步:開啟連線,呼叫Connection物件的start方法。         connection.start();         // 第四步:使用Connection物件建立一個Session物件。         // 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。         // 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);         // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。         // 引數:話題的名稱。         Topic topic = session.createTopic("test-topic");         // 第六步:使用Session物件建立一個Producer物件。         MessageProducer producer = session.createProducer(topic);         // 第七步:建立一個Message物件,建立一個TextMessage物件。         /*          * TextMessage message = new ActiveMQTextMessage(); message.setText(          * "hello activeMq,this is my first test.");          */         TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");         // 第八步:使用Producer物件傳送訊息。         producer.send(textMessage);         // 第九步:關閉資源。         producer.close();         session.close();         connection.close();     }          @Test     public void testConsumer() throws Exception {         // 第一步:建立一個ConnectionFactory物件。         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.100.53:61616");         // 第二步:從ConnectionFactory物件中獲得一個Connection物件。         Connection connection = connectionFactory.createConnection();         // 第三步:開啟連線。呼叫Connection物件的start方法。         connection.start();         // 第四步:使用Connection物件建立一個Session物件。         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);         // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。         Topic topic = session.createTopic("test-topic");         // 第六步:使用Session物件建立一個Consumer物件。         MessageConsumer consumer = session.createConsumer(topic);         // 第七步:接收訊息。         consumer.setMessageListener(new MessageListener() {             @Override             public void onMessage(Message message) {                 try {                     TextMessage textMessage = (TextMessage) message;                     // 取訊息的內容                     String text = textMessage.getText();                     // 第八步:列印訊息。                     System.out.println(text);                 } catch (JMSException e) {                     e.printStackTrace();                 }             }         });         System.out.println("topic的消費端03。。。。。");         // 等待鍵盤輸入         System.in.read();         // 第九步:關閉資源         consumer.close();         session.close();         connection.close();     } }