ActiveMQ 訊息持久化
阿新 • • 發佈:2018-12-24
生產端(訊息持久化):
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws Exception{ //第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "1234", "tcp://localhost:61616"); //第二步:通過ConnectionFactory工廠物件建立一個Connection連線,並且呼叫Connection的start方法開啟連線【connection預設是關閉的】 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:通過connection建立session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事務,引數配置2為簽收模式【一般我們設定為自動簽收】 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //使用事務的方式進行訊息的傳送 // Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //使用CLIENT端簽收的方式 //Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //第四步:通過session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題.在程式中可以使用多個Queue和Topic. Destination destination = session.createQueue("first"); //第五步:通過session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer MessageProducer messageProducer = session.createProducer(null); //第六步:可以使用MessageProducer的setDeliveryMode方法為其設定持久化特性和非持久化特性(DeliveryMode) messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (int i = 1; i <= 10; i++) { //TextMessage textMessage = session.createTextMessage("helloworld"+i); TextMessage textMessage = session.createTextMessage("我是訊息內容"+i); //第一個引數: 目的地 //第二個引數: 訊息 //第三個引數: 是否持久化 //第四個引數: 優先順序【0-9 0-4表示普通 5-9表示加急 預設4】 //第五個引數: 訊息在mq上的存放有效期【單位毫秒】 //messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2); messageProducer.send(destination, textMessage); //TimeUnit.SECONDS.sleep(1); System.out.println("生產者:"+textMessage.getText()); } //提交資料 //session.commit(); //session.rollback(); if (connection!=null) { connection.close(); } } }
消費端(訊息持久化):
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連線 Connection connection = null; // Session: 一個傳送或接收訊息的執行緒 Session session; // Destination :訊息的目的地;訊息傳送給誰. Destination destination; // 消費者,訊息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616"); try { // 構造從工廠得到連線物件 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連線 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置 destination = session.createQueue("first"); consumer = session.createConsumer(destination); while (true) { // 設定接收者接收訊息的時間,為了便於測試,這裡誰定為100s //TextMessage message = (TextMessage) consumer.receive(100000); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { //System.out.println("收到訊息" + message.getText()); System.out.println("消費資料:" + message.getText()); //message.acknowledge(); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }