1. 程式人生 > >activemq總結--重發機制

activemq總結--重發機制

在activemq中存在訊息確認機制,即ACK機制,ACK (Acknowledgement),即確認字元,在資料通訊中,接收站發給傳送站的一種傳輸類控制字元。表示發來的資料已確認接收無誤。JMS API中約定了Client端可以使用四種ACK_MODE,在javax.jms.Session介面中:
         AUTO_ACKNOWLEDGE = 1    自動確認
         CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
         DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
         SESSION_TRANSACTED = 0    事務提交併確認
         此外AcitveMQ補充了一個自定義的ACK_MODE:    INDIVIDUAL_ACKNOWLEDGE = 4    單條訊息確認

Client端指定了ACK_MODE,但是在Client與broker在交換ACK指令的時候,還需要告知ACK_TYPE,ACK_TYPE表示此確認指令的型別,不同的ACK_TYPE將傳遞著訊息的狀態,broker可以根據不同的ACK_TYPE對訊息進行不同的操作。

 比如Consumer消費訊息時出現異常,就需要向broker傳送ACK指令,ACK_TYPE為"REDELIVERED_ACK_TYPE",那麼broker就會重新發送此訊息。在JMS API中並沒有定義ACT_TYPE,因為它通常是一種內部機制,並不會面向開發者。ActiveMQ中定義瞭如下幾種ACK_TYPE(參看MessageAck類):

         DELIVERED_ACK_TYPE = 0    訊息"已接收",但尚未處理結束
         STANDARD_ACK_TYPE = 2    "標準"型別,通常表示為訊息"處理成功",broker端可以刪除訊息了
         POSION_ACK_TYPE = 1    訊息"錯誤",通常表示"拋棄"此訊息,比如訊息重發多次後,都無法正確處理時,訊息將會被刪除或者DLQ(死信佇列)
         REDELIVERED_ACK_TYPE = 3    訊息需"重發",比如consumer處理訊息時丟擲了異常,broker稍後會重新發送此訊息
         INDIVIDUAL_ACK_TYPE = 4    表示只確認"單條訊息",無論在任何ACK_MODE下    
         UNMATCHED_ACK_TYPE = 5    BROKER間轉發訊息時,接收端"拒絕"訊息
    到目前為止,我們已經清楚了大概的原理: Client端在不同的ACK_MODE時,將意味著在不同的時機發送ACK指令,每個ACK Command中會包含ACK_TYPE,那麼broker端就可以根據ACK_TYPE來決定此訊息的後續操作. 

  下面就簡單的對activemq的重發機制做個案例說(重發機制也是基於確認機制上實現的)

一、訊息生成者

package com.schooling.activemq.producer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTest {

	private static final int SENDNUM = 10;
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory;//連線工程
		Connection connection = null;//連線
		Session session;//會話 結束或簽字傳送訊息的執行緒
		Destination destination;//訊息的目的地
		MessageProducer messageProducer;//訊息生產者
		try {
			
			//例項化連線工廠
			//connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://115.159.89.80:51511,tcp://115.159.89.80:51512,tcp://115.159.89.80:51513)?randomize=false");
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://115.159.89.80:61616");
			//通過連線工程獲取連線
			connection = connectionFactory.createConnection();
			connection.start();//啟動連線
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立session
			destination = session.createQueue("testQueue");//建立佇列
			messageProducer = session.createProducer(destination);//建立訊息生產者
			
			sendMessage(session, messageProducer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null)
				connection.close();
		}
		
	}
	
	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
		for (int i = 0; i < 5; i++) {
			TextMessage message = session.createTextMessage("ActiveMq 傳送訊息"+i);
			messageProducer.send(message);
		}
	}
}

二、訊息消費者(這邊基於Spring實現的,使用監聽器)

1、spring-activemq.xml

 <!-- 將多個配置檔案讀取到容器中,交給Spring管理 -->
    <!-- 這裡支援多種定址方式:classpath和file -->
    <!-- 推薦使用file的方式引入,這樣可以將配置和程式碼分離 -->
    <!--<value>file:mq.properties</value>-->
    <bean id="mqPropertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:activemq.properties</value>
            </list>
        </property>
    </bean>

    <!-- 第三方MQ工廠: ConnectionFactory -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ Address -->
        <property name="brokerURL" value="${activemq.brokerURL}"/>
        <property name="userName" value="${activemq.userName}"/>
        <property name="password" value="${activemq.password}"/>
        <!-- 是否非同步傳送 -->
        <property name="useAsyncSend" value="true"/>
        <!-- 引用重發機制 -->
        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
    </bean>

    <!--
        ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裡面注入一個ActiveMQConnectionFactory
        可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包
     -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory"/>
        <property name="maxConnections" value="${activemq.pool.maxConnections}"/>
    </bean>

    <!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否在每次嘗試重新發送失敗後,增長這個等待時間 -->
        <property name="useExponentialBackOff" value="true"/>
        <!--重發次數,預設為6次   這裡設定為1次 -->
        <property name="maximumRedeliveries" value="2"/>
        <!--重發時間間隔,預設為1秒 -->
        <property name="initialRedeliveryDelay" value="1000"/>
        <!--第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value -->
        <property name="backOffMultiplier" value="2"/>
        <!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那麼第二次重連時間間隔為 20ms,
        第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以後每次重連時間間隔都為最大重連時間間隔。 -->
        <property name="maximumRedeliveryDelay" value="1000"/>
    </bean>

    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>

    <!--這個是目的地-->
    <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${activemq.queueName}"/>
    </bean>

    <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
    <!-- 佇列模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestinationName" value="${activemq.queueName}"/>
    </bean>

    <!-- 配置自定義監聽:MessageListener -->
    <bean id="msgQueueMessageListener" class="com.schooling.activemq.consumer.MsgQueueMessageListener"/>

    <!-- 將連線工廠、目標對了、自定義監聽注入jms模板 -->
    <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="msgQueue"/>
        <property name="messageListener" ref="msgQueueMessageListener"/>
        <!--應答模式是 INDIVIDUAL_ACKNOWLEDGE-->
        <property name="sessionAcknowledgeMode" value="4"/>
    </bean>

2、spring-context.xml

<!-- 註釋配置 -->
    <context:annotation-config/>

    <!-- 掃描包起始位置 -->
    <context:component-scan base-package="com.schooling.activemq"/>

    <import resource="classpath:spring-activemq.xml"/>

.3、activemq.properties

# ActiveMQ Config
activemq.brokerURL=tcp://ip:61616
activemq.userName=admin
activemq.password=admin
activemq.pool.maxConnections=10
# queueName
activemq.queueName=testQueue

4、監聽器

MsgQueueMessageListener

package com.schooling.activemq.consumer;

import org.springframework.jms.listener.SessionAwareMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Created with IntelliJ IDEA.
 * User:Tab.
 * Date:2018/4/26.
 * <p>
 * 消費者
 */
public class MsgQueueMessageListener implements SessionAwareMessageListener<Message> {

    @Override
    public void onMessage(Message message, Session session) throws JMSException {

        if (message instanceof TextMessage) {

            String msg = ((TextMessage) message).getText();

            System.out.println("============================================================");
            System.out.println("消費者收到的訊息:" + msg);
            System.out.println("============================================================");

            try {
                if ("我是佇列訊息002".equals(msg)) {
                    throw new RuntimeException("故意丟擲的異常");
                }
                // 只要被確認後  就會出隊,接受失敗沒有確認成功,會在原佇列裡面
                message.acknowledge();
            } catch (Exception e) {
                // 此不可省略 重發資訊使用
                session.recover();
            }
        }
    }
}

注:這邊message.acknowledge();是消費的確認,只要被確認後,訊息就是出隊,接受失敗沒有確認成功,會在原佇列裡面

     session.recover();是進行通知broker進行重發。

5、啟動類

public class MQConsumer {
	private static final Log log = LogFactory.getLog(MQConsumer.class);

	public static void main(String[] args) {
		try {
			ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
			context.start();
		} catch (Exception e) {
			log.error("==>MQ context start error:", e);
			System.exit(0);
		}
	}
}

執行結果:

============================================================
消費者收到的訊息:我是佇列訊息000
============================================================
============================================================
消費者收到的訊息:我是佇列訊息001
============================================================
============================================================
消費者收到的訊息:我是佇列訊息002
============================================================
============================================================
消費者收到的訊息:我是佇列訊息002
============================================================
============================================================
消費者收到的訊息:我是佇列訊息002
============================================================
============================================================
消費者收到的訊息:我是佇列訊息003
============================================================
============================================================
消費者收到的訊息:我是佇列訊息004
============================================================
============================================================
消費者收到的訊息:我是佇列訊息005
============================================================