1. 程式人生 > >ActiveMQ——activemq的使用java代碼實例

ActiveMQ——activemq的使用java代碼實例

pubsub sendto ogg ror contain pre ted new mman

ActiveMQ 在java中的使用,通過單例模式、工廠實現

一、導jar包

<dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.1.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-messaging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-beans</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-aop</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-tx</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

二、java代碼

1、連接工廠 配置

package com.broadsense.iov.base.jms;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
/**
 * Connection factory configuration.
 *
 * <p>Holds the single JMS connection factory shared by the application. The factory is
 * built on the first call to {@link #getInstance()}: an {@code ActiveMQConnectionFactory}
 * created from the broker settings below is wrapped in a Spring
 * {@code CachingConnectionFactory}, and that wrapper is returned on every later call.
 *
 * @author flm
 * 2017-10-13
 */
public class ConnectionFactory
{
  // NOTE(review): the original listing declared "tcp://113.106.93.254:61616" here but never
  // read the constant; the broker it actually connected to was the address below. The
  // address that was really in use is kept -- confirm which broker is intended.
  private static final String URL = "tcp://100.100.10.100:61616";
  // NOTE(review): credentials are hard-coded in source; consider loading them from
  // external configuration instead.
  private static final String USERNAME = "hkadmin";
  private static final String PASSWORD = "hk667";
  private static final int SESSIONCACHESIZE = 20;

  /** The lazily built factory; {@code null} until the first {@link #getInstance()} call. */
  private javax.jms.ConnectionFactory factory;

  /** Instances are only created by {@link SingletonHolder}. */
  private ConnectionFactory()
  {
  }

  /**
   * Returns the shared JMS connection factory, building it on first use.
   *
   * <p>The method is {@code synchronized}, so concurrent first calls build the factory
   * only once.
   *
   * @return the shared connection factory
   */
  public static synchronized javax.jms.ConnectionFactory getInstance()
  {
    ConnectionFactory holder = SingletonHolder.INSTANCE;
    if (holder.factory == null) {
      holder.build();
    }
    return holder.factory;
  }

  /** Loads the broker settings and creates the connection factory from them. */
  private void build()
  {
    AMQConfigBean bean = loadConfigure();
    this.factory = buildConnectionFactory(bean);
  }

  /**
   * Creates the ActiveMQ connection factory and wraps it in a caching factory.
   *
   * @param bean broker URL, credentials and session cache size to apply
   * @return the caching connection factory wrapping the ActiveMQ one
   */
  private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) {
    javax.jms.ConnectionFactory targetFactory =
        new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL());

    CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
    cachingFactory.setTargetConnectionFactory(targetFactory);
    cachingFactory.setSessionCacheSize(bean.getSessionCacheSize());

    return cachingFactory;
  }

  /**
   * Returns the broker settings, taken from the constants of this class.
   *
   * <p>The original wrapped this in an always-true null check and a catch block that could
   * never fire; both were dead code and have been removed.
   *
   * @return the broker settings
   */
  private AMQConfigBean loadConfigure() {
    return new AMQConfigBean(URL, USERNAME, PASSWORD, SESSIONCACHESIZE);
  }

  /** Immutable holder for the broker connection settings. */
  private static final class AMQConfigBean
  {
    private final String brokerURL;
    private final String userName;
    private final String password;
    private final int sessionCacheSize;

    AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) {
      this.brokerURL = brokerURL;
      this.userName = userName;
      this.password = password;
      this.sessionCacheSize = sessionCacheSize;
    }

    String getBrokerURL() {
      return this.brokerURL;
    }

    String getUserName() {
      return this.userName;
    }

    String getPassword() {
      return this.password;
    }

    int getSessionCacheSize() {
      return this.sessionCacheSize;
    }
  }

  /** Holds the single {@link ConnectionFactory} instance; initialized on first access. */
  private static class SingletonHolder
  {
    // The original called a non-existent one-argument constructor ("new ConnectionFactory(null)").
    static final ConnectionFactory INSTANCE = new ConnectionFactory();
  }
}

2、模版

package com.broadsense.iov.base.jms;

import org.springframework.jms.core.JmsTemplate;

/**
 * Template factory.
 *
 * <p>Singleton that hands out two lazily created {@code JmsTemplate} instances built on the
 * connection factory returned by {@code ConnectionFactory.getInstance()}: one configured for
 * topics (publish/subscribe domain) and one for queues.
 *
 * @author flm
 * 2017-10-13
 */
public class JmsTemplateFactory
{
  private final javax.jms.ConnectionFactory factory;
  private JmsTemplate topicJmsTemplate;
  private JmsTemplate queueJmsTemplate;

  /**
   * Returns the single factory instance.
   *
   * @return the shared {@code JmsTemplateFactory}
   */
  public static JmsTemplateFactory getInstance()
  {
    return SingletonHolder.INSTANCE;
  }

  /** Captures the shared connection factory; only {@link SingletonHolder} calls this. */
  private JmsTemplateFactory()
  {
    this.factory = ConnectionFactory.getInstance();
  }

  /**
   * Returns the template used for topics, creating it on first use.
   *
   * @return the template with the pub/sub domain enabled
   */
  public synchronized JmsTemplate getTopicJmsTemplate() {
    if (this.topicJmsTemplate == null) {
      this.topicJmsTemplate = createTemplate(this.factory, true);
    }
    return this.topicJmsTemplate;
  }

  /**
   * Returns the template used for queues, creating it on first use.
   *
   * @return the template with the pub/sub domain disabled
   */
  public synchronized JmsTemplate getQueueJmsTemplate() {
    if (this.queueJmsTemplate == null) {
      this.queueJmsTemplate = createTemplate(this.factory, false);
    }
    return this.queueJmsTemplate;
  }

  /**
   * Creates a template on the given connection factory.
   *
   * @param factory connection factory the template sends through
   * @param pubSubDomain {@code true} for topics, {@code false} for queues
   * @return the configured template
   */
  private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) {
    JmsTemplate template = new JmsTemplate(factory);
    template.setPubSubDomain(pubSubDomain);
    return template;
  }

  /** Holds the single {@link JmsTemplateFactory} instance; initialized on first access. */
  public static class SingletonHolder
  {
    // The original called "new JmsTemplateFactory(null)", which matches no constructor.
    static final JmsTemplateFactory INSTANCE = new JmsTemplateFactory();
  }
}

3、消費者 模版

package com.broadsense.iov.base.jms;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
/**
 * JMS listener: creates consumers.
 *
 * <p>Static helpers that attach a {@code MessageListener} to an ActiveMQ queue or topic by
 * starting a Spring {@code SimpleMessageListenerContainer} on the shared connection factory.
 * Started destinations are recorded in an internal map; a queue that is already recorded is
 * not started a second time.
 *
 * @author flm
 * 2017-10-13
 */
public class JMSListener
{
  private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class);
  private static final String QUEUE_KEY_PREFIX = "QUEUE_";
  private static final String TOPIC_KEY_PREFIX = "TOPIC_";
  /** Destinations that have had a listener started, keyed by prefix + destination name. */
  private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap<String, Destination>();

  /**
   * Starts a listener on the named queue without a subscription name.
   *
   * @param queueName name of the queue to consume from
   * @param listener callback invoked for each received message
   */
  public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)
  {
    startJmsQueueListener(queueName, null, listener);
  }

  /**
   * Starts a listener on the named queue unless one was already started through this class,
   * in which case a warning is logged and nothing else happens.
   *
   * @param queueName name of the queue to consume from
   * @param subName subscription name passed to the container; ignored when null or empty
   * @param listener callback invoked for each received message
   */
  public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) {
    String key = QUEUE_KEY_PREFIX + queueName;
    if (MQDESTS.get(key) != null) {
      LOGGER.warn("{} already started", queueName);
      return;
    }
    ActiveMQQueue mq = new ActiveMQQueue(queueName);
    startJmsListener(mq, subName, listener);
    // Recorded only after a successful start, so a failed start can be retried.
    MQDESTS.put(key, mq);
  }

  /**
   * Starts a listener on the named topic without a subscription name.
   *
   * @param topicName name of the topic to subscribe to
   * @param listener callback invoked for each received message
   */
  public static synchronized void startJmsTopicListener(String topicName, MessageListener listener)
  {
    startJmsTopicListener(topicName, null, listener);
  }

  /**
   * Starts a listener on the named topic. Unlike queues, no duplicate check is made, so each
   * call starts another container.
   *
   * @param topicName name of the topic to subscribe to
   * @param subName durable subscription name; ignored when null or empty
   * @param listener callback invoked for each received message
   */
  public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) {
    ActiveMQTopic mq = new ActiveMQTopic(topicName);
    startJmsListener(mq, subName, listener);
    // Topics get their own key prefix: the original stored them under "QUEUE_", which made a
    // later queue with the same name look "already started".
    MQDESTS.put(TOPIC_KEY_PREFIX + topicName, mq);
  }

  /**
   * Builds and starts a listener container for the destination on the shared connection
   * factory.
   *
   * @param dest queue or topic to consume from
   * @param subName durable subscription name; applied only when non-null and non-empty
   * @param msgListener callback invoked for each received message
   */
  private static void startJmsListener(Destination dest, String subName, MessageListener msgListener)
  {
    javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance();

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(factory);
    container.setDestination(dest);
    container.setMessageListener(msgListener);
    // Compare by content: the original used != "", which is an identity check and lets an
    // empty string built at runtime through.
    if (subName != null && !subName.isEmpty()) {
      container.setDurableSubscriptionName(subName);
    }
    container.start();
  }
}

4、生產者 模版

package com.broadsense.iov.base.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * 創建 jms生產者
 * 
 * @author flm
 * 2017年10月13日
 */
public class JMSPublisher
{
  public static void sendTopicMessage(String dest, String msg)
  {
    JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg)
    {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(this.val$msg);
      }
    });
  }

  public static void sendQueueMessage(String dest, String msg)
  {
    JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg)
    {
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(this.val$msg);
      }
    });
  }
}

ActiveMQ——activemq的使用java代碼實例