ActiveMQ(四)--JMS的可靠性機制-1
訊息接收確認
JMS訊息只有在被確認之後,才認為已經被成功地消費了。訊息的成功消費通常包含三個階段:客戶接收訊息、客戶處理訊息和訊息被確認。
在事務性會話中,當一個事務被提交的時候,確認自動發生。
在非事務性會話中,訊息何時被確認取決於建立會話時的應答模式(acknowledgement mode)。該引數有以下3個可選值:
1.Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueSender { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { MapMessage message = session.createMapMessage(); message.setStringProperty("extra" + i, "okok"); message.setString("message---" + i, "my map message===" + i); producer.send(message); } session.commit(); session.close(); connection.close(); } }
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.Enumeration; public class QueueReceiver { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Enumeration names = connection.getMetaData().getJMSXPropertyNames(); while (names.hasMoreElements()) { String name = (String) names.nextElement(); System.out.println("jmsx name===" + name); } Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("my-queue"); MessageConsumer consumer = session.createConsumer(destination); int i = 0; while (i < 3) { MapMessage message = (MapMessage) consumer.receive(); // session.commit(); System.out.println("收到的訊息:" + message.getString("message---" + i) + ", property==" + message.getStringProperty("extra" + i)); i++; } session.close(); connection.close(); } }
這裡的消費者的session.commit()註釋掉了。
先執行生產者程式,再執行消費者程式,此時,消費者成功收到三條資訊。再次直接執行消費者程式,竟然,還收到3條訊息。也就說明,session沒有commit,就是沒有確認,沒有簽收,訊息佇列會認為消費者沒收到,這樣在下次執行消費者程式的時候,會繼續傳送原來的訊息。
現在,把session.commit()的註釋放開。執行消費者程式,成功收到3條訊息。再次執行消費者程式,沒有收到訊息,並且程式阻塞了。因為沒有訊息了,所以,一直在receive那裡等。
再執行一下生產者,發現,消費者程式也不阻塞了,收到了3條訊息。
2.Session.CLIENT_ACKNOWLEDGE:
修改程式:
先執行生產者,後執行消費者。消費者成功收到3條訊息。再次執行消費者,消費者沒有收到訊息,並且阻塞。說明,接收確認了。
再次修改程式碼:
先執行生產者,再執行消費者,消費者成功收到3條訊息。再執行消費者,消費者繼續收到3條訊息。說明,沒有接收確認。
3.Session.DUPS_ACKNOWLEDGE:該選擇只是會話遲鈍的確認訊息的提交。如果JMS Provider失敗,那麼可能會導致一些重複的訊息。如果是重複的訊息,那麼JMS Provider必須把訊息頭的JMSRedelivered欄位設定為true。
注意:在事務性會話中,事務提交的時候,確認自動發生。以上這3個引數,只有在非事務性會話中起作用。