activemq訂閱模式以及訊息時長和確認機制
阿新 • • 發佈:2019-02-13
程式碼如下:
訂閱主題,注:如果在釋出主題前,沒有訂閱,是收不到訊息的,這跟點對點的佇列模式不同
- package com.activemq;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- publicclass TopicPub {
- publicstaticvoid main(String[] args) throws JMSException {
-
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection connection = factory.createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- /**
-
Session javax.jms.Connection.createSession(boolean transacted, int acknowledgeMode) throws JMSException
- 1.transacted事務,事務成功commit,才會將訊息傳送到mom中
- 2.acknowledgeMode訊息確認機制
- 1)、帶事務的session
- 如果session帶有事務,並且事務成功提交,則訊息被自動簽收。如果事務回滾,則訊息會被再次傳送。
- 2)、不帶事務的session
- 不帶事務的session的簽收方式,取決於session的配置。
-
Activemq支援一下三種模式:
- Session.AUTO_ACKNOWLEDGE 訊息自動簽收
- Session.CLIENT_ACKNOWLEDGE 客戶端呼叫acknowledge方法手動簽收
- Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息
- 頭的JmsDelivered會被置為true標示當前訊息已經傳送過一次,客戶端需要進行訊息的重複處理控制。
- 程式碼示例如下:
- session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
- textMsg.acknowledge();
- */
- Topic topic = session.createTopic("wm5920.topic");
- MessageProducer producer = session.createProducer(topic);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//設定非持久化
- // producer.setTimeToLive(5000);//5秒後過期,這個對點對點模式有效
- TextMessage message = session.createTextMessage();
- message.setText("message_" + System.currentTimeMillis());
- producer.send(message);
- System.out.println("Sent message: " + message.getText());
- session.close();
- connection.stop();
- connection.close();
- }
- }
訂閱主題,注:如果在釋出主題前,沒有訂閱,是收不到訊息的,這跟點對點的佇列模式不同
- package com.activemq;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import javax.jms.*;
- publicclass TopicSubs{
- publicstaticvoid main(String[] args) throws JMSException {
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection connection = factory.createConnection();
- connection.setClientID("wm5920");
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Topic topic = session.createTopic("wm5920.topic");
- //持久訂閱方式,不會漏掉資訊
- TopicSubscriber subs=session.createDurableSubscriber(topic, "wm5920");
- subs.setMessageListener(new MessageListener() {
- publicvoid onMessage(Message message) {
- TextMessage tm = (TextMessage) message;
- try {
- System.out.println("Received message: " + tm.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- //非持久訂閱方式
- // MessageConsumer consumer = session.createConsumer(topic);
- // consumer.setMessageListener(new MessageListener() {
- // public void onMessage(Message message) {
- // TextMessage tm = (TextMessage) message;
- // try {
- // System.out.println("Received message: " + tm.getText());
- // } catch (JMSException e) {
- // e.printStackTrace();
- // }
- // }
- // });
- // session.commit();
- // session.close();
- // connection.stop();
- // connection.close();
- }
- }