ActiveMQ簡單介紹之備忘筆記
阿新 • • 發佈:2019-02-03
一.ActiveMQ接收/傳送訊息流程
- 建立ConnectionFactory工廠物件,需要填入使用者名稱、密碼、以及要連線的地址,均使用預設即可,預設埠為tcp://localhost:61616
- 通過ConnectionFactory工廠物件我們建立一個Connection連線,並且呼叫Connection的start方法開啟連線,Connection預設是關閉的。
- 通過Connection物件建立Session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事物,引數配置2為簽收模式,一般我們設定自動簽收。
- 通過Session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式Destination被稱作Topic即主題。在程式中可以使用多個Queue和Topic.
- 我們需要通過Session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer.
- 我們可以使用MessageProducer的setDeliveryMode方法為其設定持久化特性和非持久化特性。
- 最後我們使用JSM規範的TextMessage形式(有多種形式,我假設為這種)建立資料(通過Session物件),並用MessageProducer的send方法傳送資料。同理客戶端使用receive方法進行接收資料。最後不要忘記關閉Connection連線。
大致的流程圖如下:
簡單的程式碼編寫模擬一個MQ的生產者和消費者:
相關背景介紹:
1. ActiveMQ傳遞訊息的兩種方式
1)點對點方式(PTP):一個消費者對應一個生產者。
2)釋出/訂閱模式(Publish/Sub):一個生產者產生訊息傳送後,可以被多個消費者進行接收。
2. JMS定義了五種訊息正文格式,以及訊息的呼叫型別,允許傳送和接收一些不同型別的資料,提供現有訊息格式的一些級別的相容性。
StreamMessage:–JAVA原始的資料流
TextMessage:一個字串物件
ObjectMessage:一個系列化的java物件
BytesMessage:一個位元組物件
MapMessage:key/value方式的鍵值對
以下程式碼模擬PTP模式:
生產者:
public static void main(String[] args) throws Exception {
//第一步建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin" , "tcp://localhost:61616");
//第二步建立Connection
Connection connection = connectionFactory.createConnection();
//第三步啟動Connection的start
connection.start();
//第四步建立Session
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//第五步 建立Destination
Queue destination=session.createQueue("myQueue");
//第六步 使用Destination建立MessageProducer/MessageConsumer
MessageProducer Producer = session.createProducer(destination); //當前是市場這
Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//第七步 利用Producer/Consumer傳送或接收訊息
for(int i=1;i<=5;i++){
TextMessage message=session.createTextMessage("我是生產者"+i);
Producer.send(message);
}
// session.commit(); //如果connection.createSession的引數為true,則是事物。 需要用session提交
if(connection!=null){
connection.close();
}
}
消費者:
public static void main(String[] args) throws Exception {
// 第一步建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin", "admin", "tcp://localhost:61616");
// 第二步建立Connection
Connection connection = connectionFactory.createConnection();
// 第三步啟動Connection的start
connection.start();
// 第四步建立Session
Session session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 第五步 建立Destination
Queue destination = session.createQueue("myQueue");
// 第六步 使用Destination建立MessageProducer/MessageConsumer
MessageConsumer consumer = session.createConsumer(destination);
// 第七步 利用Producer/Consumer傳送或接收訊息
while (true) {
TextMessage msg = (TextMessage) consumer.receive();
if (msg == null) {
break;
}
System.out.println("收到的內容:" + msg.getText());
}
if (connection != null) {
connection.close();
}
}
二.Spring整合JMS的三種訊息監聽器
- MessageListener
MessageListener是最原始的訊息監聽器,它是JMS規範中定義的一個介面。其中定義了一個用於處理接收到的訊息的onMessage方法,該方法只接收一個Message引數。
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
//這裡我們知道生產者傳送的就是一個純文字訊息,所以這裡可以直接進行強制轉換,或者直接把onMessage方法的引數改成Message的子類TextMessage
TextMessage textMsg = (TextMessage) message;
System.out.println("接收到一個純文字訊息。");
try {
System.out.println("訊息內容是:" + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- SessionAwareMessageListener
SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收訊息的,假如我們在使用MessageListener處理接收到的訊息時我們需要傳送一個訊息通知對方我們已經收到這個訊息了,那麼這個時候我們就需要在程式碼裡面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到訊息後傳送一個回覆的訊息,它同樣為我們提供了一個處理接收到的訊息的onMessage方法,但是這個方法可以同時接收兩個引數,一個是表示當前接收到的訊息Message,另一個就是可以用來發送訊息的Session物件。
package com.tiantian.springintejms.listener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;
public class ConsumerSessionAwareMessageListener implements
SessionAwareMessageListener<TextMessage> {
private Destination destination;
public void onMessage(TextMessage message, Session session) throws JMSException {
System.out.println("收到一條訊息");
System.out.println("訊息內容是:" + message.getText());
MessageProducer producer = session.createProducer(destination);
Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");
producer.send(textMessage);
}
public Destination getDestination() {
returndestination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
MessageListenerAdapter
MessageListenerAdapter類實現了MessageListener介面和SessionAwareMessageListener介面,它的主要作用是將接收到的訊息進行型別轉換,然後通過反射的形式把它交給一個普通的Java類進行處理。MessageListenerAdapter會把接收到的訊息做如下轉換:
TextMessage轉換為String物件;BytesMessage轉換為byte陣列;
MapMessage轉換為Map物件;
ObjectMessage轉換為對應的Serializable物件。