1. 程式人生 > >ActiveMQ(二、程式碼實現)

ActiveMQ(二、程式碼實現)

程式碼實現

一、springboot+activeMQ

1.1 yml配置(部分)

spring:
  activemq:
    queueName: mingQueue
    topicName: mingTopic
    broker-url: tcp://0.0.0.0:61616
    user:
    password:
    in-memory: true
    pool:
      enabled: false

1.2 配置類(支援佇列和主題)

package com.ming.activitymq.conf;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;

import javax.annotation.Resource;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 檔名:  ${file_name}
 * 版權:    Copyright by ljm
 * 描述:    activemq簡單但配置(支援佇列和主題)
 * 修改人:  HuamingChen
 * 修改時間:2018/10/23
 * 跟蹤單號:
 * 修改單號:
 * 修改內容:
 */
@Configuration
@EnableConfigurationProperties({ActiveMQProperties.class})
public class MqConfig {
    /**
     * 佇列名稱
     */
    @Value("${spring.activemq.queueName}")
    private String queueName;
    /**
     * 主題名稱
     */
    @Value("${spring.activemq.topicName}")
    private String topicName;

    /**
     * 佇列
     * @return
     */
    @Bean
    public Queue getQueue(){
        return new ActiveMQQueue(queueName);
    }

    /**
     * 主題
     * @return
     */
    @Bean
    public Topic getTopic(){
        return new ActiveMQTopic(topicName);
    }

    /**
     * 連結工廠
     * @return
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory(ActiveMQProperties properties) {
        return new ActiveMQConnectionFactory(properties.getUser(), properties.getPassword(), properties.getBrokerUrl());
    }

    /**
     * 主題監聽
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setSessionTransacted(true);
        bean.setConcurrency("3-10");
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * 佇列監聽
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * jms訊息模版
     * @param connectionFactory
     * @return
     */
    @Bean
    @Primary
    public JmsMessagingTemplate jmsMessagingTemplate(ActiveMQConnectionFactory connectionFactory){
        return new JmsMessagingTemplate(connectionFactory);
    }
}

1.3 簡單使用

package com.ming.activitymq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 檔名:  ${file_name}
 * 版權:    Copyright by ljm
 * 描述:
 * 修改人:  HuamingChen
 * 修改時間:2018/10/23
 * 跟蹤單號:
 * 修改單號:
 * 修改內容:
 */
@RestController
@RequestMapping(value="/mq")
public class ActivityMQController {
    private Logger logger= LoggerFactory.getLogger(ActivityMQController.class);
    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;
    @Resource
    private Queue queue;
    @Resource
    private Topic topic;

    @RequestMapping(value="queueSend")
    @ResponseBody
    public void queueSend(String message) {
        jmsMessagingTemplate.convertAndSend(queue,message);
    }

    @RequestMapping(value="topicSend")
    @ResponseBody
    public void topicSend(String message) {
        jmsMessagingTemplate.convertAndSend(topic,message);
    }

    @JmsListener(destination = "mingQueue",containerFactory = "jmsListenerContainerQueue")
    public void listenQueueMQ(String message){
        logger.info("**************************佇列訊息="+message+"************************************");
    }

    @JmsListener(destination = "mingTopic",containerFactory = "jmsListenerContainerTopic")
    public void listenTopicMQ(String message){
        logger.info("**************************主題訊息"+message+"************************************");
    }
}

二、傳統方法

2.1 基礎流程
按照JMS的規範,我們首先需要獲得一個JMS connection factory.,通過這個connection factory來建立connection.在這個基礎之上我們再建立session, destination, producer和consumer。因此主要的幾個步驟如下:

1. 獲得JMS connection factory. 通過我們提供特定環境的連線資訊來構造factory。
2. 利用factory構造JMS connection
3. 啟動connection
4. 通過connection建立JMS session.
5. 指定JMS destination.
6. 建立JMS producer或者建立JMS message並提供destination.
7. 建立JMS consumer或註冊JMS message listener.
8. 傳送和接收JMS message.
9. 關閉所有JMS資源,包括connection, session, producer, consumer等。

2.2 建立消費者

package com.ming.activitymq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.UUID;

/**
 * 檔名:  ${file_name}
 * 版權:    Copyright by ljm
 * 描述:    mq提供者
 * 修改人:  HuamingChen
 * 修改時間:2018/10/23
 * 跟蹤單號:
 * 修改單號:
 * 修改內容:
 */
public class MQProduct {
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 預設的連線使用者名稱
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連線密碼
    private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連線地址

    public static void main(String[] args){
        //連線工廠
        ConnectionFactory connectionFactory=null;
        //連線
        Connection connection=null;
        //會話
        Session session=null;
        //目的地
        Destination destination=null;
        //生產者
        MessageProducer messageProducer=null;
        //訊息
        TextMessage textMessage=null;
        try {
            //建立連線工廠
            connectionFactory=new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
            //建立連線
            connection=connectionFactory.createConnection();
            //開始連線
            connection.start();
            //建立會話 第一個引數是開啟事務true,第二個引數是訊息確認機制屬性
            session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            //設定目的地
            //destination=session.createQueue("mingTest");//ptp模式
            destination=session.createTopic("mingTestTopic"); //訂閱模式
            //建立生產者
            messageProducer=session.createProducer(destination);
            //建立訊息
            textMessage=session.createTextMessage("測試訊息傳送"+ UUID.randomUUID().toString());
            //傳送訊息
            messageProducer.send(textMessage);
            //提交會話
            session.commit();
            //關閉連線
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

2.3 建立消費者

package com.ming.activitymq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

/**
 * 檔名:  ${file_name}
 * 版權:    Copyright by ljm
 * 描述:
 * 修改人:  HuamingChen
 * 修改時間:2018/10/23
 * 跟蹤單號:
 * 修改單號:
 * 修改內容:
 */
public class MQConsumer {
    private static final Logger logger= LoggerFactory.getLogger(MQConsumer.class);
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 預設的連線使用者名稱
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連線密碼
    private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連線地址

    public static void main(String[] args){
        //連線工廠
        ConnectionFactory connectionFactory=null;
        //連線
        Connection connection=null;
        //會話
        Session session=null;
        //目的地
        Destination destination=null;
        //消費者
        MessageConsumer messageConsumer=null;
        //訊息
        TextMessage textMessage=null;

        try {
            //建立連線工廠
            connectionFactory=new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
            //建立連線
            connection=connectionFactory.createConnection();
            //啟動連線
            connection.start();
            //建立會話
            session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            //建立目標
            //destination=session.createQueue("mingTest");
            destination=session.createTopic("mingTestTopic");
            //建立消費者
            messageConsumer=session.createConsumer(destination);
            //獲取訊息
            while (true){
                textMessage= (TextMessage) messageConsumer.receive(10000);
                if(textMessage!=null){
                    logger.info("收到的訊息:"+textMessage.getText());
                }else{
                    break;
                }
            }
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

2.4 消費者監聽模式

package com.ming.activitymq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

/**
 * 檔名:  ${file_name}
 * 版權:    Copyright by ljm
 * 描述:
 * 修改人:  HuamingChen
 * 修改時間:2018/10/23
 * 跟蹤單號:
 * 修改單號:
 * 修改內容:
 */
public class MQConsumerListener {
    private static final Logger logger= LoggerFactory.getLogger(MQConsumerListener.class);
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 預設的連線使用者名稱
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連線密碼
    private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連線地址

    public static void main(String[] args){
        //連線工廠
        ConnectionFactory connectionFactory=null;
        //連線
        Connection connection=null;
        //會話
        Session session=null;
        //目的地
        Destination destination=null;
        //消費者
        MessageConsumer messageConsumer=null;
        //訊息
        TextMessage textMessage=null;

        try {
            //建立連線工廠
            connectionFactory=new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
            //建立連線
            connection=connectionFactory.createConnection();
            //啟動連線
            connection.start();
            //建立會話
            session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            //建立目標
            //destination=session.createQueue("mingTest");//ptp模式
            destination=session.createTopic("mingTestTopic");//訂閱模式
            //建立消費者
            messageConsumer=session.createConsumer(destination);
            //註冊監聽
            messageConsumer.setMessageListener(new MQListener());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

2.5 實現監聽介面

package com.ming.activitymq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 檔名:  ${file_name}
 * 版權:    Copyright by ljm
 * 描述:    jms監聽介面
 * 修改人:  HuamingChen
 * 修改時間:2018/10/23
 * 跟蹤單號:
 * 修改單號:
 * 修改內容:
 */
public class MQListener implements MessageListener {
    private static final Logger logger= LoggerFactory.getLogger(MQListener.class);
    @Override
    public void onMessage(Message message) {
        try {
            logger.info("收到的訊息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

完整程式碼:
https://download.csdn.net/download/qq_16055765/10739393