springboot activemq 2 持久化訊息 與 持久化訂閱
阿新 • • 發佈:2019-02-05
改動1.減少springboot重複建立session的問題
jmsTemplate的地方加入了CachingConnectionFactory,這樣配置可以
@Bean(name = "myJmsTemplate") public JmsTemplate getJmsTemplate(ActiveMQConnectionFactory nonXaJmsConnectionFactory, MessageConverter jacksonJmsMessageConverter){ //使用CachingConnectionFactory可以提高部分效能。 CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setSessionCacheSize(100); cachingConnectionFactory.setTargetConnectionFactory(nonXaJmsConnectionFactory); JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); //設定deliveryMode(持久化), priority, timeToLive必須開啟 jmsTemplate.setExplicitQosEnabled(true); // 設定訊息是否持久化 jmsTemplate.setDeliveryPersistent(true); // 設定訊息轉換器 jmsTemplate.setMessageConverter(jacksonJmsMessageConverter); // 設定訊息是否以事務 jmsTemplate.setSessionTransacted(true); return jmsTemplate; }
改動2 : 加入訂閱持久化
什麼是訂閱持久化?簡單講就是MQ記住了給每個訂閱者傳送到了哪裡,如果中途發生故障又恢復後,可以從記憶的位置繼續傳送給訂閱者。預設是非持久化訂閱,即:訂閱者在宕機重啟之後,佇列中有訊息,訂閱者也是收不到任何訊息的,即便這些訊息已經持久化在日誌檔案或者資料庫中。
將原來預設的訊息監聽容器替換為如下兩個配置Bean。這是兩個分別的監聽Bean,並且他們開啟了持久化訂閱引數setSubscriptionDurable( true ) ,並且擁有各自的ID,factory.setClientId(“10001”); factory.setClientId(“10002”);具體為什麼要這麼設定可以參看這篇博文 https://www.tuicool.com/articles/UfimyuR 。
@Bean(name = "topicContainerFactory1") public DefaultJmsListenerContainerFactory topicClient1(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer){ DefaultJmsListenerContainerFactory factory = defaultJmsListenerContainerFactoryTopic(connectionFactory,configurer); factory.setClientId("10001"); return factory; } @Bean(name = "topicContainerFactory2") public DefaultJmsListenerContainerFactory topicClient2(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer){ DefaultJmsListenerContainerFactory factory = defaultJmsListenerContainerFactoryTopic(connectionFactory,configurer); factory.setClientId("10002"); return factory; } /** * * @param connectionFactory * @param configurer * @return */ public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); configurer.configure(factory,connectionFactory); factory.setPubSubDomain(true); factory.setSessionTransacted(true); factory.setAutoStartup(true); //開啟持久化訂閱 factory.setSubscriptionDurable(true); return factory; }
改動3:對同一個佇列的訊息多添加了一個訂閱者。
MQ可以在訂閱者第一次訂閱的時候記憶每個訂閱者的傳送位置,之後進行分別的推送。
@JmsListener(destination = MQ_TOPIC_NAME,containerFactory = "topicContainerFactory1") // 監聽指定訊息主題
public void receiveMessage1(Message message) throws Exception {
log.debug("[接收佇列1] [收到訊息] {}",message);
}
@JmsListener(destination = MQ_TOPIC_NAME,containerFactory = "topicContainerFactory2") // 監聽指定訊息主題
public void receiveMessage1(Message message) throws Exception {
log.debug("[接收佇列2] [收到訊息] {}",message);
}
測試結果:
1>同時開啟發布者和兩個訂閱者,釋出者向MQ推送訊息。訂閱者接收。
2017-10-23 01:09:33.258 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService : [接收佇列1] [收到訊息] Message
2017-10-23 01:09:33.258 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2 : [接收佇列2] [收到訊息] Message
2017-10-23 01:09:33.354 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2 : [接收佇列2] [收到訊息] Message
2017-10-23 01:09:33.354 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService : [接收佇列1] [收到訊息] Message
2017-10-23 01:09:33.407 DEBUG 14436 --- [enerContainer-1] c.t.service.MessageHandleService2 : [接收佇列2] [收到訊息] Message
2017-10-23 01:09:33.461 DEBUG 14436 --- [enerContainer-1] c.thinvent.service.MessageHandleService : [接收佇列1] [收到訊息] Message
2>之後關閉消費者。釋出者再發布3條訊息。
3>此時可以將MQ服務關閉,模擬宕機。然後啟動MQ服務,啟動一個訂閱者,緊接著會受到後續未傳送的3條訊息,然後再啟動另外的訂閱者,緊接著也會受到後續的三條訊息。