1. 程式人生 > >中介軟體--ActiveMQ詳細入門使用教程

中介軟體--ActiveMQ詳細入門使用教程

MQ是訊息中介軟體,是一種在分散式系統中應用程式藉以傳遞訊息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的開源專案,完全支援JMS1.1和J2EE1.4規範的JMS Provider實現。 
特點: 
1、支援多種語言編寫客戶端 
2、對spring的支援,很容易和spring整合 
3、支援多種傳輸協議:TCP,SSL,NIO,UDP等 
4、支援AJAX 
訊息形式: 
1、點對點(queue) 
2、一對多(topic) 

ActiveMQ安裝

我這裡提供一個安裝好的虛擬機器:http://download.csdn.net/download/liuyuanq123/10217892 
伺服器執行後,我們可以直接訪問到activeMQ的介面: 

然後點選queues可以看到現在沒有一條訊息: 


ActiveMQ測試

      編寫一個測試類對ActiveMQ進行測試,首先得向pom檔案中新增ActiveMQ相關的jar包:

     <dependency>  
         <groupId>org.apache.activemq</groupId>  
         <artifactId>activemq-all</artifactId>  
    </dependency>  
1
2
3
4
queue的傳送程式碼如下:

    public void testMQProducerQueue() throws Exception{
        //1、建立工廠連線物件,需要制定ip和埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連線工廠建立一個連線物件
        Connection connection = connectionFactory.createConnection();
        //3、開啟連線
        connection.start();
        //4、使用連線物件建立會話(session)物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue("test-queue");
        //6、使用會話物件建立生產者物件
        MessageProducer producer = session.createProducer(queue);
        //7、使用會話物件建立一個訊息物件
        TextMessage textMessage = session.createTextMessage("hello!test-queue");
        //8、傳送訊息
        producer.send(textMessage);
        //9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
接收程式碼:

    public void TestMQConsumerQueue() throws Exception{
        //1、建立工廠連線物件,需要制定ip和埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連線工廠建立一個連線物件
        Connection connection = connectionFactory.createConnection();
        //3、開啟連線
        connection.start();
        //4、使用連線物件建立會話(session)物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
        Queue queue = session.createQueue("test-queue");
        //6、使用會話物件建立生產者物件
        MessageConsumer consumer = session.createConsumer(queue);
        //7、向consumer物件中設定一個messageListener物件,用來接收訊息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程式等待接收使用者訊息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
然後當我們執行queue傳送的時候可以看到佇列裡已經有一條訊息了,但沒有傳送出去: 

然後在執行queue 的接收端,可以看到訊息已經發出了: 


接著對topic進行測試,傳送程式碼如下:

    public void TestTopicProducer() throws Exception{
        //1、建立工廠連線物件,需要制定ip和埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連線工廠建立一個連線物件
        Connection connection = connectionFactory.createConnection();
        //3、開啟連線
        connection.start();
        //4、使用連線物件建立會話(session)物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic("test-topic");
        //6、使用會話物件建立生產者物件
        MessageProducer producer = session.createProducer(topic);
        //7、使用會話物件建立一個訊息物件
        TextMessage textMessage = session.createTextMessage("hello!test-topic");
        //8、傳送訊息
        producer.send(textMessage);
        //9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
接收程式碼:

    public void TestTopicConsumer() throws Exception{
        //1、建立工廠連線物件,需要制定ip和埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用連線工廠建立一個連線物件
        Connection connection = connectionFactory.createConnection();
        //3、開啟連線
        connection.start();
        //4、使用連線物件建立會話(session)物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用會話物件建立目標物件,包含queue和topic(一對一和一對多)
        Topic topic = session.createTopic("test-topic");
        //6、使用會話物件建立生產者物件
        MessageConsumer consumer = session.createConsumer(topic);
        //7、向consumer物件中設定一個messageListener物件,用來接收訊息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // TODO Auto-generated method stub
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });
        //8、程式等待接收使用者訊息
        System.in.read();
        //9、關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
然後執行topic傳送: 

可以看到訊息已經發送出去。再執行topic接收: 

可以看到有了一個消費者,但是沒有接收的訊息,這是因為正常情況下我們的topic訊息不會再伺服器持久化,所以要先開啟消費者,再開啟生產者,這個時候我們再執行生產者傳送一條訊息看到訊息已經接收到了: 


ActiveMQ整合spring及專案中運用

      activeMQ與spring看一整合到一起使用,除了新增ActiveMQ相關的jar包外,還需要新增spring的jar包:

    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-context</artifactId>  
    </dependency>  
1
2
3
4
然後編寫applicationContext-activemq.xml檔案, 
程式碼如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd  
    http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

    <!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置生產者 -->
    <!-- Spring使用JMS工具類,可以用來發送和接收訊息 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這裡是配置的spring用來管理connectionfactory的connectionfactory -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 佇列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 話題目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
</beans>  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
然後在我們淘淘商城中,商品新增到資料庫的時候,對應也要新增資料到我們的solr索引中,所以生產者應該在插入資料後建立: 

當然,在xml檔案中配置好的jmstemplate和destination也要注入進來:

    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name="itemAddTopic")
    private Destination destination;
1
2
3
4
然後消費者應該寫在我們的搜尋工程中,首先新增spring和activeMQ的jar包,然後配置xml檔案,再編寫一個監聽器,當接收到訊息時,就講資料存入索引庫,xml檔案程式碼如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"  
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd  
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd  
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd  
    http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd  
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">  

    <!-- 配置能夠產生connection的connectionfactory,由JMS對應的服務廠商提供 -->
    <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg name="brokerURL" value="tcp://192.168.156.44:61616"/>
    </bean>
    <!-- 配置spring管理真正connectionfactory的connectionfactory,相當於spring對connectionfactory的一層封裝 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
    </bean>
    <!-- 配置destination -->
    <!-- 佇列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>
    <!-- 話題目的地 -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-add-topic"/>
    </bean>
    <!-- 配置監聽器 -->
    <bean id="myListener" class="com.taotao.search.listener.MyListener"/>
    <bean id="itemAddListener" class="com.taotao.search.listener.ItemAddListener"/>
    <!-- 系統監聽器 -->
<!--    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="myListener"/>
    </bean> -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="itemAddTopic"/>
        <property name="messageListener" ref="itemAddListener"/>
    </bean>
</beans>  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
接收訊息程式碼: 

最後同時開啟測試即可。 
--------------------- 
作者:JHON_YUAN 
來源:CSDN 
原文:https://blog.csdn.net/liuyuanq123/article/details/79109218 
版權宣告:本文為博主原創文章,轉載請附上博文連結!