spring activeMQ使用連線池工廠報錯
1:環境和版本
java:jdk7
spring:4.1.3
activemq:5.8.0
2:spring與activeMQ的結合配置
-
<?xml version="1.0" encoding="UTF-8"?>
-
<beans xmlns="http://www.springframework.org/schema/beans"
-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-
">
-
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
-
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
-
<property name="brokerURL" value="tcp://localhost:61636"></property>
-
</bean>
-
<!-- ActiveMQ為我們提供了一個PooledConnectionFactory,往裡面注入一個ActiveMQConnectionFactory可以用來將Connection,
-
Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。
-
問題:使用poolConnectionFactory時候,用JMSTemplate同步迴圈接收訊息,因為JMSTemplate會自動在接收訊息後關閉連線,
-
所以迴圈到第二次的時候會報錯,這個問題待解決
-
問題:使用poolConnectionFactory時候,用監聽來接收訊息,會有部分訊息殘留在佇列裡面,問題待解決
-
結論:還是先別用連線池了-->
-
<bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" >
-
<property name="connectionFactory" ref="activeMQConnectionFactory" />
-
<property name="maxConnections" value="10"/>
-
</bean>
-
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory 這裡我使用的是singleConnectionFactory-->
-
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
-
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
-
</bean>
-
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
-
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
-
</bean>
-
<!-- 配置生產者 -->
-
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
-
<bean id="senderJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
-
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
-
<property name="connectionFactory" ref="singleConnectionFactory"/>
-
<!-- NON_PERSISTENT非持久化 1 ,PERSISTENT持久化 2 -->
-
<property name="deliveryMode" value="2"/>
-
<property name="sessionTransacted" value="true"/>
-
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
-
</bean>
-
<!--這個是佇列目的地,點對點的 -->
-
<bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
-
<constructor-arg value="FirstQueue"/>
-
</bean>
-
<!--這個是主題目的地,一對多的 -->
-
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
-
<constructor-arg value="topic"/>
-
</bean>
-
<!-- 自定義消費者 -->
-
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
-
<bean id="receiverJmsTemplate" class="com.system.freemwork.amq.SimpleJmsTemplate">
-
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
-
<property name="connectionFactory" ref="singleConnectionFactory"/>
-
<!-- 如果是原生的amq建立的session,將session設定為true時候,ack會固定被設定為AUTO_ACKNOWLEDGE
-
所以想要手動確認,那麼session的事物必須設定為false,並且ack設定為CLIENT_ACKNOWLEDGE -->
-
<property name="sessionTransacted" value="false"/>
-
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
-
<property name="receiveTimeout" value="1000"/>
-
<property name="autoAcknowledge" value="true"/>
-
</bean>
-
</beans>
3:編寫send測試類
-
package com.test.spring;
-
import org.junit.Test;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.jms.core.JmsTemplate;
-
/**
-
* 類描述:sender測試類
-
*
-
* @author fengyong
-
* @version 1.0
-
* @since 1.0
-
* Created by fengyong on 16/8/3 下午7:45.
-
*/
-
public class ActiveMqSender extends BaseTest {
-
@Autowired
-
private JmsTemplate senderJmsTemplate;
-
@Test
-
public void activeMq(){
-
for(int i = 1;i<=10;i++){
-
senderJmsTemplate.convertAndSend("FirstQueue","我是第"+i+"個");
-
}
-
System.out.print("全部執行完畢!!!");
-
}
-
}
4:編寫receiver測試類
-
package com.test.spring;
-
import com.system.freemwork.amq.SimpleJmsTemplate;
-
import org.apache.activemq.command.ActiveMQQueue;
-
import org.junit.Test;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.jms.core.JmsTemplate;
-
import javax.jms.JMSException;
-
import javax.jms.TextMessage;
-
/**
-
* 類描述:receiver測試類
-
*
-
* @author fengyong
-
* @version 1.0
-
* @since 1.0
-
* Created by fengyong on 16/8/8 下午5:28.
-
*/
-
public class ActiveMqReceiver extends BaseTest{
-
@Autowired
-
private SimpleJmsTemplate receiverJmsTemplate;
-
@Autowired
-
private ActiveMQQueue activeMQQueue;
-
/**
-
* 坑爹的方法,如果session事物設定為true,receiver直接將sessioin進行commit,
-
*
-
* 如果設定為false,receiver方法會直接判斷進行訊息確認,無法做到手動的訊息確認,所以一旦發生異常,這條訊息不會回到訊息佇列中
-
*
-
* session的提交可以認為是訊息確認收到
-
* @throws JMSException
-
*/
-
@Test
-
public void receiver() throws JMSException {
-
int i=1;
-
while (true){
-
i++;
-
TextMessage message = (TextMessage)receiverJmsTemplate.receive(activeMQQueue);
-
if (null != message) {
-
System.out.println("收到訊息==================" + message.getText());
-
} else {
-
System.out.print("超時10秒");
-
break;
-
}
-
}
-
}
-
}
注:如果session事物設定為true,receiver直接將sessioin進行commit.原始碼如下
if (session.getTransacted()) { // Commit necessary - but avoid commit call within a JTA transaction. if (isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } }
如果session事物設定為false,receiver方法會直接判斷進行訊息確認,無法做到手動的訊息確認,所以一旦發生異常,這條訊息不會回到訊息佇列中.原始碼如下
else if (isClientAcknowledge(session)) { // Manually acknowledge message, if any. if (message != null) { message.acknowledge(); } }
所以需要修改原始碼不讓其在receiver的時候自動確認收到訊息
5:新建SimpleJmsTemplate繼承JmsTemplate
-
package com.system.freemwork.amq;
-
import org.springframework.jms.JmsException;
-
import org.springframework.jms.connection.ConnectionFactoryUtils;
-
import org.springframework.jms.connection.JmsResourceHolder;
-
import org.springframework.jms.core.JmsTemplate;
-
import org.springframework.jms.core.SessionCallback;
-
import org.springframework.jms.support.JmsUtils;
-
import org.springframework.transaction.support.TransactionSynchronizationManager;
-
import org.springframework.util.Assert;
-
import sun.misc.resources.Messages_ja;
-
import javax.jms.Connection;
-
import javax.jms.JMSException;
-
import javax.jms.Message;
-
import javax.jms.MessageConsumer;
-
import javax.jms.Session;
-
/**
-
* 類描述:自定義JmsTemplate,實現客戶手動確認
-
*
-
* @author fengyong
-
* @version 1.0
-
* @since 1.0
-
* Created by fengyong on 16/8/10 上午10:03.
-
*/
-
public class SimpleJmsTemplate extends JmsTemplate {
-
private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();
-
/**
-
* 是否開啟手動確認標記
-
*/
-
private Boolean autoAcknowledge;
-
MessageConsumer consumer = null;
-
Session sessionToClose = null;
-
Connection conToClose = null;
-
boolean startConnection = false;
-
/**
-
* 接收訊息
-
* @param session
-
* @param consumer
-
* @return
-
* @throws JMSException
-
*/
-
protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
-
try {
-
this.consumer = consumer;
-
// Use transaction timeout (if available).
-
long timeout = getReceiveTimeout();
-
JmsResourceHolder resourceHolder =
-
(JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
-
if (resourceHolder != null && resourceHolder.hasTimeout()) {
-
timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
-
}
-
Message message = doReceive(consumer, timeout);
-
if (session.getTransacted()) {
-
// Commit necessary - but avoid commit call within a JTA transaction.
-
// 如果開啟了jta事物,那麼不會進行提交,jta事物會直接覆蓋掉session事物
-
if (isSessionLocallyTransacted(session)) {
-
// Transacted session created by this template -> commit.
-
JmsUtils.commitIfNecessary(session);
-
}
-
}
-
//autoAcknowledge如果為真,不進行自動確認
-
else if (isClientAcknowledge(session) && !autoAcknowledge) {
-
// Manually acknowledge message, if any.
-
if (message != null) {
-
message.acknowledge();
-
}
-
}
-
return message;
-
}
-
finally {
-
consumer = null;
-
}
-
}
-
/**
-
* 自定義的訊息確認,關閉consumer和sesseionToClose是父類本身就要執行的,這裡直接拷貝下來,能不改的地方儘量不改
-
* 該子類只是為了自定義確認訊息
-
* @param message
-
* @throws JMSException
-
*/
-
public void msgAckAndcloseSession(Message message) throws JMSException {
-
message.acknowledge();
-
JmsUtils.closeMessageConsumer(consumer);
-
JmsUtils.closeSession(sessionToClose);
-
ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
-
}
-
/**
-
* 由於上面的doReceive(Session session, MessageConsumer consumer)需要呼叫這個方法,
-
* 而在父類裡面這個方法是私有的,所以直接拷貝下來了
-
* @param consumer
-
* @param timeout
-
* @return
-
* @throws JMSException
-
*/
-
private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {
-
if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {
-
return consumer.receiveNoWait();
-
}
-
else if (timeout > 0) {
-
return consumer.receive(timeout);
-
}
-
else {
-
return consumer.receive();
-
}
-
}
-
/**
-
* 該方法是為了防止確認訊息前session被關閉,不然確認訊息前session關閉會導致異常發生
-
* transactionalResourceFactory在父類中是私有且不可修改,因為只有這一個方法用到了transactionalResourceFactory
-
* 所以直接將JmsTemplateResourceFactory拷貝下來使用
-
* @param action
-
* @param startConnection
-
* @param <T>
-
* @return
-
* @throws JmsException
-
*/
-
public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
-
Assert.notNull(action, "Callback object must not be null");
-
this.startConnection = startConnection;
-
try {
-
Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
-
getConnectionFactory(), transactionalResourceFactory, startConnection);
-
if (sessionToUse == null) {
-
conToClose = createConnection();
-
sessionToClose = createSession(conToClose);
-
if (startConnection) {
-
conToClose.start();
-
}
-
sessionToUse = sessionToClose;
-
}
-
if (logger.isDebugEnabled()) {
-
logger.debug("Executing callback on JMS Session: " + sessionToUse);
-
}
-
return action.doInJms(sessionToUse);
-
}
-
catch (JMSException ex) {
-
throw convertJmsAccessException(ex);
-
}
-
finally {
-
sessionToClose = null;
-
conToClose = null;
-
startConnection = false;
-
}
-
}
-
/**
-
* Sets new 是否開啟手動確認標記.
-
*
-
* @param autoAcknowledge New value of 是否開啟手動確認標記.
-
*/
-
public void setAutoAcknowledge(Boolean autoAcknowledge) {
-
this.autoAcknowledge = autoAcknowledge;
-
}
-
/**
-
* Gets 是否開啟手動確認標記.
-
*
-
* @return Value of 是否開啟手動確認標記.
-
*/
-
public Boolean getAutoAcknowledge() {
-
return autoAcknowledge;
-
}
-
/**
-
* 直接拷貝下來的
-
*/
-
private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
-
@Override
-
public Connection getConnection(JmsResourceHolder holder) {
-
return SimpleJmsTemplate.this.getConnection(holder);
-
}
-
@Override
-
public Session getSession(JmsResourceHolder holder) {
-
return SimpleJmsTemplate.this.getSession(holder);
-
}
-
@Override
-
public Connection createConnection() throws JMSException {
-
return SimpleJmsTemplate.this.createConnection();
-
}
-
@Override
-
public Session createSession(Connection con) throws JMSException {
-
return SimpleJmsTemplate.this.createSession(con);
-
}
-
@Override
-
public boolean isSynchedLocalTransactionAllowed() {
-
return SimpleJmsTemplate.this.isSessionTransacted();
-
}
-
}
-
}
6:第二個receiver測試類
-
package com.test.spring;
-
import com.system.freemwork.amq.SimpleJmsTemplate;
-
import org.apache.activemq.command.ActiveMQQueue;
-
import org.junit.Test;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.jms.core.JmsTemplate;
-
import javax.jms.JMSException;
-
import javax.jms.TextMessage;
-
/**
-
* 類描述:receiver測試類
-
*
-
* @author fengyong
-
* @version 1.0
-
* @since 1.0
-
* Created by fengyong on 16/8/8 下午5:28.
-
*/
-
public class ActiveMqReceiver extends BaseTest{
-
@Autowired
-
private SimpleJmsTemplate receiverJmsTemplate;
-
@Autowired
-
private ActiveMQQueue activeMQQueue;
-
@Test
-
public void recerver() throws JMSException {
-
int i=1;
-
while (true){
-
i++;
-
TextMessage message = (TextMessage)receiverJmsTemplate.receive(activeMQQueue);
-
if (null != message) {
-
System.out.println("收到訊息==================" + message.getText());
-
if(i==4){
-
throw new RuntimeException("Exception");
-
}
-
receiverJmsTemplate.msgAckAndcloseSession(message);
-
} else {
-
System.out.print("超時10秒");
-
break;
-
}
-
}
-
}
-
}
7:測試結果
-
收到訊息==================我是第1個
-
收到訊息==================我是第2個
-
收到訊息==================我是第3個
-
java.lang.RuntimeException
-
: Exception
-
at com.test.spring.ActiveMqReceiver.show(ActiveMqReceiver.java:43)
-
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
-
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
-
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
-
at java.lang.reflect.Method.invoke(Method.java:606)
-
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
-
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
-
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
-
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
-
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:73)
注:接收第三個時失敗,雖然這裡打印出來了,但是第三個訊息並沒有被確認接收,同步自定義接收訊息修改成功