activeMQ持久化到資料庫配置
阿新 • • 發佈:2018-12-31
1.修改conf/activemq.xml檔案,新增一下配置
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#dataSource"/>
</persistenceAdapter>
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"></property> <property name="url" value="jdbc:mysql://localhost:3306/activeMQ?relaxAutoCommit=true"></property> <property name="username" value="root"></property> <property name="password" value="****"></property> <property name="poolPreparedStatements" value="true"></property> </bean>
2.將dbcp和jdbc驅動拷到activeMQ的lib目錄下
3.以下程式碼實現
package com.suobei.activeMQ; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; /** * topic訊息持久化訂閱,持久到mysql * Created by wangmin on 2018/3/12 0012. */ public class TopicPersistentMysqlTest { /** * 編寫訊息的傳送方,生產者*/ @Test public void test1() throws JMSException { //1.建立連線工廠物件 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://www.itroot.top:61616"); //2.獲取連線物件 Connection connection = connectionFactory.createConnection(); System.out.println(connection); //3.連線MQ服務connection.start(); //4.獲得session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.通過session建立主題 Topic topic = session.createTopic("itRootTopic"); //6.通過session物件建立訊息的消費者 MessageProducer messageProducer = session.createProducer(topic); //7.建立一條訊息 TextMessage textMessage = session.createTextMessage("起床了...啦啦啦"); //8.傳送訊息 messageProducer.send(textMessage,DeliveryMode.PERSISTENT,1,1000*60*60*24); //關閉資源 messageProducer.close(); session.close(); connection.close(); } /** * 訊息的消費者,接收方 * @throws JMSException */ @Test public void test2() throws JMSException { //1.建立連線工廠物件 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://www.itroot.top:61616"); //2.獲取連線物件 Connection connection = connectionFactory.createConnection(); //設定客戶端id connection.setClientID("client-1"); //3.連線MQ服務 connection.start(); //4.獲得session final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.通過session建立主題 Topic topic = session.createTopic("itRootTopic"); //6.通過session物件建立持久化訊息的消費者 TopicSubscriber consumer = session.createDurableSubscriber(topic, "client1-sub"); consumer.setMessageListener(message -> { //當我們監聽的topic中存在訊息時,這個方法自動執行 TextMessage textMessage= (TextMessage) message; try { System.out.println("接收到訊息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); while (true){ } } }