1. 程式人生 > >SpringBoot與JMS整合(中介軟體為ActiveMQ)

SpringBoot與JMS整合(中介軟體為ActiveMQ)

一、ActiveMQ學習
1、基本概念以及介紹
Apache ActiveMQ是最受歡迎和強有力的開源訊息和整合模式伺服器,支援許多跨語言客戶端和協議,便利使用企業整合模式還有許多先進的特性。

相關命令
activemq start:啟動
activemq stop:關閉

管理介面:
http://127.0.0.1:8161/admin/
login:admin
password:admin

監聽埠
ActiveMQ預設埠是61616。
windows檢視埠:netstat -an|find "61616"

unix檢視埠:netstat -nl|grep 61616

2、兩種訊息模式
(1)點對點模式
點對點的模式主要建立在一個佇列上面,當連線一個佇列的時候,傳送端不需要知道接收端是否正在接收,傳送的訊息,將會先進入訊息佇列中,如果
有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會儲存到activemq伺服器,直到接收端接收資訊,點對點的訊息模式可以有多個傳送端,
多個接收端,但是一條訊息,只會被一個接收端接收,那個接收端先連上activemq,則會先接收到,而後來的接收端則接收不到那條資訊。

(2)訂閱/釋出模式
訂閱/釋出模式,同樣可以有多個傳送端與接收端,但是接收端和傳送端存在時間上的依賴,如果傳送端傳送訊息的時候,接收端並沒有接收資訊,那麼
activemq不會儲存資訊,認為訊息已經發送。換一種說法,就是傳送端傳送訊息的時候,接收端不線上,是接收端不到資訊的,哪怕以後監聽訊息,同樣
也是接收不到的。這個模式還有一個特點,傳送端傳送的訊息,將會被所有的接收端接收。不類似點對點,一條訊息只會被一個接收端接收到。

點對點模式傳送端
public class QueueSender {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = factory.createConnection();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 開啟連線
    conn.start();
    Destination des = session.createQueue("sampleQueue");
    MessageProducer producer = session.createProducer(des);
    // 預設為persistent,當activemq關閉時,佇列資料將會被儲存。當為non_persistent時,activemq關閉時,佇列資料將會被清空。
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    producer.send(session.createTextMessage("Hello World!"));
    producer.close();
    session.close();
    conn.close();
  }
}

點對點模式接收端
public class QueueConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = factory.createConnection();
    // 開啟連線
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination des = session.createQueue("sampleQueue");
    MessageConsumer consumer = session.createConsumer(des);
    consumer.setMessageListener(message -> {
      if (message instanceof TextMessage) {
        TextMessage tm = TextMessage.class.cast(message);
        try {
          System.out.println(tm.getText());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    });
    // 程式等待
    System.in.read();
    consumer.close();
    session.close();
    conn.close();
  }
}

訂閱模式傳送端
public class TopicSender {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = factory.createConnection();
    // 開啟連線
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination des = session.createTopic("myTopic");
    MessageProducer producer = session.createProducer(des);
    // 預設為persistent,當activemq關閉時,佇列資料將會被儲存。當為non_persistent時,activemq關閉時,佇列資料將會被清空。
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    producer.send(session.createTextMessage("Hello World!"));
    producer.close();
    session.close();
    conn.close();
  }
}

訂閱模式接收端
public class TopicConsumer {

  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = factory.createConnection();
    // 開啟連線
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination des = session.createTopic("myTopic");
    MessageConsumer consumer = session.createConsumer(des);
    consumer.setMessageListener(message -> {
      if (message instanceof TextMessage) {
        TextMessage tm = TextMessage.class.cast(message);
        try {
          System.out.println(tm.getText());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    });
    // 程式等待
    System.in.read();
    consumer.close();
    session.close();
    conn.close();
  }
}

3、傳送訊息的資料型別
//純字串的資料
session.createTextMessage();
//序列化的物件
session.createObjectMessage();
//流,可以用來傳遞檔案等
session.createStreamMessage();
//用來傳遞位元組
session.createBytesMessage();
//這個方法創建出來的就是一個map,可以把它當作map來用,當你看了它的一些方法,你就懂了
session.createMapMessage();
//這個方法,拿到的是javax.jms.Message,是所有message的介面
session.createMessage();

4、保證訊息的成功處理
建立session的時候使用客戶端確認模式,如:
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE:
當訊息傳送給接收端之後,就自動確認成功了,而不管接收端有沒有處理成功,而一旦確認成功後,就會把佇列裡面的訊息給清除掉,避免下一個接收端
收到同樣的訊息。
CLIENT_ACKNOWLEDGE:
由客戶端進行確認,當客戶端接收到訊息進行處理後,客戶端呼叫Message介面的acknowledge方法進行確認,如果不確定,activemq將會發給下一個接收端處理。
注意:CLIENT_ACKNOWLEDGE模式只在點對點模式有效,訂閱/釋出模式中,即使不確認,也不會儲存訊息。

二、Spring中的JMS
1、Spring整合JMS各包介紹
Spring提供了JMS整合框架,簡化了JMS API的使用,JMS大致能分為兩塊功能:訊息的生產和消費。JMSTemplate類用於訊息的生產和非同步訊息的接收,
Spring提供了許多訊息訊息監聽器容器用來建立訊息驅動的POJO,同時也提供了宣告式的方法來建立訊息監聽器。

org.springframework.jms.core提供了使用JMS的核心功能,包括通過處理資源建立和釋放來簡化JMS API使用的JMS模板類。Spring模板類的設計原則是提供
幫助方法執行通用的操作和為了更加複雜的使用,而把處理任務的核心委託給使用者實現的回撥介面。
org.springframework.jms.support提供了JMSException的異常轉化功能,會把受檢的JMSException轉換成非受檢異常,例如javax.jms.JMSException會被包裝為
UncategorizedJmsException。
org.springframework.jms.support.converter提供了在Java物件和JMS訊息之間的訊息轉換器抽象。
org.springframework.jms.support.destination提供了許多管理JMS目的地的策略,比如為儲存在JNDI的目的地提供服務定位器。
org.springframework.jms.annotation提供了支援使用@JMSListener註解的必要設施。
org.springframework.jms.config提供了jms名稱空間的解析實現以及配置監聽器容器的java配置支援。
org.springframework.jms.connection提供了獨立應用程式中適合使用的連線工廠的使用。同時也包含了JMS的平臺事務管理器,也就是JmsTransactionManager。

2、使用Spring JMS
(1)JmsTemplate
JmsTemplate類簡化了JMS的使用,因為當傳送訊息和非同步接收訊息它會處理資源的建立和釋放,使用JmsTemplate只需要實現回撥介面。
MessageCreator回撥介面指定session就能建立訊息,對於更復雜的使用,SessionCallback提供使用者Jms會話和ProducerCallbacn回撥。
備註:JmsTemplate例項一旦配置就是執行緒安全的,這就意味著可以配置一個JmsTemplate類的例項然後注入共享引用到多個協作者中,
簡單來說,JmsTemplate是有狀態的,因為它維護者一個連線工廠的引用,但是這種狀態是非會話狀態。

(2)Connections
JmsTemplate需要一個連線工廠的引用,通常被客戶端應用作為工廠建立連線,而且它還包含了一些配置引數,許多引數是供應相關的,比如SSL配置選項。

(3)Caching Messaging Resources
在連線工廠和傳送操作間,有三個中間物件會被建立和銷燬,如下:
ConnectionFactory->Connection->Session->MessageProducer->send
為了優化資源的使用、提升效能,將會提供兩個連線工廠。

(4)連線工廠分類
SingleConnectionFactory:每次都會建立一個新的連線,適合測試
CachingConnectionFactory:基於SingleConnectionFactory的基礎上增加了Sessions、MessageProducers和MessageConsumer的快取,初始快取大小為1,
通過sessionCacheSize可以增加快取sessions的數量。
注意:實際快取Sessions的數量將會比設定的要大,因為快取的sessions是基於確認模式的。

備註:推薦使用apache的PooledConnectionFactory,PooledConnectionFactory支援Connection、Session和MessageProducer的池化,連線、會話和訊息生產者例項
使用後會返回池裡,以便後續使用。而Consumer不會池化,消費者處理完訊息後應該關閉,而不是放到池裡後續使用。儘管消費者空閒,但ActiveMQ會繼續傳送訊息
到消費者的預獲取緩衝區,訊息會在預獲取緩衝區阻塞,直到消費者再次被啟用。

(5)Destination Management
JmsTemplate會把目的地名的解析委託給JMS目的地解析物件,也就是DestinationResolver的實現。DynamicDestinationResolver是JmsTemplate使用的
預設實現,它提供解析動態目的地。
通常在JMS應用中的目的地只在執行時得知,當應用程式部署時是不會被建立的。這是因為在互動式的系統元件中有共享程式邏輯,這些系統元件會根據
一個已知的命名規則在執行期間建立目的地。

(6)Message Listener Containers
訊息監聽容器用來接收來自JMS訊息佇列的訊息並驅動已注入的訊息監聽器。監聽容器負責所有訊息接收的執行緒並分發訊息給監聽器,訊息監聽器容器是在
MDP(message-driven POJO)和訊息提供者的媒介,它關注接收訊息的註冊、參與事務、資源獲取和釋放以及異常轉化。

分類:
SimpleMessageListenerContainer
這種訊息監聽容器會在啟動時建立固定數量的JMS sessions和consumers,並且用標準的MessageConsumer.setMessageListener()註冊監聽器,然後讓JMS
訊息生產者呼叫回撥介面。
注意:SimpleMessageListenerContainer並不允許動態適應執行時需求以及參與外部事務,它只支援本地事務,將sessionTransacted標識設為true即可。來自訊息監聽器的異常
會導致回滾,而且訊息會重新發送。

DefaultMessageListenerContainer
這種訊息監聽器是使用最多的,和SimpleMessageListenerContainer相反,這種容器變體允許動態適應執行時需求,當配置了JtaTransactionManager時,每條被
接收的訊息都會伴隨XA transaction而註冊。這種監聽器在JMS生產者的低需求、參與外部管理事務的功能和相容Java EE環境中打造了平衡。

(7)Transaction management
Spring提供了JmsTransactionManager為單個連線工廠管理事務,允許應用程式利用Spring的事務管理特性。JmsTransactionManager管理本地資源事務,
繫結來自指定連線工廠的JMS連線/會話對到執行緒中,JmsTemplate自動檢測事務資源並相應進行操作。

三、SpringBoot與ActiveMQ集合使用

1、加入相關依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.iboxpay</groupId>
	<artifactId>activemq_learning</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>

	<name>activemq_learning</name>
	<description>activemq_learning for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.4.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<!-- spring框架 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-aop</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-tomcat</artifactId>
			<scope>provided</scope>
		</dependency>
		<!-- spring測試框架 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<!-- mybatis與spring整合 -->
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>1.3.2</version>
		</dependency>
		<!-- mysql連線驅動 -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<scope>runtime</scope>
		</dependency>
		<!-- druid -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid-spring-boot-starter</artifactId>
			<version>1.1.10</version>
		</dependency>

		<!-- mq連線池 -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
		</dependency>

		<!-- jsp -->
		<dependency>
			<groupId>org.apache.tomcat.embed</groupId>
			<artifactId>tomcat-embed-jasper</artifactId>
			<scope>provided</scope>
		</dependency>
		<!-- jstl中包含標準標籤庫 -->
		<dependency>
			<groupId>javax.servlet</groupId>
			<artifactId>jstl</artifactId>
		</dependency>

		<!--javamelody應用監控 -->
		<dependency>
			<groupId>net.bull.javamelody</groupId>
			<artifactId>javamelody-spring-boot-starter</artifactId>
			<version>1.74.0</version>
		</dependency>

		<!-- fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.49</version>
		</dependency>

		<!-- commons lang -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
		</dependency>
		<!-- commons code -->
		<dependency>
			<groupId>commons-codec</groupId>
			<artifactId>commons-codec</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

2、整合相關配置

@Configuration
@PropertySource("classpath:jms.properties")
public class ActiveMQConfig {

  @Autowired
  private JmsConfig jmsConfig;

  @Autowired
  private MyListener myListener;

  /**
   * ActiveMQ連線池
   * @return
   */
  @Bean(name = "pooledConnectionFactory", destroyMethod = "stop")
  public PooledConnectionFactory pooledConnectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(jmsConfig.getBrokerUrl());
    return new PooledConnectionFactory(connectionFactory);
  }

  /**
   * 傳送或接收目的地
   * @return
   */
  @Bean("activeMQQueue")
  public ActiveMQQueue activeMQQueue() {
    return new ActiveMQQueue(jmsConfig.getDestinationName());
  }

  /**
   * JmsTemplate訊息傳送模板類配置
   * @param connectionFactory
   * @param activeMQQueue
   * @return
   */
  @Bean
  public JmsTemplate jmsTemplate(@Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory,
      @Qualifier("activeMQQueue") ActiveMQQueue activeMQQueue) {
    JmsTemplate jmsTemplate = new JmsTemplate();
    // 指定連線工廠
    jmsTemplate.setConnectionFactory(connectionFactory);
    // 開啟事務
    jmsTemplate.setSessionTransacted(true);
    // 指定確認模式,預設為自動確認
    jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    // 指定傳送目的地
    jmsTemplate.setDefaultDestination(activeMQQueue);
    return jmsTemplate;
  }

  /**
   * jms事務管理器,建立session時需開啟事務
   * @param connectionFactory
   * @return
   */
  @Bean(name = "jmsTransactionManager")
  public JmsTransactionManager jmsTransactionManager(
      @Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory) {
    return new JmsTransactionManager(connectionFactory);
  }

  /**
   * 訊息監聽器容器,用來接收來自指定目的地的訊息
   * @param connectionFactory
   * @param jmsTransactionManager
   * @param threadPoolTaskExecutor
   * @param activeMQQueue
   * @return
   */
  @Bean
  public DefaultMessageListenerContainer defaultMessageListenerContainer(
      @Qualifier("pooledConnectionFactory") PooledConnectionFactory connectionFactory,
      @Qualifier("jmsTransactionManager") JmsTransactionManager jmsTransactionManager,
      @Qualifier("threadPoolTaskExecutor") ThreadPoolTaskExecutor threadPoolTaskExecutor,
      @Qualifier("activeMQQueue") ActiveMQQueue activeMQQueue) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // 開啟事務
    container.setSessionTransacted(true);
    container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    container.setDestination(activeMQQueue);
    // 指定事務管理器
    container.setTransactionManager(jmsTransactionManager);

    // 指定訊息監聽器
    container.setMessageListener(myListener);
    // 併發消費者數量
    container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
    // 最大併發消費者數量
    container.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
    // 指定任務執行器執行監聽執行緒
    container.setTaskExecutor(threadPoolTaskExecutor);
    // 接收時長
    container.setReceiveTimeout(jmsConfig.getReceiveTimeout());
    return container;
  }

  /**
   * 執行緒池任務執行器配置
   * @return
   */
  @Bean("threadPoolTaskExecutor")
  public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(20);
    threadPoolTaskExecutor.setMaxPoolSize(100);
    threadPoolTaskExecutor.setKeepAliveSeconds(300);
    threadPoolTaskExecutor.setQueueCapacity(1000);
    threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    return threadPoolTaskExecutor;
  }

}
@Component
@ConfigurationProperties(prefix = "jms")
public class JmsConfig {

  /**
   * broker地址
   */
  private String brokerUrl;

  /**
   * 併發連線消費者數
   */
  private Integer concurrentConsumers;

  /**
   * 最大併發連線消費者數
   */
  private Integer maxConcurrentConsumers;

  /**
   * 目的地
   */
  private String destinationName;

  /**
   * 訊息接收時長
   */
  private Integer receiveTimeout;

  public String getBrokerUrl() {
    return brokerUrl;
  }

  public Integer getConcurrentConsumers() {
    return concurrentConsumers;
  }

  public Integer getMaxConcurrentConsumers() {
    return maxConcurrentConsumers;
  }

  public String getDestinationName() {
    return destinationName;
  }

  public Integer getReceiveTimeout() {
    return receiveTimeout;
  }

  public void setBrokerUrl(String brokerUrl) {
    this.brokerUrl = brokerUrl;
  }

  public void setConcurrentConsumers(Integer concurrentConsumers) {
    this.concurrentConsumers = concurrentConsumers;
  }

  public void setMaxConcurrentConsumers(Integer maxConcurrentConsumers) {
    this.maxConcurrentConsumers = maxConcurrentConsumers;
  }

  public void setDestinationName(String destinationName) {
    this.destinationName = destinationName;
  }

  public void setReceiveTimeout(Integer receiveTimeout) {
    this.receiveTimeout = receiveTimeout;
  }

  @Override
  public String toString() {
    return "JmsConfig [brokerUrl=" + brokerUrl + ", concurrentConsumers=" + concurrentConsumers
        + ", maxConcurrentConsumers=" + maxConcurrentConsumers + ", destinationName=" + destinationName
        + ", receiveTimeout=" + receiveTimeout + "]";
  }

}
#================ jms相關配置 ===============#

#broker地址
jms.brokerURL=tcp://127.0.0.1:61616
#併發消費者數量
jms.concurrentConsumers=1
#最大併發消費者數量
jms.maxConcurrentConsumers=1
#傳送目的地名稱
jms.destinationName=spring_jms_test
#訊息監聽器容器接收時長
jms.receiveTimeout=0

以上為jms.properties屬性檔案內容

3、訊息監聽器配置

@Component
public class MyListener implements MessageListener {

  private static Logger logger = LoggerFactory.getLogger(MyListener.class);

  @Override
  public void onMessage(Message message) {
    if (message instanceof TextMessage) {
      try {
        TextMessage textMessage = TextMessage.class.cast(message);
        System.out.println(textMessage.getText());
      } catch (JMSException e) {
        logger.error("接收訊息失敗:{}", e.getMessage(), e);
      }
    }
  }

}

4、訊息傳送控制器

@Controller
public class JmsController {

  @Autowired
  private JmsTemplate jmsTemplate;

  @Autowired
  private ThreadPoolTaskExecutor threadPoolTaskExecutor;

  @RequestMapping("sendMessage.action")
  @ResponseBody
  public Map<String, Object> sendMessage(String textMessage) {
    Map<String, Object> resp = new HashMap<>();
    if (StringUtils.isBlank(textMessage)) {
      resp.put("statusCode", 0);
      resp.put("msg", "訊息不能為空!");
      return resp;
    }

    // 非同步傳送訊息
    threadPoolTaskExecutor.execute(() -> {
      // 實現回撥介面建立訊息
      jmsTemplate.send(session -> {
        return session.createTextMessage(textMessage);
      });
    });

    resp.put("statusCode", 1);
    resp.put("msg", "訊息傳送成功!");
    return resp;
  }

}

該學習專案github地址:https://github.com/liuyalou1996/activemq_learning