《商城專案06》--用ActiveMQ實現訊息的傳送和接收
阿新 • • 發佈:2018-12-16
一, 使用場景
對商品資訊進行操作的同時, 將資料同步到solr庫, 實現該需求有以下幾種方式:
方式1: 在e3-manager-service新增商品資訊的實現類中直接寫將資料新增到solr庫; <弊端: 負責商品資訊操作的開發人員不一定對solr熟悉, 所以得分離出來寫, 這裡可以直接呼叫寫好的solr介面>
方式2: 單獨寫一solr服務 e3-xxx-service, 實現將商品資訊資料同步到solr庫; <弊端: 服務之間的呼叫耦合性太強, 如果兩服務存在互相呼叫關係, 則應用啟動時, 必定因找不到呼叫的服務而報錯! >
方式3: 用ActiveMQ訊息佇列中介軟體, 解除服務之間呼叫的耦合性;
二,ActiveMQ的下載安裝(windows環境下)
1, 資源下載
連結:https://pan.baidu.com/s/1ZtHxbtfNOnngpYd4Dhpgww
提取碼:z0ta
2, 解壓使用
2.1 解壓, 自定義 存放路徑
2.2 啟動MQ
直接雙擊啟動會有點問題, 不清楚什麼原因出現閃退
所以這裡採用cmd命令方式啟動:
cmd --> cd xxx\apache-activemq-5.12.0\bin -->activemq.bat start -->回車
2.3 測試是否啟動成功
(URL: http://localhost:8161/admin/)
登入: admin/admin
能正確顯示出以下頁面即為ok
三, ActiveMQ的測試使用
1, 導包 (activemq-all-5.11.2.jar)
2, 具體測試方法
2.1 Queue模式(一對一)
//新建服務提供者Queue, 進行訊息傳送 @Test public void testQueueProducer() throws Exception { // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。 //brokerURL伺服器的ip及埠號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory物件建立一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線,呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 //第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。 //第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答 2、手動應答。一般是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。 //引數:佇列的名稱。 //Queue queue = session.createQueue("test-queue"); Queue queue = session.createQueue("spring-queue"); // 第六步:使用Session物件建立一個Producer物件。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一個Message物件,建立一個TextMessage物件。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq, this is my first test."); // 第八步:使用Producer物件傳送訊息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); }
//新建服務消費者Consunmer, 對訊息進行接收
@Test
public void testQueueConsumer() throws Exception {
// 第一步:建立一個ConnectionFactory物件。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連線。呼叫Connection物件的start方法。
connection.start();
// 第四步:使用Connection物件建立一個Session物件。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
//Queue queue = session.createQueue("test-queue");
Queue queue = session.createQueue("spring-queue");
// 第六步:使用Session物件建立一個Consumer物件。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收訊息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取訊息的內容
text = textMessage.getText();
// 第八步:列印訊息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入
System.in.read();
// 第九步:關閉資源
consumer.close();
session.close();
connection.close();
}
2.2 Topic廣播模式(一對多 )
//新建服務提供者TopicProducer, 進行訊息廣播式傳送
@Test
public void testTopicProducer() throws Exception {
// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
// brokerURL伺服器的ip及埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 第二步:使用ConnectionFactory物件建立一個Connection物件。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連線,呼叫Connection物件的start方法。
connection.start();
// 第四步:使用Connection物件建立一個Session物件。
// 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
// 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
// 引數:話題的名稱。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session物件建立一個Producer物件。
MessageProducer producer = session.createProducer(topic);
// 第七步:建立一個Message物件,建立一個TextMessage物件。
/*
* TextMessage message = new ActiveMQTextMessage(); message.setText(
* "hello activeMq,this is my first test.");
*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
// 第八步:使用Producer物件傳送訊息。
producer.send(textMessage);
// 第九步:關閉資源。
producer.close();
session.close();
connection.close();
}
//新建服務消費者Consunmer,對訊息進行接收,該方法可以並行啟動多次,相當於同時有多個訊息接收者
@Test
public void testTopicConsumer() throws Exception {
// 第一步:建立一個ConnectionFactory物件。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連線。呼叫Connection物件的start方法。
connection.start();
// 第四步:使用Connection物件建立一個Session物件。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session物件建立一個Consumer物件。
MessageConsumer consumer = session.createConsumer(topic);
// 第七步:接收訊息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
// 取訊息的內容
text = textMessage.getText();
// 第八步:列印訊息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic的消費端01。。。。。");
// System.out.println("topic的消費端02。。。。。");
// 等待鍵盤輸入
System.in.read();
// 第九步:關閉資源
consumer.close();
session.close();
connection.close();
}
備註:
Topic廣播模式需要在訊息接收方法啟動的狀態下, 再進行訊息傳送才能被接收到; 如果沒有訊息接收者的情況下, 傳送的訊息會隨即被銷燬;