淘淘商城24_ActiveMq訊息佇列02_activeMq與spring的整合
阿新 • • 發佈:2019-01-02
一、新增依賴
<!-- activeMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
二、配置檔案
applicationContext-activeMq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.1.104:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是佇列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <!-- 傳送訊息的名稱 --> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收訊息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="com.taotao.listener.MyMessageListener" /> <!-- 訊息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
三、程式碼測試
1. 傳送訊息
第一步:初始化一個spring容器
第二步:從容器中獲得JMSTemplate物件。
第三步:從容器中獲得一個Destination物件
第四步:使用JMSTemplate物件傳送訊息,需要知道Destination
@Test public void testQueueProducer() throws Exception { // 第一步:初始化一個spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activeMq.xml"); // 第二步:從容器中獲得JMSTemplate物件。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:從容器中獲得一個Destination物件 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate物件傳送訊息,需要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq testsss"); return textMessage; } }); }
2. 接收訊息
第一步:把Activemq相關的jar包新增到工程中
第二步:建立一個MessageListener的實現類。
MessageListener.java
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //獲取訊息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
@Test
public void testQueueConsumer() throws Exception {
//初始化spring容器
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activeMq.xml");
//等待
System.in.read();
}
3、測試
先執行consumer程式碼,執行起來以後,再執行”testQueueProducers”這個程式,就可以看到執行結果了
四、在專案中的應用-----資料庫與索引庫的同步
1. 思想:
taotao-manager工程中新增 一個商品,然後會將這個商品的itemId傳送給activeMq,Mq接收到這個itemId,然後將itemId再次傳送給taotao-search工程,
2. 生產者taotao-manager-service ,依賴和配置檔案本文開始已新增
商品新增功能
/**
* 新增商品資訊
*/
@Override
public TaotaoResult saveItem(TbItem tbItem, String desc,String paramData) {
//資料補全
tbItem.setCreated(new Date());
tbItem.setUpdated(new Date());
tbItem.setStatus((byte)1);//'商品狀態,1-正常,2-下架,3-刪除
final long itemId = IDUtils.genItemId();//生成隨機id
tbItem.setId(itemId);
itemMapper.saveItem(tbItem);
TaotaoResult result = saveItemDesc(itemId, desc);
if (result.getStatus()!=200) {
try {
throw new Exception();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
TaotaoResult itemResult = saveItemParamItem(itemId, paramData);
if (itemResult.getStatus()!=200) {
try {
throw new Exception();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//向activeMq傳送訊息,將商品的id傳送給activeMq
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(itemId+"");//傳送的訊息
return textMessage;
}
});
return TaotaoResult.ok();
}
3. 消費者taotao-search
3.1. 配置檔案 applicationContext-activeMq.xml(taotao-search-service)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.1.101:61616" />
</bean>
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--這個是佇列目的地,點對點的 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--這個是主題目的地,一對多的 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-change-topic" />
</bean>
<!-- 配置監聽器 -->
<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
<!-- 訊息監聽容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
3.2. pom新增依賴taotao-search-service
<!-- activeMQ -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
3.3. Mapper層
3.4 SearchItemMapper.xml
3.5 Service層SearchItemService.java
3.6 SearchItemServiceImpl.java
/**
* 新增商品到索引庫
*/
@Override
public TaotaoResult addDocument(long itemId) {
try {
//1.查詢出taotao-manager新增的商品
SearchItem searchItem = searchItemMapper.getSearchItemById(itemId);
//2.建立一個SolrInputDocument物件
SolrInputDocument document = new SolrInputDocument();
//3.將查詢到的資料新增到索引庫
//根據索引庫域的定義,將欄位寫入到索引庫
document.setField("id", searchItem.getId());
document.setField("item_title", searchItem.getTitle());
document.setField("item_category_name", searchItem.getCatName());
document.setField("item_image", searchItem.getImage());
document.setField("item_desc", searchItem.getItem_desc());
document.setField("item_price", searchItem.getPrice());
document.setField("item_sell_point", searchItem.getSell_point());
solrServer.add(document);
solrServer.commit();
} catch (Exception e) {
e.printStackTrace();
}
return TaotaoResult.ok();
}
3.7 配置監聽 MyMessageListener.java
建立一個監聽類
package com.taotao.search.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import com.taotao.search.service.SearchItemService;
/**
* 配置一個監聽類
* @author fengjinzhu
*
*/
public class MyMessageListener implements MessageListener {
@Autowired
private SearchItemService searchItemService;
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//獲取訊息內容,生產者傳送的訊息內容
String itemId = textMessage.getText();
//等待一秒鐘
Thread.sleep(1000);
searchItemService.addDocument(Long.valueOf(itemId));
} catch (Exception e) {
e.printStackTrace();
}
}
}