JMS學習九 ActiveMQ的訊息持久化到Mysql資料庫
1、將連線Mysql資料庫驅動包,放到ActiveMQ的lib目錄下
2,修改ActiveMQ的conf目錄下的active.xml檔案,修改資料持久化的方式
2.1 修改原來的kshadb的持久化資料的方式
2.2 連線Mysql的配置
3、將資料持久化Mysql的執行截圖
3.1 重新啟動ActiveMQ,並執行程式,放入持久化資料,檢視Mysql的active資料庫
4,資料持久化程式碼
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
//預設連線使用者名稱
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//預設連線密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//預設連線地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//傳送的訊息數量
private static final int SENDNUM = 10;
public static void main(String[] args) throws Exception {
/*ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");*/
/**
* activemq.xml 配置密碼
*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
"bhz",
"bhz",
"tcp://localhost:61616");
//連線
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
//建立session
// Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//開啟事物
// Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//開啟事物 並且使用Client的方式
/**
* 3、通過Connection物件建立Session會話(上下文環境物件),
引數一,表示是否開啟事務
引數二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收
第一個引數設定為true,表示開啟事務
開啟事務後,記得要手動提交事務
*/
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
// 4、通過Session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件。
// 在PTP模式中,Destination指的是Queue
// 在釋出訂閱模式中,Destination指的是Topic
//訊息的目的地
Destination destination = session.createQueue("queue1");
//建立訊息生產者
// 5、使用Session來建立訊息物件的生產者或者消費者
MessageProducer messageProducer = session.createProducer(destination);
//PERSISTENT 用來指定JMS Provider對訊息進行持久化操作,以免Provider fail的時候,丟失Message
//NON_Persistent 方式下的JMS Provider不會對消進行持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//傳送訊息
// sendMessage(session, messageProducer);
for (int j = 0; j < 10; j++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我的訊息內容,id為"+j);
System.out.println("生產者: "+textMessage.getText());
messageProducer.send(destination,textMessage,DeliveryMode.PERSISTENT,j,60*1000);
// System.out.println("生產者: "+textMessage.getText());
}
//使用事物 Boolean.TRUE
// session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 傳送訊息
* @param session
* @param messageProducer 訊息生產者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < Sender.SENDNUM; i++) {
//建立一條文字訊息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我的訊息內容,id為"+i);
messageProducer.send(textMessage);
System.out.println("生產者: "+textMessage.getText());
// TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i);
// System.out.println("生產者傳送訊息:Activemq 傳送訊息" + i);
//通過訊息生產者發出訊息
// messageProducer.send(message);
}
}
}