ActiveMq-Spring整合
阿新 • • 發佈:2018-12-26
1,引入jar
<!-- ActiveMQ client library (the "all" aggregate artifact), pinned to 5.11.0. -->
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.0</version> </dependency>
<!-- Spring JMS support. NOTE(review): ${springframework} must be defined in the POM's properties section, which is not shown here. -->
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${springframework}</version> </dependency>
2,程式碼解析
配置檔案
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                           http://www.springframework.org/schema/jms
                           http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- Raw ActiveMQ connection factory using the failover transport over three broker URLs. -->
    <!-- NOTE(review): broker addresses and credentials are hard-coded; consider externalizing them to a properties file. -->
    <bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://192.168.81.86:51511,tcp://192.168.81.86:51512,tcp://192.168.81.86:51513)"/>
        <property name="userName" value="admin"/>
        <property name="password" value="admin"/>
        <property name="useAsyncSend" value="true"/>
    </bean>

    <!-- Caching wrapper used by the JmsTemplate (producer / synchronous receive side) only. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>

    <!-- Point-to-point queue the producer sends to by default. -->
    <bean id="bankReqQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="vz.queue.service.trading"/>
    </bean>

    <!-- Producer template; pubSubDomain=false selects queue (point-to-point) semantics. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- Destination used when send(...) is called without an explicit destination. -->
        <property name="defaultDestination" ref="bankReqQueueDestination"/>
        <!-- Synchronous receive() gives up after 10 seconds (value is in milliseconds). -->
        <property name="receiveTimeout" value="10000"/>
        <property name="pubSubDomain" value="false"/>
    </bean>

    <!-- Point-to-point queue consumed by the preBank listener. -->
    <bean id="preBankRespQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="vz.queue.service.netweb.order.status"/>
    </bean>

    <!-- Point-to-point queue consumed by the tax listener. -->
    <!-- NOTE(review): this is the same physical queue name as bankReqQueueDestination, so the tax listener
         consumes whatever the producer sends to its default destination; confirm this is intended. -->
    <bean id="taxRespQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="vz.queue.service.trading"/>
    </bean>

    <!-- Listener container for the preBank queue. -->
    <bean id="preBankQueueMessageListener"
          class="com.vzoom.bank.mq.finance.consumer.listener.PreBankQueueMessageListener"/>
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- The container references the raw factory: the DefaultMessageListenerContainer documentation advises
             against combining CachingConnectionFactory with dynamic consumer scaling, because the container
             performs its own connection/session/consumer caching. -->
        <property name="connectionFactory" ref="activeMqConnectionFactory"/>
        <property name="destination" ref="preBankRespQueueDestination"/>
        <property name="messageListener" ref="preBankQueueMessageListener"/>
        <property name="sessionTransacted" value="true"/>
        <!-- Single concurrency range "min-max". This replaces the former trio concurrentConsumers=6,
             concurrency="2-9", maxConcurrentConsumers=15, whose setters overrode one another in declaration
             order and effectively yielded this same 2..15 range. -->
        <property name="concurrency" value="2-15"/>
    </bean>

    <!-- Listener container for the tax queue. -->
    <bean id="taxQueueMessageListener"
          class="com.vzoom.bank.mq.finance.consumer.listener.TaxQueueMessageListener"/>
    <bean id="jmsContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!-- Raw factory for the same reason as on jmsContainer: no CachingConnectionFactory with dynamic scaling. -->
        <property name="connectionFactory" ref="activeMqConnectionFactory"/>
        <property name="destination" ref="taxRespQueueDestination"/>
        <property name="messageListener" ref="taxQueueMessageListener"/>
        <property name="sessionTransacted" value="true"/>
        <!-- Single concurrency range "min-max"; equivalent to the effective result of the former three
             overlapping settings (2..15). -->
        <property name="concurrency" value="2-15"/>
    </bean>
</beans>
生產者:可向指定(定製)佇列傳送訊息,也可向預設佇列傳送訊息
package com.vzoom.bank.mq.finance.producer.service; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @Service public class ProducerService { @Resource(name="jmsTemplate") private JmsTemplate jmsTemplate; /** * 向指定佇列傳送訊息,定製佇列 */ public void sendMessage(Destination destination, final String msg) { System.out.println("向佇列" + destination.toString() + "傳送了訊息------------" + msg); jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } /** * 向預設佇列傳送訊息,預設佇列 */ public void sendMessage(final String msg) { String destination = jmsTemplate.getDefaultDestination().toString(); System.out.println("向佇列" +destination+ "傳送了訊息------------" + msg); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(msg); } }); } }
消費者有兩種消費方式:主動消費(需要人為主動觸發)與監聽消費(長連線,佇列中一有訊息便立即自動消費)。其中監聽消費需要在配置檔案中配置相應的監聽器。
主動消費
package com.vzoom.bank.mq.finance.consumer.service;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
 * Pulls messages synchronously through the injected {@link JmsTemplate},
 * as opposed to receiving them through a registered message listener.
 */
@Service
public class ConsumerService {

    /** Template injected by bean name {@code jmsTemplate}. */
    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * Synchronously receives one message from the given destination and prints its text.
     *
     * @param destination the destination to receive from
     * @return the received message, or {@code null} when {@code JmsTemplate.receive}
     *         returned no message (it returns {@code null} once its receive timeout expires)
     * @throws ClassCastException if the received message is not a {@link TextMessage}
     */
    public TextMessage receive(Destination destination) {
        TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
        if (tm == null) {
            // Nothing arrived before the timeout: report it instead of failing with a
            // NullPointerException on tm.getText() below.
            System.out.println("從佇列" + destination.toString() + "未收到訊息");
            return null;
        }
        try {
            System.out.println("從佇列" + destination.toString() + "收到了訊息:\t"
                    + tm.getText());
        } catch (JMSException e) {
            // Reading the body failed; the message itself is still handed back to the caller.
            e.printStackTrace();
        }
        return tm;
    }
}
監聽消費1
package com.vzoom.bank.mq.finance.consumer.listener;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.jms.*;
/**
 * Message listener that prints the text of every {@link TextMessage} it is handed.
 * Also carries a small {@link #main(String[])} that sends one test message.
 */
public class PreBankQueueMessageListener implements MessageListener {

    /**
     * Invoked for every delivered message.
     *
     * @param message the delivered message; must be a {@link TextMessage}
     * @throws IllegalArgumentException if {@code message} is not a {@link TextMessage}
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            // Fail with a descriptive error instead of a bare ClassCastException; the exception
            // still propagates to whoever invoked the listener.
            throw new IllegalArgumentException(
                    "PreBankQueueMessageListener only supports TextMessage but received: "
                            + (message == null ? "null" : message.getClass().getName()));
        }
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("PreBankQueueMessageListener監聽到了文字訊息:" + tm.getText());
            //TODO
        } catch (JMSException e) {
            // NOTE(review): if the listener container is transacted, swallowing the exception here
            // lets the message be committed even though it could not be read; confirm this is intended.
            e.printStackTrace();
        }
    }

    /**
     * Manual smoke test: loads {@code common/ActiveMQ.xml} and sends one text message to the
     * {@code bankReqQueueDestination} bean.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the Spring container.
        // NOTE(review): the context is never closed; presumably so that the listener containers
        // keep running after the send. Confirm.
        ApplicationContext context = new ClassPathXmlApplicationContext("common/ActiveMQ.xml");
        // Look up the JmsTemplate used for sending.
        JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        // Look up the target destination bean.
        Destination destination = (Destination) context.getBean("bankReqQueueDestination");
        // Send the message through the template.
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                // Create the message from the supplied session rather than instantiating the
                // provider-specific ActiveMQTextMessage directly.
                TextMessage textMessage = session.createTextMessage("spring-queue的方式傳送");
                System.out.println("已傳送訊息...");
                return textMessage;
            }
        });
    }
}
監聽消費2
package com.vzoom.bank.mq.finance.consumer.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Message listener that prints the text of every {@link TextMessage} it is handed.
 */
public class TaxQueueMessageListener implements MessageListener {

    /**
     * Invoked for every delivered message.
     *
     * @param message the delivered message; must be a {@link TextMessage}
     * @throws IllegalArgumentException if {@code message} is not a {@link TextMessage}
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            // Fail with a descriptive error instead of a bare ClassCastException; the exception
            // still propagates to whoever invoked the listener.
            throw new IllegalArgumentException(
                    "TaxQueueMessageListener only supports TextMessage but received: "
                            + (message == null ? "null" : message.getClass().getName()));
        }
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("TaxQueueMessageListener監聽到了文字訊息:" + tm.getText());
            //TODO
        } catch (JMSException e) {
            // NOTE(review): if the listener container is transacted, swallowing the exception here
            // lets the message be committed even though it could not be read; confirm this is intended.
            e.printStackTrace();
        }
    }
}
測試
package com.vzoom.bank.controller;
import com.vzoom.bank.mq.finance.consumer.service.ConsumerService;
import com.vzoom.bank.mq.finance.producer.service.ProducerService;
import com.vzoom.domain.util.JsonBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.jms.Destination;
/**
 * Test endpoints under {@code /mq} that exercise the producer and the
 * synchronous consumer services.
 */
@Controller
@RequestMapping(value = "/mq")
public class TestController {

    @Autowired
    ConsumerService consumerService;

    @Autowired
    ProducerService producerService;

    /** Destination handed to the consumer service by {@link #getMsg()}. */
    @Resource(name="preBankRespQueueDestination")
    private Destination receiveQueue;

    /**
     * Sends {@code msg} through the producer service's single-argument send.
     *
     * @param msg the text to send
     * @return the success JSON string, or the failure JSON string if sending threw
     */
    @RequestMapping(value = "/sendMsg")
    @ResponseBody
    public String sendMsg(String msg){
        try {
            producerService.sendMessage(msg);
        } catch (Exception sendFailure) {
            return failureJson();
        }
        return successJson();
    }

    /**
     * Triggers one synchronous receive on {@code receiveQueue}; the received
     * message itself is not included in the response.
     *
     * @return the success JSON string, or the failure JSON string if receiving threw
     */
    @RequestMapping(value = "/getMsg")
    @ResponseBody
    public Object getMsg(){
        try {
            consumerService.receive(receiveQueue);
        } catch (Exception receiveFailure) {
            return failureJson();
        }
        return successJson();
    }

    /** Builds the JSON string returned when the operation completed without an exception. */
    private static String successJson() {
        return JsonBean.toJsonString(JsonBean.SUCCESS,"成功");
    }

    /** Builds the JSON string returned when the operation threw an exception. */
    private static String failureJson() {
        return JsonBean.toJsonString(JsonBean.EXCEPTION_ERROR,"失敗");
    }
}
結果
特別說明:以上點對點方式中,監聽器消費與主動消費只能二選一。即一旦配置了監聽器,主動消費就會失效,因為訊息在產生的瞬間就會被監聽器監聽到並消費掉。