1. 程式人生 > 程式設計 >ActiveMQ訊息簽收機制程式碼例項詳解

ActiveMQ訊息簽收機制程式碼例項詳解

這篇文章主要介紹了ActiveMQ訊息簽收機制程式碼例項解析,文中通過示例程式碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下

消費者客戶端成功接收一條訊息的標誌是:這條訊息被簽收。

消費者客戶端成功接收一條訊息一般包括三個階段:

1、消費者接收訊息,也即從MessageConsumer的receive方法返回

2、消費者處理訊息

3、訊息被簽收

其中,第三階段的簽收可以有ActiveMQ發起,也可以由消費者客戶端發起,取決於Session是否開啟事務以及簽收模式的設定。

在帶事務的Session中,消費者客戶端事務提交之時,訊息自動完成簽收。

在不帶事務的Session中,訊息何時以及如何被簽收取決於Session的簽收模式設定

非事務Session可以設定如下幾種簽收模式:

1.Session.AUTO_ACKNOWLEDGE

當訊息從MessageConsumer的receive方法返回或者從MessageListener介面的onMessage方法返回時,會話自動確認訊息簽收

2.Session.CLIENT_ACKNOWLEDGE

需要消費者客戶端主動呼叫acknowledge方法簽收訊息,這種模式實在Session層面進行簽收的,簽收一個已經消費的訊息會自動的簽收這個Session已消費的所有訊息:

例如一個消費者在一個Session中消費了5條訊息,然後確認第3條訊息,所有這5條訊息都會被簽收

3.Session.DUPS_OK_ACKNOWLEDGE

這種方式允許JMS不必急於確認收到的訊息,允許在收到多個訊息之後一次完成確認,與Auto_AcKnowledge相比,這種確認方式在某些情況下可能更有效,因為沒有確認,當系統崩潰或者網路出現故障的時候,訊息可以被重新傳遞.

這種方式會引起訊息的重複,但是降低了Session的開銷,所以只有客戶端能容忍重複的訊息才可使用。(如果ActiveMQ再次傳送同一訊息,那麼訊息頭中的JMSRedelivered將被設定為true)

帶事務session的案例

  生產者

    必須在生產完資料之後手動提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :連線工廠,JMS 用它建立連線
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
  // JMS 客戶端到JMS Provider 的連線
  Connection connection = connectionFactory.createConnection();
  //啟動連線
  connection.start();
  // Session: 一個傳送或接收訊息的執行緒 false:代表不帶事務的session AUTO_ACKNOWLEDGE:代表自動簽收
  Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
  // Destination :訊息的目的地;訊息傳送給誰.
  // 獲取session注意引數值my-queue是Query的名字
  Queue queue = session.createQueue("my-queue");
  // MessageProducer:建立訊息生產者
  MessageProducer producer = session.createProducer(queue);
  // 設定不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  // 傳送訊息
  for (int i = 1; i <= 5; i++) {
   sendMsg(session,producer,i);
  }
  System.out.println("傳送成功!");
  session.commit();
  session.close();
  connection.close();
 }
 /**
  * 在指定的會話上,通過指定的訊息生產者發出一條訊息
  *
  * @param session
  *   訊息會話
  * @param producer
  *   訊息生產者
  */
 public static void sendMsg(Session session,MessageProducer producer,int i) throws JMSException {
  // 建立一條文字訊息
  TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  // 通過訊息生產者發出訊息
  producer.send(message);
 }
}

  消費者

    消費完資料之後必須手動提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsReceiver {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :連線工廠,JMS 用它建立連線
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,"tcp://127.0.0.1:61616");
  // JMS 客戶端到JMS Provider 的連線
  Connection connection = connectionFactory.createConnection();
  connection.start();
  // Session: 一個傳送或接收訊息的執行緒 true:表單開啟事務 AUTO_ACKNOWLEDGE:代表自動簽收
  Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
  // Destination :訊息的目的地;訊息傳送給誰.
  // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置
  Queue queue = session.createQueue("my-queue");
  // 消費者,訊息接收者
  MessageConsumer consumer = session.createConsumer(queue);
  while (true) {
   //receive():獲取訊息
   TextMessage message = (TextMessage) consumer.receive();
   if (null != message) {
    System.out.println("收到訊息:" + message.getText());
    session.commit();
   } else {
    break;
   }
  }
  //回收資源
  session.close();
  connection.close();
 }
}

不帶事務session的案例

  1.自動簽收

ActiveMQ訊息簽收機制程式碼例項詳解

  2.手動簽收

    生產者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :連線工廠,JMS 用它建立連線
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,"tcp://127.0.0.1:61616");
  // JMS 客戶端到JMS Provider 的連線
  Connection connection = connectionFactory.createConnection();
  //啟動連線
  connection.start();
  // Session: 一個傳送或接收訊息的執行緒 false:代表不帶事務的session AUTO_ACKNOWLEDGE:代表自動簽收
  /* Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);*/
  Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
  // Destination :訊息的目的地;訊息傳送給誰.
  // 獲取session注意引數值my-queue是Query的名字
  Queue queue = session.createQueue("my-queue");
  // MessageProducer:建立訊息生產者
  MessageProducer producer = session.createProducer(queue);
  // 設定不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  // 傳送訊息
  for (int i = 1; i <= 5; i++) {
   sendMsg(session,i);
  }
  System.out.println("傳送成功!");
  session.close();
  connection.close();
 }
 /**
  * 在指定的會話上,通過指定的訊息生產者發出一條訊息
  *
  * @param session
  *   訊息會話
  * @param producer
  *   訊息生產者
  */
 public static void sendMsg(Session session,int i) throws JMSException {
  // 建立一條文字訊息
  TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  // 通過訊息生產者發出訊息
  producer.send(message);
     message.acknowledge();  //手動提交
  } 
}

    消費者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.plugin2.os.windows.SECURITY_ATTRIBUTES;

import javax.jms.*;

public class JmsReceiver {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :連線工廠,JMS 用它建立連線
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,"tcp://127.0.0.1:61616");
  // JMS 客戶端到JMS Provider 的連線
  Connection connection = connectionFactory.createConnection();
  connection.start();
  // Session: 一個傳送或接收訊息的執行緒 true:表單開啟事務 AUTO_ACKNOWLEDGE:代表自動簽收
  /*Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);
  // Destination :訊息的目的地;訊息傳送給誰.
  // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置
  Queue queue = session.createQueue("my-queue");
  // 消費者,訊息接收者
  MessageConsumer consumer = session.createConsumer(queue);
  while (true) {
   //receive():獲取訊息
   TextMessage message = (TextMessage) consumer.receive();
   if (null != message) {
    System.out.println("收到訊息:" + message.getText());
    message.acknowledge();  //手動提交
   } else {
    break;
   }
  }
  //回收資源
  session.close();
  connection.close();
 }
}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。