ActiveMQ入門例項,以及類封裝
阿新 • • 發佈:2019-01-26
這篇文章適合已經搭建好了activeMQ環境的人,需要封裝下activeMQ基本功能的人。封裝的不好,僅作參考。
package com.quhuhu.sync.util; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * * @author liang.he * */ public class ActiveMqCreater { //queue的名字,這個對應你在activeMQ上面建立的Queue的名字 public static String subject = "FirstQueue"; // ConnectionFactory :連線工廠,JMS 用它建立連線 public static ConnectionFactory connectionFactory = null; // Connection :JMS 客戶端到JMS Provider 的連線 public static Connection connection = null; // Session: 一個傳送或接收訊息的執行緒 private Session session = null; // Destination :訊息的目的地;訊息傳送給誰. public static Destination destination = null; // MessageProducer:訊息傳送者 private MessageProducer producer = null; //MessageConsumer : 訊息接收者 private MessageConsumer receiver = null; static { connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { e.printStackTrace(); } } /** * * @return 初始化訊息傳送者 * @throws Exception */ public MessageProducer initMessageProducer() throws Exception{ session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //訊息目的地 destination = session.createQueue(subject); // 得到訊息【傳送者】 producer = session.createProducer(destination); // 設定不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); return producer; } /** * * @return 初始化訊息接收者 * @throws JMSException */ public MessageConsumer initMessageConsumer() throws JMSException { session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //訊息目的地 destination = session.createQueue(subject); return session.createConsumer(destination); } /** * 訊息傳送方法 * @param msg 訊息 * @throws JMSException */ public void sendMsg(String msg) throws JMSException { try { System.out.println("fasongle "); initMessageProducer().send(session.createTextMessage(msg)); session.commit(); System.out.println("傳送訊息:" + msg); } catch (Exception e) { e.printStackTrace(); } } /** * 訊息接受方法 * @return msg 訊息 * @throws JMSException */ public String receiveMsg(MessageConsumer consumer) throws JMSException { TextMessage message = (TextMessage)consumer.receive(); if (null != message) { return message.getText(); } return null; } /** * 訊息接受方法 * @return msg 訊息 * @throws JMSException */ public String receiveMsg(MessageConsumer consumer, long time) throws JMSException { TextMessage message = (TextMessage)consumer.receive(time); if (null != message) { return message.getText(); } return null; } /** * 釋放MessageConsumer資源 * @throws JMSException */ public void close(MessageConsumer receiver) throws JMSException { System.out.println("close JMSconnection!"); if(null != receiver) { receiver.close(); } } /** * 釋放MessageProducer資源 * @throws JMSException */ public void close(MessageProducer producer) throws JMSException { System.out.println("close JMSconnection!"); if(null != producer) { producer.close(); } } /** * 釋放Session資源 * @throws JMSException */ public void close(Session session) throws JMSException { System.out.println("close JMSconnection!"); if(null != session) { session.close(); } } /** * 釋放Connection資源 * @throws JMSException */ public void close(Connection connection) throws JMSException { System.out.println("close JMSconnection!"); if(null != connection) { connection.close(); } } }
封裝的ActiveMQ類
下面是測試接受類:
import javax.jms.JMSException; import javax.jms.MessageConsumer; public class TestReceive { public static void main(String[] args) throws JMSException { JMSUtil js = new JMSUtil(); MessageConsumer consumer = js.initMessageConsumer(); while (true) { //設定接收者接收訊息的時間,為了便於測試,這裡誰定為100s System.out.println("收到訊息" + js.receiveMsg(consumer, 50000)); } } }
下面是測試傳送類:
import javax.jms.JMSException; public class TestSend { public static void main(String[] args) throws JMSException { JMSUtil js = new JMSUtil(); js.sendMsg("11111111111111ssssssssssssssssssssssassssssssssss"); js.sendMsg("2222222222sssssssssssssssssss2ssssssssss"); js.sendMsg("333333333333sssssssssssssssssssdssssssssss"); js.sendMsg("444444444sssssssssssssssssssssssssssss"); js.sendMsg("55555555555ssssssssss2sssssssssssssssssss"); js.sendMsg("666666666666sssssssssssss3ssssssssssssssss"); } }
執行的時候先執行接收類,在執行傳送類;
activemq的環境自行百度,很簡單,我這裡是使用的activeMQ的QUEUE訊息佇列,activeMQ還提供topic,感興趣的可以自己研究下他們的區別與用法。