activeMQ訊息中介軟體的與spring的整合
阿新 • • 發佈:2019-01-26
activemq是基於jetty服務容器的,可在原始碼中發現。
1.maven依賴jar
<!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId> <version>5.8.0</version> </dependency>
注意:這裡依賴沒有直接引入activemq-all 的依賴,因為activemq-all 會依賴slf4j 日誌jar容易和 單獨引進的slf4的日誌jar衝突,如下(我是單獨引進slf4j日誌的)
<!-- 日誌 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>2.生產者 (這裡指定的是訂閱與釋出模式)slf4j-api</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency>
<!-- 配置JMS這裡配置訊息佇列名稱有兩種方式,1:直接在jsmTemplate中指點預設目標名稱(即:設定defaultDestionationName屬性)服務提供商 ActiveMQ --> <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- 連線ActiveMQ服務地址 --> <property name="brokerURL" value="tcp://mqserver:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> </bean> <!-- 配置ActiveMQ的連線池工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/> <property name="sessionCacheSize" value="100"/> </bean> <!-- 點對點佇列 --> <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="cn.zyy.tender.admin.queue.default"/> </bean> <!-- 一對多佇列 --> <bean id="defaultTopicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="cn.zyy.tender.admin.topic.default"/> </bean> <!-- spring管理JMS(ActiveMQ) --> <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"/> </bean> <!-- spring管理jsmTemplate用於傳送訊息值ActiveMQ服務 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 指定ActiveMQ連線工廠 --> <property name="connectionFactory" ref="singleConnectionFactory"/> <!-- 指定訊息佇列名稱 --> <property name="defaultDestinationName" value="cn.zyy.tender.admin.topic.default"/> <!--<property name="defaultDestination" ref="defaultTopicDestination"/>--> <!-- 指定為訊息的訂閱與釋出模式 --> <property name="pubSubDomain" value="false"/> </bean>
2通過defaultDestionation屬性引進(就是上面的一對多佇列)·
3.消費者配置
訂閱模式配置:
<!-- 連線工廠 --> <bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${AvtiveMQ.brokerURL}"/> <property name="useAsyncSend" value="true"/> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/> <property name="sessionCacheSize" value="100"/> </bean> <!-- 點對點佇列 --> <!-- <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="cn.zyy.tender.admin.queue.default"/> </bean>--> <!-- 一對多佇列 --> <bean id="defaultTopicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="cn.zyy.tender.admin.topic.default"/> </bean> <!-- 生產者 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="pubSubDomain" value="false"/> </bean> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 指定為訊息的訂閱與釋出模式 --> <property name="pubSubDomain" value="true"/> </bean> <!-- 消費者 --> <bean id="defaultMessageQueueListener" class="com.zyy.tender.admin.jms.DefaultMessageQueueListener"/> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 指定佇列名稱 --> <property name="destination" ref="defaultTopicDestination"/> <!-- 注入自定監聽器,處理佇列中訊息 --> <property name="messageListener" ref="defaultMessageQueueListener"/> <property name="sessionTransacted" value="true"/> <!--<property name="concurrency" value="4-10"/>--> <!-- 指定為訊息的訂閱與釋出模式 --> <property name="pubSubDomain" value="true"/> </bean>
點對點配置:
<!-- 連線工廠 --> <bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${AvtiveMQ.brokerURL}"/> <property name="useAsyncSend" value="true"/> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/> <property name="sessionCacheSize" value="100"/> </bean> <!-- 點對點佇列 --> <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="cn.zyy.tender.admin.queue.default"/> </bean> <!-- 一對多佇列 --> <!--<bean id="defaultTopicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="cn.zyy.tender.admin.topic.default"/> </bean>--> <!-- 生產者 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="pubSubDomain" value="false"/> </bean> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 指定為訊息的訂閱與釋出模式 --> <property name="pubSubDomain" value="true"/> </bean> <!-- 消費者 --> <bean id="defaultMessageQueueListener" class="com.zyy.tender.admin.jms.DefaultMessageQueueListener"/> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <!-- 指定佇列名稱 --> <property name="destination" ref="defaultQueueDestination"/> <!-- 注入自定監聽器,處理佇列中訊息 --> <property name="messageListener" ref="defaultMessageQueueListener"/> <property name="sessionTransacted" value="true"/> <!--<property name="concurrency" value="4-10"/>--> <!-- 指定為訊息的訂閱與釋出模式 這裡的訂閱模式可由生產者(優先順序高)指定--> <property name="pubSubDomain" value="false"/> </bean>
4. 生產者傳送訊息
jmsTemplate是交給spring容器,然後注入到spring的bean中,這裡是注入到了serviceImpl中
@Resource private JmsTemplate jmsTemplate;
@Override public int isShowByPrimaryKeys(String ids) { Integer count=0; String[] split = ids.split("-"); for (final String idStr: split) { if (StringUtils.isBlank(idStr)) { continue; } jmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(idStr); return textMessage; } }); zyyTenderNewsMapper.isShowByPrimaryKey(Integer.parseInt(idStr)); count++; } return count; }
5.消費者的訊息監聽器
將這個訊息監聽器交個spring來管理,然後注入到activeMQ的 jmsContainer中(見上面消費者的配置),可以訊息監聽器中注入service介面來處理相應的業務需求。
/** * MQ消費者 */ public class DefaultMessageQueueListener implements MessageListener { private static Logger _log = LoggerFactory.getLogger(DefaultMessageQueueListener.class); @Autowired ThreadPoolTaskExecutor threadPoolTaskExecutor; public void onMessage(final Message message) { // 使用執行緒池多執行緒處理 threadPoolTaskExecutor.execute(new Runnable() { public void run() { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { _log.info("消費訊息:{}", textMessage.getText()); System.out.println("消費訊息:"+textMessage.getText()); } catch (Exception e){ e.printStackTrace(); } } } }); } }
6.訊息模式
點對點:一個訊息只能被一個消費者消費一次。
訂閱模式:一個訊息可以被多個消費者消費一次。