ActiveMQ的配置以及使用方法
阿新 • • 發佈:2019-02-19
ActiveMQ的簡單使用
ActiveMQ是一種開源的,實現了JMS規範的,面向訊息(MOM)的中介軟體,為應用程式提供高效的、可擴充套件的、穩定的和安全的企業級訊息通訊
ActiveMQ接傳送訊息流程圖:
Spring結合ActiveMQ使用
1.pom檔案引入依賴,引入jar包
<!--active mq start--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.13.3</version> </dependency> <!--active mq end-->
2 .spring-mq.xml 配置檔案
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- ActiveMQ服務地址 --> <property name="brokerURL" value="${mq.brokerURL}"/> <property name="userName" value="${mq.userName}"></property> <property name="password" value="${mq.password}"></property> <!-- 這裡定義重試策略,注意:只有持久化的才會重試--> <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/> </bean> <!--這裡設定各個訊息佇列的重發機制--> <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap"> <property name="redeliveryPolicyEntries"> <list> <ref bean="smsRedeliveryPolicy"/> <ref bean="mailRedeliveryPolicy"/> </list> </property> </bean> <bean id="smsRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重發次數 延時、延時係數、延時指數開關、目標(重發等待時間1s, 2s, 4s, 8s)--> <property name="maximumRedeliveries" value="3"/> <property name="redeliveryDelay" value="1000"/> <property name="backOffMultiplier" value="2"/> <property name="useExponentialBackOff" value="true"/> <property name="destination" ref="smsQueue"/> </bean> <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--重發次數 延時、延時係數、延時指數開關--> <property name="maximumRedeliveries" value="2"/> <property name="redeliveryDelay" value="5000"/> <property name="destination" ref="mailQueue"/> </bean> <!-- ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裡面注入一個ActiveMQConnectionFactory 可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。 要依賴於 activemq-pool包 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory"/> <property name="maxConnections" value="${mq.pool.maxConnections}"/> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> <property name="reconnectOnException" value="true"/> </bean> <!-- 佇列目的地--> <bean id="smsQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${sms.queueName}"/> </bean> <bean id="mailQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="${mail.queueName}"/> </bean> <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> <!-- 佇列模板 這裡配置2個,一個用於分散式業務,一個用於傳送郵件--> <bean id="smsMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="smsQueue"/> <!-- 使 deliveryMode, priority, timeToLive設定生效--> <property name="explicitQosEnabled" value="true" /> <!-- 持久化 如果設定為非持久化MQ伺服器重啟後MQ中的資料會丟失--> <property name="deliveryPersistent" value="true"/> <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的--> <property name="sessionTransacted" value="false"/> </bean> <bean id="mailMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> <property name="connectionFactory" ref="connectionFactory"/> <property name="defaultDestination" ref="mailQueue"/> <!-- 使 deliveryMode, priority, timeToLive設定生效--> <property name="explicitQosEnabled" value="true" /> <!-- 持久化 如果設定為非持久化MQ伺服器重啟後MQ中的資料會丟失--> <property name="deliveryPersistent" value="true"/> <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的--> <property name="sessionTransacted" value="true"/> </bean> <!-- 訊息監聽實現方法一 --> <bean id="smsListener" class="com.cn.ssm.mq.listener.SmsMessageListener"/> <bean id="mailListener" class="com.cn.ssm.mq.listener.MailMessageListener"/> <!-- 訊息接收監聽器用於非同步接收訊息--> <bean id="smsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="smsQueue"/> <property name="messageListener" ref="smsListener"/> <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的--> <property name="sessionTransacted" value="true"/> <!-- 同時啟動幾個listener消費訊息 或者可使用 <property name="concurrency" value="4-8"/> 可以根據訊息佇列中的訊息規模自動調整並行數量,最小4, 最大8個。 --> <property name="concurrentConsumers" value="1"/> </bean> <!- 可定義多個訊息監聽容器,監聽不同佇列內容 --> <bean id="mailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="mailQueue"/> <property name="messageListener" ref="mailListener"/> <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的--> <property name="sessionTransacted" value="true"/> <property name="concurrentConsumers" value="1"/> </bean> </beans>
傳送端程式碼: @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:application.xml") public class ActiveMqProducer { private final Logger log = LoggerFactory.getLogger(ActiveMqProducer.class); @Autowired private JmsTemplate smsMqJmsTemplate; @Test public void smsSend() throws Exception { bizMqJmsTemplate.setSessionTransacted(true); for (int i = 0; i < 1; i++) { log.info("==>send message" + i); bizMqJmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { log.info("getTransacted:" + session.getTransacted()); Sms sms = new Sms("你好,你的驗證碼是", 365986); return session.createTextMessage(JSONObject .toJSONString(sms)); } }); log.info("==>finish send message" + i); } } } 接收端程式碼: @Component public class SmsMessageListener implements SessionAwareMessageListener<Message> { private static final Logger log = LoggerFactory.getLogger(TransactionBizMessageListener.class); private final String transactionBiz = "testDistributedTransaction"; @Autowired private TransactionBizService transactionBizService; /** * @param message * @param session */ public void onMessage(Message message, Session session) throws JMSException{ //這裡建議不要try catch,讓異常丟擲,通過redeliveryPolicy去重試,達到重試次數進入死信DLQ(Dead Letter Queue) ActiveMQTextMessage msg = (ActiveMQTextMessage) message; String ms = ms = msg.getText(); log.info("==>receive message:" + ms); // 轉換成相應的物件 Sms sms = JSONObject.parseObject(ms, Sms.class); if (sms != null ) { // do something //throw new RuntimeException("throw runtimeExcetpion"); } else { log.info("==>message:" + ms + " sms is null!"); } } } //普通java 程式碼類實現 //傳送端 public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.100:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個引數是是否是事務型訊息,設定為true,第二個引數無效 //第二個引數是 //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。異常也會確認訊息,應該是在執行之前確認的 //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會刪除訊息。可以在失敗的 //時候不確認訊息,不確認的話不會移出佇列,一直存在,下次啟動繼續接受。接收訊息的連線不斷開,其他的消費者也不會接受(正常情況下佇列模式不存在其他消費者) //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。在需要考慮資源使用時,這種模式非常有效。 //待測試 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//是否持久化,不持久化,一旦服務掛掉 重啟,訊息會丟失 //優先順序不能影響先進先出。。。那這個用處究竟是什麼呢呢呢呢 for(int i=0;i<100;i++){ producer.send(session.createTextMessage("send:"+i)); } producer.close(); } catch (JMSException e) { e.printStackTrace(); } } // 接收端 public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連線 Connection connection = null; // Session: 一個傳送或接收訊息的執行緒 Session session; // Destination :訊息的目的地;訊息傳送給誰. Destination destination; // 消費者,訊息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.100:61616"); try { // 構造從工廠得到連線物件 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連線 //這個最好還是有事務 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { System.out.println("收到訊息" +((TextMessage)message).getText()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }