1. 程式人生 > >ActiveMQ 訊息持久化

ActiveMQ 訊息持久化

生產端(訊息持久化):


import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
	 
	public static void main(String[] args) throws Exception{
		
		//第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】
		 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin",
                            "1234",
                            "tcp://localhost:61616");
		
		//第二步:通過ConnectionFactory工廠物件建立一個Connection連線,並且呼叫Connection的start方法開啟連線【connection預設是關閉的】
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		//第三步:通過connection建立session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事務,引數配置2為簽收模式【一般我們設定為自動簽收】
		Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
 
		//使用事務的方式進行訊息的傳送
	//	Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		
		//使用CLIENT端簽收的方式
		//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		
		//第四步:通過session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題.在程式中可以使用多個Queue和Topic.
		Destination destination = session.createQueue("first");
		
		//第五步:通過session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer
		MessageProducer messageProducer = session.createProducer(null);
		
		//第六步:可以使用MessageProducer的setDeliveryMode方法為其設定持久化特性和非持久化特性(DeliveryMode)
		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		
				
		for (int i = 1; i <= 10; i++) {
			//TextMessage textMessage = session.createTextMessage("helloworld"+i);
			TextMessage textMessage = session.createTextMessage("我是訊息內容"+i);
			//第一個引數: 目的地
			//第二個引數: 訊息
			//第三個引數: 是否持久化
			//第四個引數: 優先順序【0-9  0-4表示普通    5-9表示加急  預設4】
			//第五個引數: 訊息在mq上的存放有效期【單位毫秒】
			//messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);
			messageProducer.send(destination, textMessage);
			//TimeUnit.SECONDS.sleep(1);
			System.out.println("生產者:"+textMessage.getText());
		}
		
		//提交資料
		//session.commit();
		
		//session.rollback();
		
		if (connection!=null) {
			connection.close();
		}
	}
}

 

消費端(訊息持久化):


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {  
    public static void main(String[] args) {  
        // ConnectionFactory :連線工廠,JMS 用它建立連線  
        ConnectionFactory connectionFactory;  
        // Connection :JMS 客戶端到JMS Provider 的連線  
        Connection connection = null;  
        // Session: 一個傳送或接收訊息的執行緒  
        Session session;  
        // Destination :訊息的目的地;訊息傳送給誰.  
        Destination destination;  
        // 消費者,訊息接收者  
        MessageConsumer consumer;  
        connectionFactory = new ActiveMQConnectionFactory(  
                "admin",  
                "1234", "tcp://localhost:61616");  
        try {  
            // 構造從工廠得到連線物件  
            connection = connectionFactory.createConnection();  
            // 啟動  
            connection.start();  
            // 獲取操作連線  
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
            // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置  
            destination = session.createQueue("first");  
            consumer = session.createConsumer(destination);  
            while (true) {  
                // 設定接收者接收訊息的時間,為了便於測試,這裡誰定為100s  
            	//TextMessage message = (TextMessage) consumer.receive(100000);  
            	TextMessage message = (TextMessage) consumer.receive(); 
                if (null != message) {  
                	//System.out.println("收到訊息" + message.getText());  
                	System.out.println("消費資料:" + message.getText());
                	//message.acknowledge();
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection)  
                    connection.close();  
            } catch (Throwable ignore) {  
            }  
        }  
    }  
}