ActiveMQ整合Spring JMS
JMS即Java訊息服務(Java Message Service),是Java平臺上的一套關於訊息中介軟體的規範,或者說是一套統一的API。支援JMS的訊息中介軟體有很多,ActiveMQ算是其中最常用的一個。
JMS兩種模型
JMS支援以下兩種模型,本文將會對這兩種模型分別介紹如何整合Spring:
- 點對點(Point-to-Point),對應的destination是Queue
- 釋出訂閱(Publish/Subscribe),對應的destination是Topic
建立連線
無論是點對點還是釋出訂閱,生產者或者消費者,第一步是要獲取連線。ActiveMQ提供了org.apache.activemq.pool.PooledConnectionFactory
<!-- ActiveMQ連線池 -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
生產者
Spring提供了JmsTemplate可以很方便的傳送訊息。
點對點模型
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination">
<bean class="org.apache.activemq.command.ActiveMQQueue"> <!-- 指定Destination為Queue(點對點模型) -->
<constructor-arg value="testqueue" /> <!-- Queue name -->
</bean>
</property>
</bean>
釋出訂閱模型
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination">
<bean class="org.apache.activemq.command.ActiveMQTopic"> <!-- 指定Destination為Topic(釋出訂閱模型) -->
<constructor-arg value="testtopic" /> <!-- Topic name -->
</bean>
</property>
</bean>
傳送訊息程式碼
使用上面定義的JmsTemplate,通過JmsTemplate.send方法可以傳送一條文字型別訊息。
@Autowired
private JmsTemplate jmsTemplate;
public void send(String mesasage) {
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(mesasage);
}
});
}
如果你正在使用Java 8,使用Lambda表示式傳送訊息會更加方便:
@Autowired
private JmsTemplate jmsTemplate;
public void send(String mesasage) {
jmsTemplate.send(session -> session.createTextMessage(mesasage));
}
消費者
JmsTemplate也可以作為消費者使用,但是它是同步的。下面介紹Spring JMS提供的非同步的消費者方案。
點對點模型
<!-- 訊息監聽器 -->
<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination">
<bean class="org.apache.activemq.command.ActiveMQQueue"> <!-- 指定Destination為Queue(點對點模型) -->
<constructor-arg value="testqueue" /> <!-- Queue name -->
</bean>
</property>
<property name="messageListener">
<bean class="com.xxg.jms.listener.ConsumerMessageListener" />
</property>
</bean>
釋出訂閱模型
<!-- 訊息監聽器 -->
<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination">
<bean class="org.apache.activemq.command.ActiveMQTopic"> <!-- 指定Destination為Topic(釋出訂閱模型) -->
<constructor-arg value="testtopic" /> <!-- Topic name -->
</bean>
</property>
<property name="messageListener">
<bean class="com.xxg.jms.listener.ConsumerMessageListener" />
</property>
</bean>
釋出訂閱模型訊息者持久訂閱(Durable Subscriber)
上面給出的是非持久訂閱的釋出訂閱模型消費者,這裡來單獨說一下持久訂閱。需要注意的是,持久訂閱(Durable Subscriber)並非訊息持久化(DeliveryMode.PERSISTENT),這是兩個不同的概念。
非持久訂閱的消費者,如果消費者程式掛了,那麼掛了的這段時間的訊息是收不到的,即使再重啟起來也收不到。持久訂閱消費者可以讓消費者在重啟後任然能收到停止的這段時間的訊息,避免遺漏。
要想使用持久訂閱,需要給消費者設定一個唯一的client ID和Subscriber Name,這樣可以讓ActiveMQ記住這個消費者,當消費者斷開連線程式停止,ActiveMQ也會給這個消費者保留這段時間內的訊息,下次同一個消費者(client ID和Subscriber Name相同)重新連線上還能收到訊息。
<!-- ActiveMQ連線池 -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientID" value="clientID_123456" /> <!-- 指定一個唯一的clientID -->
</bean>
</property>
</bean>
<!-- 訊息監聽器 -->
<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination">
<bean class="org.apache.activemq.command.ActiveMQTopic"> <!-- 指定Destination為Topic(釋出訂閱模型) -->
<constructor-arg value="testtopic" /> <!-- Topic name -->
</bean>
</property>
<property name="messageListener">
<bean class="com.xxg.jms.listener.ConsumerMessageListener" />
</property>
<property name="durableSubscriptionName" value="durableSubscriptionName_123456" /> <!-- 指定一個唯一的Name -->
</bean>
接收訊息程式碼
public class ConsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}