1. 程式人生 > >JMS學習九 ActiveMQ的訊息持久化到Mysql資料庫

JMS學習九 ActiveMQ的訊息持久化到Mysql資料庫

1、將連線Mysql資料庫驅動包,放到ActiveMQ的lib目錄下

2,修改ActiveMQ的conf目錄下的active.xml檔案,修改資料持久化的方式

         2.1  修改原來的kshadb的持久化資料的方式

 

          2.2  連線Mysql的配置

3、將資料持久化Mysql的執行截圖

      3.1  重新啟動ActiveMQ,並執行程式,放入持久化資料,檢視Mysql的active資料庫

 

4,資料持久化程式碼

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    
     //預設連線使用者名稱
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //預設連線密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //預設連線地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
    //傳送的訊息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) throws Exception {
        /*ActiveMQConnectionFactory activeMQConnectionFactory = 
                new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, 
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");*/
        /**
         * activemq.xml  配置密碼
         */
        ActiveMQConnectionFactory activeMQConnectionFactory = 
                new ActiveMQConnectionFactory(
                "bhz", 
                "bhz", 
                "tcp://localhost:61616");
        //連線
        Connection connection = null;
        
        try {
             connection = activeMQConnectionFactory.createConnection();
             connection.start();
            
            //建立session
        //    Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
             //開啟事物
        //    Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
             //開啟事物 並且使用Client的方式
             /**
              *  3、通過Connection物件建立Session會話(上下文環境物件),  
                      引數一,表示是否開啟事務  
                      引數二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收  
  
                      第一個引數設定為true,表示開啟事務  
                      開啟事務後,記得要手動提交事務 
              */
            Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
            
            // 4、通過Session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件。  
            // 在PTP模式中,Destination指的是Queue  
            // 在釋出訂閱模式中,Destination指的是Topic  
            //訊息的目的地
            Destination destination = session.createQueue("queue1");
            //建立訊息生產者
            // 5、使用Session來建立訊息物件的生產者或者消費者  
            MessageProducer messageProducer = session.createProducer(destination);
            //PERSISTENT 用來指定JMS Provider對訊息進行持久化操作,以免Provider fail的時候,丟失Message
            //NON_Persistent 方式下的JMS Provider不會對消進行持久化
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //傳送訊息
        //    sendMessage(session, messageProducer);
            for (int j = 0; j < 10; j++) {
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText("我的訊息內容,id為"+j);
                System.out.println("生產者: "+textMessage.getText());
                messageProducer.send(destination,textMessage,DeliveryMode.PERSISTENT,j,60*1000);    
            //    System.out.println("生產者: "+textMessage.getText());
            }
            
            //使用事物    Boolean.TRUE    
        //    session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    
    
     /**
     * 傳送訊息
     * @param session
     * @param messageProducer  訊息生產者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < Sender.SENDNUM; i++) {
            //建立一條文字訊息 
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("我的訊息內容,id為"+i);
            messageProducer.send(textMessage);
            System.out.println("生產者: "+textMessage.getText());
         //   TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i);
         //   System.out.println("生產者傳送訊息:Activemq 傳送訊息" + i);
            //通過訊息生產者發出訊息 
         //   messageProducer.send(message);
        }

    }

}