中介軟體--ActiveMQ詳細入門使用教程
MQ是訊息中介軟體,是一種在分散式系統中應用程式藉以傳遞訊息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的開源專案,完全支援JMS1.1和J2EE1.4規範的JMS Provider實現。
特點:
1、支援多種語言編寫客戶端
2、對spring的支援,很容易和spring整合
3、支援多種傳輸協議:TCP,SSL,NIO,UDP等
4、支援AJAX
訊息形式:
1、點對點(queue)
2、一對多(topic)
ActiveMQ安裝
我這裡提供一個安裝好的虛擬機器:http://download.csdn.net/download/liuyuanq123/10217892
伺服器執行後,我們可以直接訪問到activeMQ的介面:
然後點選queues可以看到現在沒有一條訊息:
ActiveMQ測試
編寫一個測試類對ActiveMQ進行測試,首先得向pom檔案中新增ActiveMQ相關的jar包:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
1
2
3
4
queue的傳送程式碼如下:
public void testMQProducerQueue() throws Exception{
//1、建立工廠連線物件,需要制定ip和埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用連線工廠建立一個連線物件
Connection connection = connectionFactory.createConnection();
//3、開啟連線
connection.start();
//4、使用連線物件建立會話(session)物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
Queue queue = session.createQueue("test-queue");
//6、使用會話物件建立生產者物件
MessageProducer producer = session.createProducer(queue);
//7、使用會話物件建立一個訊息物件
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//8、傳送訊息
producer.send(textMessage);
//9、關閉資源
producer.close();
session.close();
connection.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
接收程式碼:
public void TestMQConsumerQueue() throws Exception{
//1、建立工廠連線物件,需要制定ip和埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用連線工廠建立一個連線物件
Connection connection = connectionFactory.createConnection();
//3、開啟連線
connection.start();
//4、使用連線物件建立會話(session)物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
Queue queue = session.createQueue("test-queue");
//6、使用會話物件建立生產者物件
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer物件中設定一個messageListener物件,用來接收訊息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程式等待接收使用者訊息
System.in.read();
//9、關閉資源
consumer.close();
session.close();
connection.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
然後當我們執行queue傳送的時候可以看到佇列裡已經有一條訊息了,但沒有傳送出去:
然後在執行queue 的接收端,可以看到訊息已經發出了:
接著對topic進行測試,傳送程式碼如下:
public void TestTopicProducer() throws Exception{
//1、建立工廠連線物件,需要制定ip和埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用連線工廠建立一個連線物件
Connection connection = connectionFactory.createConnection();
//3、開啟連線
connection.start();
//4、使用連線物件建立會話(session)物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
Topic topic = session.createTopic("test-topic");
//6、使用會話物件建立生產者物件
MessageProducer producer = session.createProducer(topic);
//7、使用會話物件建立一個訊息物件
TextMessage textMessage = session.createTextMessage("hello!test-topic");
//8、傳送訊息
producer.send(textMessage);
//9、關閉資源
producer.close();
session.close();
connection.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
接收程式碼:
public void TestTopicConsumer() throws Exception{
//1、建立工廠連線物件,需要制定ip和埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
//2、使用連線工廠建立一個連線物件
Connection connection = connectionFactory.createConnection();
//3、開啟連線
connection.start();
//4、使用連線物件建立會話(session)物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
Topic topic = session.createTopic("test-topic");
//6、使用會話物件建立生產者物件
MessageConsumer consumer = session.createConsumer(topic);
//7、向consumer物件中設定一個messageListener物件,用來接收訊息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程式等待接收使用者訊息
System.in.read();
//9、關閉資源
consumer.close();
session.close();
connection.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
然後執行topic傳送:
可以看到訊息已經發送出去。再執行topic接收:
可以看到有了一個消費者,但是沒有接收的訊息,這是因為正常情況下我們的topic訊息不會再伺服器持久化,所以要先開啟消費者,再開啟生產者,這個時候我們再執行生產者傳送一條訊息看到訊息已經接收到了:
ActiveMQ整合spring及專案中運用
activeMQ與spring看一整合到一起使用,除了新增ActiveMQ相關的jar包外,還需要新增spring的jar包:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
1
2
3
4
然後編寫applicationContext-activemq.xml檔案,
程式碼如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
<bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
</bean>
<!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
</bean>
<!-- 配置生產者 -->
<!-- Spring使用JMS工具類,可以用來發送和接收訊息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這裡是配置的spring用來管理connectionfactory的connectionfactory -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 配置destination -->
<!-- 佇列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spring-queue"/>
</bean>
<!-- 話題目的地 -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic"/>
</bean>
</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
然後在我們淘淘商城中,商品新增到資料庫的時候,對應也要新增資料到我們的solr索引中,所以生產者應該在插入資料後建立:
當然,在xml檔案中配置好的jmstemplate和destination也要注入進來:
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name="itemAddTopic")
private Destination destination;
1
2
3
4
然後消費者應該寫在我們的搜尋工程中,首先新增spring和activeMQ的jar包,然後配置xml檔案,再編寫一個監聽器,當接收到訊息時,就講資料存入索引庫,xml檔案程式碼如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
<bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
</bean>
<!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
</bean>
<!-- 配置destination -->
<!-- 佇列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spring-queue"/>
</bean>
<!-- 話題目的地 -->
<bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="item-add-topic"/>
</bean>
<!-- 配置監聽器 -->
<bean id="myListener" class="com.taotao.search.listener.MyListener"/>
<bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
<!-- 系統監聽器 -->
<!-- <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="myListener"/>
</bean> -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="itemAddTopic"/>
<property name="messageListener" ref="itemAddListener"/>
</bean>
</beans>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
接收訊息程式碼:
最後同時開啟測試即可。
---------------------
作者:JHON_YUAN
來源:CSDN
原文:https://blog.csdn.net/liuyuanq123/article/details/79109218
版權宣告:本文為博主原創文章,轉載請附上博文連結!