activemq總結--重發機制
在activemq中存在訊息確認機制,即ACK機制,ACK (Acknowledgement),即確認字元,在資料通訊中,接收站發給傳送站的一種傳輸類控制字元。表示發來的資料已確認接收無誤。JMS API中約定了Client端可以使用四種ACK_MODE,在javax.jms.Session介面中:
AUTO_ACKNOWLEDGE = 1 自動確認
CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
SESSION_TRANSACTED = 0 事務提交併確認
此外AcitveMQ補充了一個自定義的ACK_MODE: INDIVIDUAL_ACKNOWLEDGE = 4 單條訊息確認
Client端指定了ACK_MODE,但是在Client與broker在交換ACK指令的時候,還需要告知ACK_TYPE,ACK_TYPE表示此確認指令的型別,不同的ACK_TYPE將傳遞著訊息的狀態,broker可以根據不同的ACK_TYPE對訊息進行不同的操作。
比如Consumer消費訊息時出現異常,就需要向broker傳送ACK指令,ACK_TYPE為"REDELIVERED_ACK_TYPE",那麼broker就會重新發送此訊息。在JMS API中並沒有定義ACT_TYPE,因為它通常是一種內部機制,並不會面向開發者。ActiveMQ中定義瞭如下幾種ACK_TYPE(參看MessageAck類):
DELIVERED_ACK_TYPE = 0 訊息"已接收",但尚未處理結束
STANDARD_ACK_TYPE = 2 "標準"型別,通常表示為訊息"處理成功",broker端可以刪除訊息了
POSION_ACK_TYPE = 1 訊息"錯誤",通常表示"拋棄"此訊息,比如訊息重發多次後,都無法正確處理時,訊息將會被刪除或者DLQ(死信佇列)
REDELIVERED_ACK_TYPE = 3 訊息需"重發",比如consumer處理訊息時丟擲了異常,broker稍後會重新發送此訊息
INDIVIDUAL_ACK_TYPE = 4 表示只確認"單條訊息",無論在任何ACK_MODE下
UNMATCHED_ACK_TYPE = 5 BROKER間轉發訊息時,接收端"拒絕"訊息
到目前為止,我們已經清楚了大概的原理: Client端在不同的ACK_MODE時,將意味著在不同的時機發送ACK指令,每個ACK Command中會包含ACK_TYPE,那麼broker端就可以根據ACK_TYPE來決定此訊息的後續操作.
下面就簡單的對activemq的重發機制做個案例說(重發機制也是基於確認機制上實現的)
一、訊息生成者
package com.schooling.activemq.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTest {
private static final int SENDNUM = 10;
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory;//連線工程
Connection connection = null;//連線
Session session;//會話 結束或簽字傳送訊息的執行緒
Destination destination;//訊息的目的地
MessageProducer messageProducer;//訊息生產者
try {
//例項化連線工廠
//connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "failover:(tcp://115.159.89.80:51511,tcp://115.159.89.80:51512,tcp://115.159.89.80:51513)?randomize=false");
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://115.159.89.80:61616");
//通過連線工程獲取連線
connection = connectionFactory.createConnection();
connection.start();//啟動連線
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立session
destination = session.createQueue("testQueue");//建立佇列
messageProducer = session.createProducer(destination);//建立訊息生產者
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null)
connection.close();
}
}
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < 5; i++) {
TextMessage message = session.createTextMessage("ActiveMq 傳送訊息"+i);
messageProducer.send(message);
}
}
}
二、訊息消費者(這邊基於Spring實現的,使用監聽器)
1、spring-activemq.xml
<!-- 將多個配置檔案讀取到容器中,交給Spring管理 -->
<!-- 這裡支援多種定址方式:classpath和file -->
<!-- 推薦使用file的方式引入,這樣可以將配置和程式碼分離 -->
<!--<value>file:mq.properties</value>-->
<bean id="mqPropertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:activemq.properties</value>
</list>
</property>
</bean>
<!-- 第三方MQ工廠: ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- ActiveMQ Address -->
<property name="brokerURL" value="${activemq.brokerURL}"/>
<property name="userName" value="${activemq.userName}"/>
<property name="password" value="${activemq.password}"/>
<!-- 是否非同步傳送 -->
<property name="useAsyncSend" value="true"/>
<!-- 引用重發機制 -->
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
</bean>
<!--
ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裡面注入一個ActiveMQConnectionFactory
可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包
-->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="${activemq.pool.maxConnections}"/>
</bean>
<!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 -->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次嘗試重新發送失敗後,增長這個等待時間 -->
<property name="useExponentialBackOff" value="true"/>
<!--重發次數,預設為6次 這裡設定為1次 -->
<property name="maximumRedeliveries" value="2"/>
<!--重發時間間隔,預設為1秒 -->
<property name="initialRedeliveryDelay" value="1000"/>
<!--第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裡的2就是value -->
<property name="backOffMultiplier" value="2"/>
<!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那麼第二次重連時間間隔為 20ms,
第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以後每次重連時間間隔都為最大重連時間間隔。 -->
<property name="maximumRedeliveryDelay" value="1000"/>
</bean>
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<!--這個是目的地-->
<bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${activemq.queueName}"/>
</bean>
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
<!-- 佇列模板 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestinationName" value="${activemq.queueName}"/>
</bean>
<!-- 配置自定義監聽:MessageListener -->
<bean id="msgQueueMessageListener" class="com.schooling.activemq.consumer.MsgQueueMessageListener"/>
<!-- 將連線工廠、目標對了、自定義監聽注入jms模板 -->
<bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="msgQueue"/>
<property name="messageListener" ref="msgQueueMessageListener"/>
<!--應答模式是 INDIVIDUAL_ACKNOWLEDGE-->
<property name="sessionAcknowledgeMode" value="4"/>
</bean>
2、spring-context.xml
<!-- 註釋配置 -->
<context:annotation-config/>
<!-- 掃描包起始位置 -->
<context:component-scan base-package="com.schooling.activemq"/>
<import resource="classpath:spring-activemq.xml"/>
.3、activemq.properties
# ActiveMQ Config
activemq.brokerURL=tcp://ip:61616
activemq.userName=admin
activemq.password=admin
activemq.pool.maxConnections=10
# queueName
activemq.queueName=testQueue
4、監聽器
MsgQueueMessageListener
package com.schooling.activemq.consumer;
import org.springframework.jms.listener.SessionAwareMessageListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Created with IntelliJ IDEA.
* User:Tab.
* Date:2018/4/26.
* <p>
* 消費者
*/
public class MsgQueueMessageListener implements SessionAwareMessageListener<Message> {
@Override
public void onMessage(Message message, Session session) throws JMSException {
if (message instanceof TextMessage) {
String msg = ((TextMessage) message).getText();
System.out.println("============================================================");
System.out.println("消費者收到的訊息:" + msg);
System.out.println("============================================================");
try {
if ("我是佇列訊息002".equals(msg)) {
throw new RuntimeException("故意丟擲的異常");
}
// 只要被確認後 就會出隊,接受失敗沒有確認成功,會在原佇列裡面
message.acknowledge();
} catch (Exception e) {
// 此不可省略 重發資訊使用
session.recover();
}
}
}
}
注:這邊message.acknowledge();是消費的確認,只要被確認後,訊息就是出隊,接受失敗沒有確認成功,會在原佇列裡面
session.recover();是進行通知broker進行重發。
5、啟動類
public class MQConsumer {
private static final Log log = LogFactory.getLog(MQConsumer.class);
public static void main(String[] args) {
try {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
context.start();
} catch (Exception e) {
log.error("==>MQ context start error:", e);
System.exit(0);
}
}
}
執行結果:
============================================================
消費者收到的訊息:我是佇列訊息000
============================================================
============================================================
消費者收到的訊息:我是佇列訊息001
============================================================
============================================================
消費者收到的訊息:我是佇列訊息002
============================================================
============================================================
消費者收到的訊息:我是佇列訊息002
============================================================
============================================================
消費者收到的訊息:我是佇列訊息002
============================================================
============================================================
消費者收到的訊息:我是佇列訊息003
============================================================
============================================================
消費者收到的訊息:我是佇列訊息004
============================================================
============================================================
消費者收到的訊息:我是佇列訊息005
============================================================