1. 程式人生 > >ActiveMQ的配置以及使用方法

ActiveMQ的配置以及使用方法

ActiveMQ的簡單使用

ActiveMQ是一種開源的,實現了JMS規範的,面向訊息(MOM)的中介軟體,為應用程式提供高效的、可擴充套件的、穩定的和安全的企業級訊息通訊

ActiveMQ接傳送訊息流程圖:


Spring結合ActiveMQ使用

1.pom檔案引入依賴,引入jar包

<!--active mq start-->  
<dependency>  
    <groupId>org.apache.activemq</groupId>  
    <artifactId>activemq-core</artifactId>  
    <version>5.7.0</version>  
</dependency>  
<dependency>  
    <groupId>org.apache.activemq</groupId>  
    <artifactId>activemq-pool</artifactId>  
    <version>5.13.3</version>  
</dependency>  
<!--active mq end--> 

 2 .spring-mq.xml 配置檔案

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="  
        http://www.springframework.org/schema/beans  
        http://www.springframework.org/schema/beans/spring-beans.xsd">  
  
    <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <!-- ActiveMQ服務地址 -->  
        <property name="brokerURL" value="${mq.brokerURL}"/>  
        <property name="userName" value="${mq.userName}"></property>  
        <property name="password" value="${mq.password}"></property>  
        <!-- 這裡定義重試策略,注意:只有持久化的才會重試-->  
        <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/>  
    </bean>  
  
  
    <!--這裡設定各個訊息佇列的重發機制-->  
    <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">  
        <property name="redeliveryPolicyEntries">  
            <list>  
                <ref bean="smsRedeliveryPolicy"/>  
                <ref bean="mailRedeliveryPolicy"/>  
            </list>  
        </property>  
    </bean>  
    <bean id="smsRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  
        <!--重發次數 延時、延時係數、延時指數開關、目標(重發等待時間1s, 2s, 4s, 8s)-->  
        <property name="maximumRedeliveries" value="3"/>  
        <property name="redeliveryDelay" value="1000"/>  
        <property name="backOffMultiplier" value="2"/>  
        <property name="useExponentialBackOff" value="true"/>  
        <property name="destination" ref="smsQueue"/>  
    </bean>  
    <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  
        <!--重發次數 延時、延時係數、延時指數開關-->  
        <property name="maximumRedeliveries" value="2"/>  
        <property name="redeliveryDelay" value="5000"/>  
        <property name="destination" ref="mailQueue"/>  
    </bean>  
  
    <!--  
        ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裡面注入一個ActiveMQConnectionFactory  
        可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。  
        要依賴於 activemq-pool包  
     -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  
        <property name="connectionFactory" ref="targetConnectionFactory"/>  
        <property name="maxConnections" value="${mq.pool.maxConnections}"/>  
    </bean>  
  
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>  
        <property name="reconnectOnException" value="true"/>  
    </bean>  
  
    <!-- 佇列目的地-->  
    <bean id="smsQueue" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg index="0" value="${sms.queueName}"/>  
    </bean>  
    <bean id="mailQueue" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg index="0" value="${mail.queueName}"/>  
    </bean>  
  
  
    <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->  
    <!-- 佇列模板 這裡配置2個,一個用於分散式業務,一個用於傳送郵件-->  
    <bean id="smsMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="defaultDestination" ref="smsQueue"/>  
        <!-- 使 deliveryMode, priority, timeToLive設定生效-->  
        <property name="explicitQosEnabled" value="true" />  
        <!-- 持久化 如果設定為非持久化MQ伺服器重啟後MQ中的資料會丟失-->  
        <property name="deliveryPersistent" value="true"/>  
        <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的-->  
        <property name="sessionTransacted" value="false"/>  
    </bean>  
  
    <bean id="mailMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="defaultDestination" ref="mailQueue"/>  
        <!-- 使 deliveryMode, priority, timeToLive設定生效-->  
        <property name="explicitQosEnabled" value="true" />  
        <!-- 持久化 如果設定為非持久化MQ伺服器重啟後MQ中的資料會丟失-->  
        <property name="deliveryPersistent" value="true"/>  
        <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的-->  
        <property name="sessionTransacted" value="true"/>  
    </bean>  
  
    <!-- 訊息監聽實現方法一 -->  
    <bean id="smsListener" class="com.cn.ssm.mq.listener.SmsMessageListener"/>  
    <bean id="mailListener" class="com.cn.ssm.mq.listener.MailMessageListener"/>  
  
  
    <!-- 訊息接收監聽器用於非同步接收訊息-->  
    <bean id="smsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="smsQueue"/>  
        <property name="messageListener" ref="smsListener"/>  
        <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的-->  
        <property name="sessionTransacted" value="true"/> 
	<!-- 同時啟動幾個listener消費訊息  或者可使用 
		<property name="concurrency" value="4-8"/> 
		可以根據訊息佇列中的訊息規模自動調整並行數量,最小4, 最大8個。 -->  		
 	<property name="concurrentConsumers" value="1"/> 
	</bean>


	
<!- 可定義多個訊息監聽容器,監聽不同佇列內容 -->
    <bean id="mailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory"/>  
        <property name="destination" ref="mailQueue"/>  
        <property name="messageListener" ref="mailListener"/>  
        <!--這裡注意:如果不開啟事務,訊息在異常的情況下是不會重試的-->  
        <property name="sessionTransacted" value="true"/>  
        <property name="concurrentConsumers" value="1"/>  
    </bean>  
  </beans>






傳送端程式碼:
@RunWith(SpringJUnit4ClassRunner.class)  
@ContextConfiguration("classpath:application.xml")  
public class ActiveMqProducer {

	private final Logger log = LoggerFactory.getLogger(ActiveMqProducer.class);

	@Autowired
	private JmsTemplate smsMqJmsTemplate;

	@Test
	public void smsSend() throws Exception {
		bizMqJmsTemplate.setSessionTransacted(true);
		for (int i = 0; i < 1; i++) {
			log.info("==>send message" + i);
			bizMqJmsTemplate.send(new MessageCreator() {
				@Override
				public Message createMessage(Session session)
						throws JMSException {
					log.info("getTransacted:" + session.getTransacted());
					Sms sms = new Sms("你好,你的驗證碼是", 365986);
					return session.createTextMessage(JSONObject
							.toJSONString(sms));
				}
			});
			log.info("==>finish send message" + i);
		}

	}

}

接收端程式碼:
@Component  
public class SmsMessageListener implements SessionAwareMessageListener<Message> {  
  
    private static final Logger log = LoggerFactory.getLogger(TransactionBizMessageListener.class);  
    private final String transactionBiz = "testDistributedTransaction";  
  
    @Autowired  
    private TransactionBizService transactionBizService;  
  
    /** 
     * @param message 
     * @param session 
     */  
    public void onMessage(Message message, Session session) throws JMSException{  
        //這裡建議不要try catch,讓異常丟擲,通過redeliveryPolicy去重試,達到重試次數進入死信DLQ(Dead Letter Queue)  
        ActiveMQTextMessage msg = (ActiveMQTextMessage) message;  
        String ms = ms = msg.getText();  
        log.info("==>receive message:" + ms);  
        // 轉換成相應的物件  
        Sms sms = JSONObject.parseObject(ms, Sms.class);  
        if (sms != null ) {  
            // do something 
            //throw new RuntimeException("throw runtimeExcetpion");  
        } else {  
            log.info("==>message:" + ms + " sms is null!");  
        }  
    }  
}  





//普通java 程式碼類實現
//傳送端
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	Connection connection;
	Session session;
	Destination destination;
	MessageProducer producer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.100:61616");
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		//第一個引數是是否是事務型訊息,設定為true,第二個引數無效
		//第二個引數是
		//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。異常也會確認訊息,應該是在執行之前確認的
		//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會刪除訊息。可以在失敗的
		//時候不確認訊息,不確認的話不會移出佇列,一直存在,下次啟動繼續接受。接收訊息的連線不斷開,其他的消費者也不會接受(正常情況下佇列模式不存在其他消費者)
		//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。在需要考慮資源使用時,這種模式非常有效。
		//待測試
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		destination = session.createQueue("test-queue");
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//是否持久化,不持久化,一旦服務掛掉 重啟,訊息會丟失
		//優先順序不能影響先進先出。。。那這個用處究竟是什麼呢呢呢呢
		
		for(int i=0;i<100;i++){
			
			producer.send(session.createTextMessage("send:"+i));
		}
		producer.close();
		
	} catch (JMSException e) {
		e.printStackTrace();
	}
}



// 接收端  
public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	// Connection :JMS 客戶端到JMS Provider 的連線  
	Connection connection = null;
	// Session: 一個傳送或接收訊息的執行緒  
	Session session;
	// Destination :訊息的目的地;訊息傳送給誰.  
	Destination destination;
	// 消費者,訊息接收者  
	MessageConsumer consumer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.100:61616");
	try {
		// 構造從工廠得到連線物件  
		connection = connectionFactory.createConnection();
		// 啟動  
		connection.start();
		// 獲取操作連線  
		//這個最好還是有事務
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置  
		destination = session.createQueue("test-queue");
		consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					if (null != message) {
						System.out.println("收到訊息" +((TextMessage)message).getText());
					}
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
	} catch (Exception e) {
		e.printStackTrace();
	}
}