ActiveMQ訊息簽收機制程式碼例項詳解
這篇文章主要介紹了ActiveMQ訊息簽收機制程式碼例項解析,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
消費者客戶端成功接收一條訊息的標誌是:這條訊息被簽收。
消費者客戶端成功接收一條訊息一般包括三個階段:
1、消費者接收訊息,也即從MessageConsumer的receive方法返回
2、消費者處理訊息
3、訊息被簽收
其中,第三階段的簽收可以有ActiveMQ發起,也可以由消費者客戶端發起,取決於Session是否開啟事務以及簽收模式的設定。
在帶事務的Session中,消費者客戶端事務提交之時,訊息自動完成簽收。
在不帶事務的Session中,訊息何時以及如何被簽收取決於Session的簽收模式設定
非事務Session可以設定如下幾種簽收模式:
1.Session.AUTO_ACKNOWLEDGE
當訊息從MessageConsumer的receive方法返回或者從MessageListener介面的onMessage方法返回時,會話自動確認訊息簽收
2.Session.CLIENT_ACKNOWLEDGE
需要消費者客戶端主動呼叫acknowledge方法簽收訊息,這種模式實在Session層面進行簽收的,簽收一個已經消費的訊息會自動的簽收這個Session已消費的所有訊息:
例如一個消費者在一個Session中消費了5條訊息,然後確認第3條訊息,所有這5條訊息都會被簽收
3.Session.DUPS_OK_ACKNOWLEDGE
這種方式允許JMS不必急於確認收到的訊息,允許在收到多個訊息之後一次完成確認,與Auto_AcKnowledge相比,這種確認方式在某些情況下可能更有效,因為沒有確認,當系統崩潰或者網路出現故障的時候,訊息可以被重新傳遞.
這種方式會引起訊息的重複,但是降低了Session的開銷,所以只有客戶端能容忍重複的訊息才可使用。(如果ActiveMQ再次傳送同一訊息,那麼訊息頭中的JMSRedelivered將被設定為true)
帶事務session的案例
生產者
必須在生產完資料之後手動提交session
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連線 Connection connection = connectionFactory.createConnection(); //啟動連線 connection.start(); // Session: 一個傳送或接收訊息的執行緒 false:代表不帶事務的session AUTO_ACKNOWLEDGE:代表自動簽收 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // Destination :訊息的目的地;訊息傳送給誰. // 獲取session注意引數值my-queue是Query的名字 Queue queue = session.createQueue("my-queue"); // MessageProducer:建立訊息生產者 MessageProducer producer = session.createProducer(queue); // 設定不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 傳送訊息 for (int i = 1; i <= 5; i++) { sendMsg(session,producer,i); } System.out.println("傳送成功!"); session.commit(); session.close(); connection.close(); } /** * 在指定的會話上,通過指定的訊息生產者發出一條訊息 * * @param session * 訊息會話 * @param producer * 訊息生產者 */ public static void sendMsg(Session session,MessageProducer producer,int i) throws JMSException { // 建立一條文字訊息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 通過訊息生產者發出訊息 producer.send(message); } }
消費者
消費完資料之後必須手動提交session
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,"tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連線 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個傳送或接收訊息的執行緒 true:表單開啟事務 AUTO_ACKNOWLEDGE:代表自動簽收 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); // Destination :訊息的目的地;訊息傳送給誰. // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置 Queue queue = session.createQueue("my-queue"); // 消費者,訊息接收者 MessageConsumer consumer = session.createConsumer(queue); while (true) { //receive():獲取訊息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到訊息:" + message.getText()); session.commit(); } else { break; } } //回收資源 session.close(); connection.close(); } }
不帶事務session的案例
1.自動簽收
2.手動簽收
生產者
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,"tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連線 Connection connection = connectionFactory.createConnection(); //啟動連線 connection.start(); // Session: 一個傳送或接收訊息的執行緒 false:代表不帶事務的session AUTO_ACKNOWLEDGE:代表自動簽收 /* Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);*/ Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE); // Destination :訊息的目的地;訊息傳送給誰. // 獲取session注意引數值my-queue是Query的名字 Queue queue = session.createQueue("my-queue"); // MessageProducer:建立訊息生產者 MessageProducer producer = session.createProducer(queue); // 設定不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 傳送訊息 for (int i = 1; i <= 5; i++) { sendMsg(session,i); } System.out.println("傳送成功!"); session.close(); connection.close(); } /** * 在指定的會話上,通過指定的訊息生產者發出一條訊息 * * @param session * 訊息會話 * @param producer * 訊息生產者 */ public static void sendMsg(Session session,int i) throws JMSException { // 建立一條文字訊息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 通過訊息生產者發出訊息 producer.send(message); message.acknowledge(); //手動提交 } }
消費者
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import sun.plugin2.os.windows.SECURITY_ATTRIBUTES; import javax.jms.*; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,"tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連線 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個傳送或接收訊息的執行緒 true:表單開啟事務 AUTO_ACKNOWLEDGE:代表自動簽收 /*Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE); // Destination :訊息的目的地;訊息傳送給誰. // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置 Queue queue = session.createQueue("my-queue"); // 消費者,訊息接收者 MessageConsumer consumer = session.createConsumer(queue); while (true) { //receive():獲取訊息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到訊息:" + message.getText()); message.acknowledge(); //手動提交 } else { break; } } //回收資源 session.close(); connection.close(); } }
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。