1. 程式人生 > >spring activeMQ使用連線池工廠報錯

spring activeMQ使用連線池工廠報錯

1:環境和版本

java:jdk7

spring:4.1.3

activemq:5.8.0

2:spring與activeMQ的結合配置

  1. <?xml version="1.0" encoding="UTF-8"?>

  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

      <!-- The ConnectionFactory that actually produces Connections; supplied by the JMS vendor. -->
      <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
          <property name="brokerURL" value="tcp://localhost:61636"></property>
      </bean>

      <!-- ActiveMQ ships a PooledConnectionFactory. Injecting an ActiveMQConnectionFactory into it
           pools Connections, Sessions and MessageProducers, which reduces resource consumption.
           Author's open problem 1: with poolConnectionFactory, receiving synchronously in a loop
           through JmsTemplate fails on the second iteration, because JmsTemplate closes the
           connection after each receive.
           Author's open problem 2: with poolConnectionFactory and a listener, some messages are
           left behind in the queue.
           Author's conclusion: do not use the pool for now (no other bean in this file references it).
           destroy-method="stop" lets the container shut the pool down when the context closes. -->
      <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
            destroy-method="stop">
          <property name="connectionFactory" ref="activeMQConnectionFactory" />
          <property name="maxConnections" value="10"/>
      </bean>

      <!-- Spring-side ConnectionFactory wrapping the vendor ConnectionFactory;
           the templates below use this SingleConnectionFactory. -->
      <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
          <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
      </bean>

      <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
          <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
      </bean>

      <!-- Producer configuration. -->
      <!-- JmsTemplate is Spring's JMS helper class for sending and receiving messages. -->
      <bean id="senderJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
          <!-- Refers to the Spring-provided ConnectionFactory defined above. -->
          <property name="connectionFactory" ref="singleConnectionFactory"/>
          <!-- Delivery mode: 1 = NON_PERSISTENT, 2 = PERSISTENT. -->
          <property name="deliveryMode" value="2"/>
          <property name="sessionTransacted" value="true"/>
          <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
      </bean>

      <!-- Queue destination (point-to-point). -->
      <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
          <constructor-arg value="FirstQueue"/>
      </bean>

      <!-- Topic destination (one-to-many). -->
      <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
          <constructor-arg value="topic"/>
      </bean>

      <!-- Custom consumer. -->
      <!-- SimpleJmsTemplate is the JmsTemplate subclass from this article that supports manual acknowledgement. -->
      <bean id="receiverJmsTemplate" class="com.system.freemwork.amq.SimpleJmsTemplate">
          <!-- Refers to the Spring-provided ConnectionFactory defined above. -->
          <property name="connectionFactory" ref="singleConnectionFactory"/>
          <!-- Author's note: on a transacted session the configured acknowledge mode is not honoured,
               so manual acknowledgement requires sessionTransacted=false together with
               CLIENT_ACKNOWLEDGE. -->
          <property name="sessionTransacted" value="false"/>
          <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
          <!-- Receive timeout in milliseconds. -->
          <property name="receiveTimeout" value="1000"/>
          <!-- Despite its name, true switches SimpleJmsTemplate to MANUAL acknowledgement:
               the template skips its own acknowledge() call and leaves it to the caller. -->
          <property name="autoAcknowledge" value="true"/>
      </bean>

  </beans>

3:編寫send測試類

  1. package com.test.spring;

  2. import org.junit.Test;

  3. import org.springframework.beans.factory.annotation.Autowired;

  4. import org.springframework.jms.core.JmsTemplate;

  5. /**

  6. * 類描述:sender測試類

  7. *

  8. * @author fengyong

  9. * @version 1.0

  10. * @since 1.0

  11. * Created by fengyong on 16/8/3 下午7:45.

  12. */

  13. public class ActiveMqSender extends BaseTest {

  14. @Autowired

  15. private JmsTemplate senderJmsTemplate;

  16. @Test

  17. public void activeMq(){

  18. for(int i = 1;i<=10;i++){

  19. senderJmsTemplate.convertAndSend("FirstQueue","我是第"+i+"個");

  20. }

  21. System.out.print("全部執行完畢!!!");

  22. }

  23. }

4:編寫receiver測試類

  1. package com.test.spring;

  2. import com.system.freemwork.amq.SimpleJmsTemplate;

  3. import org.apache.activemq.command.ActiveMQQueue;

  4. import org.junit.Test;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.jms.core.JmsTemplate;

  7. import javax.jms.JMSException;

  8. import javax.jms.TextMessage;

  9. /**

  10. * 類描述:receiver測試類

  11. *

  12. * @author fengyong

  13. * @version 1.0

  14. * @since 1.0

  15. * Created by fengyong on 16/8/8 下午5:28.

  16. */

  17. public class ActiveMqReceiver extends BaseTest{

  18. @Autowired

  19. private SimpleJmsTemplate receiverJmsTemplate;

  20. @Autowired

  21. private ActiveMQQueue activeMQQueue;

  22. /**

  23. * 坑爹的方法,如果session事物設定為true,receiver直接將sessioin進行commit,

  24. *

  25. * 如果設定為false,receiver方法會直接判斷進行訊息確認,無法做到手動的訊息確認,所以一旦發生異常,這條訊息不會回到訊息佇列中

  26. *

  27. * session的提交可以認為是訊息確認收到

  28. * @throws JMSException

  29. */

  30. @Test

  31. public void receiver() throws JMSException {

  32. int i=1;

  33. while (true){

  34. i++;

  35. TextMessage message = (TextMessage)receiverJmsTemplate.receive(activeMQQueue);

  36. if (null != message) {

  37. System.out.println("收到訊息==================" + message.getText());

  38. } else {

  39. System.out.print("超時10秒");

  40. break;

  41. }

  42. }

  43. }

  44. }

注:如果session事務設定為true,receive方法會直接將session進行commit。原始碼如下:

if (session.getTransacted()) {
   // Commit necessary - but avoid commit call within a JTA transaction.
   if (isSessionLocallyTransacted(session)) {
      // Transacted session created by this template -> commit.
      JmsUtils.commitIfNecessary(session);
   }
}

如果session事務設定為false,receive方法會直接判斷並進行訊息確認,無法做到手動的訊息確認,所以一旦發生異常,這條訊息不會回到訊息佇列中。原始碼如下:

else if (isClientAcknowledge(session)) {
   // Manually acknowledge message, if any.
   if (message != null) {
      message.acknowledge();
   }
}

所以需要修改原始碼,不讓其在receive的時候自動確認收到訊息

5:新建SimpleJmsTemplate繼承JmsTemplate

  1. package com.system.freemwork.amq;

  2. import org.springframework.jms.JmsException;

  3. import org.springframework.jms.connection.ConnectionFactoryUtils;

  4. import org.springframework.jms.connection.JmsResourceHolder;

  5. import org.springframework.jms.core.JmsTemplate;

  6. import org.springframework.jms.core.SessionCallback;

  7. import org.springframework.jms.support.JmsUtils;

  8. import org.springframework.transaction.support.TransactionSynchronizationManager;

  9. import org.springframework.util.Assert;

  10. import sun.misc.resources.Messages_ja;

  11. import javax.jms.Connection;

  12. import javax.jms.JMSException;

  13. import javax.jms.Message;

  14. import javax.jms.MessageConsumer;

  15. import javax.jms.Session;

  16. /**

  17. * 類描述:自定義JmsTemplate,實現客戶手動確認

  18. *

  19. * @author fengyong

  20. * @version 1.0

  21. * @since 1.0

  22. * Created by fengyong on 16/8/10 上午10:03.

  23. */

  24. public class SimpleJmsTemplate extends JmsTemplate {

  25. private final JmsTemplateResourceFactory transactionalResourceFactory = new JmsTemplateResourceFactory();

  26. /**

  27. * 是否開啟手動確認標記

  28. */

  29. private Boolean autoAcknowledge;

  30. MessageConsumer consumer = null;

  31. Session sessionToClose = null;

  32. Connection conToClose = null;

  33. boolean startConnection = false;

  34. /**

  35. * 接收訊息

  36. * @param session

  37. * @param consumer

  38. * @return

  39. * @throws JMSException

  40. */

  41. protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {

  42. try {

  43. this.consumer = consumer;

  44. // Use transaction timeout (if available).

  45. long timeout = getReceiveTimeout();

  46. JmsResourceHolder resourceHolder =

  47. (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());

  48. if (resourceHolder != null && resourceHolder.hasTimeout()) {

  49. timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());

  50. }

  51. Message message = doReceive(consumer, timeout);

  52. if (session.getTransacted()) {

  53. // Commit necessary - but avoid commit call within a JTA transaction.

  54. // 如果開啟了jta事物,那麼不會進行提交,jta事物會直接覆蓋掉session事物

  55. if (isSessionLocallyTransacted(session)) {

  56. // Transacted session created by this template -> commit.

  57. JmsUtils.commitIfNecessary(session);

  58. }

  59. }

  60. //autoAcknowledge如果為真,不進行自動確認

  61. else if (isClientAcknowledge(session) && !autoAcknowledge) {

  62. // Manually acknowledge message, if any.

  63. if (message != null) {

  64. message.acknowledge();

  65. }

  66. }

  67. return message;

  68. }

  69. finally {

  70. consumer = null;

  71. }

  72. }

  73. /**

  74. * 自定義的訊息確認,關閉consumer和sesseionToClose是父類本身就要執行的,這裡直接拷貝下來,能不改的地方儘量不改

  75. * 該子類只是為了自定義確認訊息

  76. * @param message

  77. * @throws JMSException

  78. */

  79. public void msgAckAndcloseSession(Message message) throws JMSException {

  80. message.acknowledge();

  81. JmsUtils.closeMessageConsumer(consumer);

  82. JmsUtils.closeSession(sessionToClose);

  83. ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);

  84. }

  85. /**

  86. * 由於上面的doReceive(Session session, MessageConsumer consumer)需要呼叫這個方法,

  87. * 而在父類裡面這個方法是私有的,所以直接拷貝下來了

  88. * @param consumer

  89. * @param timeout

  90. * @return

  91. * @throws JMSException

  92. */

  93. private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException {

  94. if (timeout == RECEIVE_TIMEOUT_NO_WAIT) {

  95. return consumer.receiveNoWait();

  96. }

  97. else if (timeout > 0) {

  98. return consumer.receive(timeout);

  99. }

  100. else {

  101. return consumer.receive();

  102. }

  103. }

  104. /**

  105. * 該方法是為了防止確認訊息前session被關閉,不然確認訊息前session關閉會導致異常發生

  106. * transactionalResourceFactory在父類中是私有且不可修改,因為只有這一個方法用到了transactionalResourceFactory

  107. * 所以直接將JmsTemplateResourceFactory拷貝下來使用

  108. * @param action

  109. * @param startConnection

  110. * @param <T>

  111. * @return

  112. * @throws JmsException

  113. */

  114. public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {

  115. Assert.notNull(action, "Callback object must not be null");

  116. this.startConnection = startConnection;

  117. try {

  118. Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(

  119. getConnectionFactory(), transactionalResourceFactory, startConnection);

  120. if (sessionToUse == null) {

  121. conToClose = createConnection();

  122. sessionToClose = createSession(conToClose);

  123. if (startConnection) {

  124. conToClose.start();

  125. }

  126. sessionToUse = sessionToClose;

  127. }

  128. if (logger.isDebugEnabled()) {

  129. logger.debug("Executing callback on JMS Session: " + sessionToUse);

  130. }

  131. return action.doInJms(sessionToUse);

  132. }

  133. catch (JMSException ex) {

  134. throw convertJmsAccessException(ex);

  135. }

  136. finally {

  137. sessionToClose = null;

  138. conToClose = null;

  139. startConnection = false;

  140. }

  141. }

  142. /**

  143. * Sets new 是否開啟手動確認標記.

  144. *

  145. * @param autoAcknowledge New value of 是否開啟手動確認標記.

  146. */

  147. public void setAutoAcknowledge(Boolean autoAcknowledge) {

  148. this.autoAcknowledge = autoAcknowledge;

  149. }

  150. /**

  151. * Gets 是否開啟手動確認標記.

  152. *

  153. * @return Value of 是否開啟手動確認標記.

  154. */

  155. public Boolean getAutoAcknowledge() {

  156. return autoAcknowledge;

  157. }

  158. /**

  159. * 直接拷貝下來的

  160. */

  161. private class JmsTemplateResourceFactory implements ConnectionFactoryUtils.ResourceFactory {

  162. @Override

  163. public Connection getConnection(JmsResourceHolder holder) {

  164. return SimpleJmsTemplate.this.getConnection(holder);

  165. }

  166. @Override

  167. public Session getSession(JmsResourceHolder holder) {

  168. return SimpleJmsTemplate.this.getSession(holder);

  169. }

  170. @Override

  171. public Connection createConnection() throws JMSException {

  172. return SimpleJmsTemplate.this.createConnection();

  173. }

  174. @Override

  175. public Session createSession(Connection con) throws JMSException {

  176. return SimpleJmsTemplate.this.createSession(con);

  177. }

  178. @Override

  179. public boolean isSynchedLocalTransactionAllowed() {

  180. return SimpleJmsTemplate.this.isSessionTransacted();

  181. }

  182. }

  183. }

6:第二個receiver測試類

  1. package com.test.spring;

  2. import com.system.freemwork.amq.SimpleJmsTemplate;

  3. import org.apache.activemq.command.ActiveMQQueue;

  4. import org.junit.Test;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.jms.core.JmsTemplate;

  7. import javax.jms.JMSException;

  8. import javax.jms.TextMessage;

  9. /**

  10. * 類描述:receiver測試類

  11. *

  12. * @author fengyong

  13. * @version 1.0

  14. * @since 1.0

  15. * Created by fengyong on 16/8/8 下午5:28.

  16. */

  17. public class ActiveMqReceiver extends BaseTest{

  18. @Autowired

  19. private SimpleJmsTemplate receiverJmsTemplate;

  20. @Autowired

  21. private ActiveMQQueue activeMQQueue;

  22. @Test

  23. public void recerver() throws JMSException {

  24. int i=1;

  25. while (true){

  26. i++;

  27. TextMessage message = (TextMessage)receiverJmsTemplate.receive(activeMQQueue);

  28. if (null != message) {

  29. System.out.println("收到訊息==================" + message.getText());

  30. if(i==4){

  31. throw new RuntimeException("Exception");

  32. }

  33. receiverJmsTemplate.msgAckAndcloseSession(message);

  34. } else {

  35. System.out.print("超時10秒");

  36. break;

  37. }

  38. }

  39. }

  40. }

7:測試結果

  1. 收到訊息==================我是第1個

  2. 收到訊息==================我是第2個

  3. 收到訊息==================我是第3個

  4. java.lang.RuntimeException: Exception

  5. at com.test.spring.ActiveMqReceiver.show(ActiveMqReceiver.java:43)

  6. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

  7. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

  8. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

  9. at java.lang.reflect.Method.invoke(Method.java:606)

  10. at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)

  11. at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)

  12. at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)

  13. at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

  14. at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:73)

注:接收第三個時失敗,雖然這裡打印出來了,但是第三個訊息並沒有被確認接收,同步自定義接收訊息修改成功