ActiveMQ Topic消息重發
阿新 • • 發佈:2019-02-21
get mave oid tps ESS scope cnblogs 多重 ali
MQ學習系列:
- 消息隊列概念與認知
- ActiveMQ Topic消息重發
ActiveMQ Topic 消息重發
準備工作
windows下ActiveMQ的下載與啟動
- 百度的教程:鏈接 ←這裏包含基本的下載安裝啟動以及簡單的配置賬號
- 登錄控制臺主頁:http://localhost:8161/admin/
啟動錯誤以及解決方案
activeMQ啟動錯誤 BeanFactory not initialized
- https://blog.csdn.net/huang_sheng0527/article/details/75276113
- https://blog.csdn.net/qingshuiwater/article/details/82498091
JMS 消息確認機制
在session接口中定義的幾個常量:
- AUTO_ACKNOWLEDGE = 1 自動確認
- CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
- DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
- SESSION_TRANSACTED = 0 事務提交並確認
代碼實現
消息消費端在創建Session對象時需要指定應答模式為客戶端手動應答,當消費者獲取到消息並成功處理後需要調用message.acknowledge()方法進行應答,通知Broker消費成功。如果處理過程中出現異常,需要調用session.recover()通知Broker重復消息,默認最多重復6次。
- 創建maven項目引入依賴
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
- 編寫測試方法模擬【無消息重發的正常情況】
package org.newmean;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class ActiveMQTest {
//消息發送方-producter
@Test
public void test1() throws JMSException {
//創建連接工廠對象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//從工廠中獲取一個連接對象
Connection connection = connectionFactory.createConnection();
//連接MQ服務
connection.start();
//獲取session對象
//參數說明 b 是否使用事務 i jms消息確認機制 1 2 3 0 用常量表示
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//通過session創建Topic
Topic topic = session.createTopic("TestTopic");
//通過session創建消息發送者
MessageProducer producer = session.createProducer(topic);
//通過session創建消息對象
TextMessage message = session.createTextMessage("hello");
//發送消息
producer.send(message);
//關閉資源
producer.close();
session.close();
connection.close();
}
//消息接收方-consumer
@Test
public void test2() throws JMSException {
//創建連接工廠對象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//從工廠中獲取一個連接對象
Connection connection = connectionFactory.createConnection();
//連接MQ服務
connection.start();
//獲取session對象
//參數說明 b 是否使用事務 i jms消息確認機制 1 2 3 0 用常量表示
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//通過session創建Topic
Topic topic = session.createTopic("TestTopic");
//通過session創建消費者
MessageConsumer consumer = session.createConsumer(topic);
//指定消息監聽器
consumer.setMessageListener(new MessageListener() {
//當我們監聽的topic中存在消息,onMessage這個方法就會自動運行
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費者接收到了消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//因為要接收消息不能關閉,同時線程不能死掉
while (true){
}
}
}
先啟動test2方法發起訂閱“TestTopic”消息,然後啟動test1方法,這時消費者收到了消息。
消息重發模擬
我們只需要更消息接收方的代碼,改動如下:
//消息接收方-consumer
@Test
public void test2() throws JMSException {
//創建連接工廠對象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//從工廠中獲取一個連接對象
Connection connection = connectionFactory.createConnection();
//連接MQ服務
connection.start();
//獲取session對象
//參數說明 b 是否使用事務 i jms消息確認機制 1 2 3 0 用常量表示
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//通過session創建Topic
Topic topic = session.createTopic("TestTopic");
//通過session創建消費者
MessageConsumer consumer = session.createConsumer(topic);
//指定消息監聽器
consumer.setMessageListener(new MessageListener() {
//當我們監聽的topic中存在消息,onMessage這個方法就會自動運行
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
if(textMessage.getText().equals("nihao")){
System.out.println("消費者接收到了消息:"+textMessage.getText());
message.acknowledge();
}else {
System.out.println("消息處理失敗了..");
session.recover();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//因為要接收消息不能關閉,同時線程不能死掉
while (true){
}
}
先啟動test2方法發起訂閱“TestTopic”消息,然後啟動test1方法,這時消費者就會調用session.recover()方法讓消息發布者重發消息默認6次,我們能夠看到7條(第一次+重發六次)“消息處理失敗了..”輸出。
ActiveMQ Topic消息重發