1. 程式人生 > >ActiveMQ(四)--JMS的可靠性機制-1

ActiveMQ(四)--JMS的可靠性機制-1

訊息接收確認

JMS訊息只有在被確認之後,才認為已經被成功地消費了。訊息的成功消費通常包含三個階段:客戶接收訊息、客戶處理訊息和訊息被確認。
在事務性會話中,當一個事務被提交的時候,確認自動發生。

在非事務性會話中,訊息何時被確認取決於建立會話時的應答模式(acknowledgement mode)。該引數有以下3個可選值:
1.Session.AUTO_ACKNOWLEDGE:當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QueueSender {
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("my-queue");

        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 3; i++) {
            MapMessage message = session.createMapMessage();
            message.setStringProperty("extra" + i, "okok");
            message.setString("message---" + i, "my map message===" + i);
            producer.send(message);
        }

        session.commit();
        session.close();
        connection.close();
    }
}
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Enumeration;

public class QueueReceiver {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        Enumeration names = connection.getMetaData().getJMSXPropertyNames();

        while (names.hasMoreElements()) {
            String name = (String) names.nextElement();
            System.out.println("jmsx name===" + name);
        }


        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("my-queue");
        MessageConsumer consumer = session.createConsumer(destination);
        int i = 0;
        while (i < 3) {

            MapMessage message = (MapMessage) consumer.receive();
//            session.commit();
            System.out.println("收到的訊息:" + message.getString("message---" + i)
                    + ", property==" + message.getStringProperty("extra" + i));
            i++;
        }
        session.close();
        connection.close();
    }
}

這裡的消費者的session.commit()註釋掉了。

先執行生產者程式,再執行消費者程式,此時,消費者成功收到三條資訊。再次直接執行消費者程式,竟然,還收到3條訊息。也就說明,session沒有commit,就是沒有確認,沒有簽收,訊息佇列會認為消費者沒收到,這樣在下次執行消費者程式的時候,會繼續傳送原來的訊息。

現在,把session.commit()的註釋放開。執行消費者程式,成功收到3條訊息。再次執行消費者程式,沒有收到訊息,並且程式阻塞了。因為沒有訊息了,所以,一直在receive那裡等。

再執行一下生產者,發現,消費者程式也不阻塞了,收到了3條訊息。

2.Session.CLIENT_ACKNOWLEDGE:

客戶通過呼叫訊息的acknowledge方法確認訊息。需要注意的是,在這種模式中,確認是在會話層上進行,確認一個被消費的訊息將自動確認所有已被會話消費的訊息。例如,如果一個消費者消費了10個訊息,然後,確認第5個訊息,那麼所有10個訊息都被確認。

修改程式:

先執行生產者,後執行消費者。消費者成功收到3條訊息。再次執行消費者,消費者沒有收到訊息,並且阻塞。說明,接收確認了。

再次修改程式碼:

先執行生產者,再執行消費者,消費者成功收到3條訊息。再執行消費者,消費者繼續收到3條訊息。說明,沒有接收確認。

3.Session.DUPS_ACKNOWLEDGE:該選擇只是會話遲鈍的確認訊息的提交。如果JMS Provider失敗,那麼可能會導致一些重複的訊息。如果是重複的訊息,那麼JMS Provider必須把訊息頭的JMSRedelivered欄位設定為true。

注意:在事務性會話中,事務提交的時候,確認自動發生。以上這3個引數,只有在非事務性會話中起作用。