1. 程式人生 > >activeMQ訊息中介軟體的與spring的整合

activeMQ訊息中介軟體的與spring的整合

activemq是基於jetty服務容器的,可在原始碼中發現。

1.maven依賴jar

<!-- ActiveMQ -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId> <version>5.8.0</version> </dependency>

  注意:這裡依賴沒有直接引入activemq-all 的依賴,因為activemq-all 會依賴slf4j 日誌jar容易和 單獨引進的slf4的日誌jar衝突,如下(我是單獨引進slf4j日誌的)

<!-- 日誌 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>
slf4j-api</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency>
2.生產者 (這裡指定的是訂閱與釋出模式)
<!-- 配置JMS
服務提供商 ActiveMQ --> <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- 連線ActiveMQ服務地址 --> <property name="brokerURL" value="tcp://mqserver:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> </bean> <!-- 配置ActiveMQ的連線池工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="activeMQConnectionFactory"/> <property name="sessionCacheSize" value="100"/> </bean> <!-- 點對點佇列 --> <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="cn.zyy.tender.admin.queue.default"/> </bean> <!-- 一對多佇列 --> <bean id="defaultTopicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="cn.zyy.tender.admin.topic.default"/> </bean> <!-- spring管理JMS(ActiveMQ) --> <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory"/> </bean> <!-- spring管理jsmTemplate用於傳送訊息值ActiveMQ服務 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 指定ActiveMQ連線工廠 --> <property name="connectionFactory" ref="singleConnectionFactory"/> <!-- 指定訊息佇列名稱 --> <property name="defaultDestinationName" value="cn.zyy.tender.admin.topic.default"/> <!--<property name="defaultDestination" ref="defaultTopicDestination"/>--> <!-- 指定為訊息的訂閱與釋出模式 --> <property name="pubSubDomain" value="false"/> </bean>
這裡配置訊息佇列名稱有兩種方式,1:直接在jsmTemplate中指點預設目標名稱(即:設定defaultDestionationName屬性)

2通過defaultDestionation屬性引進(就是上面的一對多佇列)·

3.消費者配置

訂閱模式配置:

<!-- 連線工廠 -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     <property name="brokerURL" value="${AvtiveMQ.brokerURL}"/>
     <property name="useAsyncSend" value="true"/>
 </bean>
 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
     <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
     <property name="sessionCacheSize" value="100"/>
 </bean>
<!-- 點對點佇列 -->
<!-- <bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
     <constructor-arg index="0" value="cn.zyy.tender.admin.queue.default"/>
 </bean>-->
 <!-- 一對多佇列 -->
<bean id="defaultTopicDestination" class="org.apache.activemq.command.ActiveMQTopic">
     <constructor-arg index="0" value="cn.zyy.tender.admin.topic.default"/>
 </bean>
<!-- 生產者 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
     <property name="connectionFactory" ref="connectionFactory"/>
     <property name="pubSubDomain" value="false"/>
 </bean>
 <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
     <property name="connectionFactory" ref="connectionFactory"/>
<!-- 指定為訊息的訂閱與釋出模式 -->
<property name="pubSubDomain" value="true"/>
 </bean>
<!-- 消費者 -->
<bean id="defaultMessageQueueListener" class="com.zyy.tender.admin.jms.DefaultMessageQueueListener"/>
 <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
     <property name="connectionFactory" ref="connectionFactory"/>
<!-- 指定佇列名稱 -->
<property name="destination" ref="defaultTopicDestination"/>
<!-- 注入自定監聽器,處理佇列中訊息 -->
<property name="messageListener" ref="defaultMessageQueueListener"/>
     <property name="sessionTransacted" value="true"/>
<!--<property name="concurrency" value="4-10"/>-->
     <!-- 指定為訊息的訂閱與釋出模式 -->
<property name="pubSubDomain" value="true"/>
 </bean>

點對點配置:

<!-- 連線工廠 -->
<bean id="activeMqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${AvtiveMQ.brokerURL}"/>
    <property name="useAsyncSend" value="true"/>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="activeMqConnectionFactory"/>
    <property name="sessionCacheSize" value="100"/>
</bean>
<!-- 點對點佇列 -->
<bean id="defaultQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="cn.zyy.tender.admin.queue.default"/>
</bean>
<!-- 一對多佇列 -->
<!--<bean id="defaultTopicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg index="0" value="cn.zyy.tender.admin.topic.default"/>
</bean>-->
<!-- 生產者 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="pubSubDomain" value="false"/>
</bean>
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
<!-- 指定為訊息的訂閱與釋出模式 -->
<property name="pubSubDomain" value="true"/>
</bean>
<!-- 消費者 -->
<bean id="defaultMessageQueueListener" class="com.zyy.tender.admin.jms.DefaultMessageQueueListener"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
<!-- 指定佇列名稱 -->
<property name="destination" ref="defaultQueueDestination"/>
<!-- 注入自定監聽器,處理佇列中訊息 -->
<property name="messageListener" ref="defaultMessageQueueListener"/>
    <property name="sessionTransacted" value="true"/>
<!--<property name="concurrency" value="4-10"/>-->
    <!-- 指定為訊息的訂閱與釋出模式  這裡的訂閱模式可由生產者(優先順序高)指定-->
<property name="pubSubDomain" value="false"/>
</bean>

4. 生產者傳送訊息 

jmsTemplate是交給spring容器,然後注入到spring的bean中,這裡是注入到了serviceImpl中

@Resource
private JmsTemplate jmsTemplate;

@Override
public int isShowByPrimaryKeys(String ids) {
    Integer count=0;
String[] split = ids.split("-");
    for (final String idStr: split) {
        if (StringUtils.isBlank(idStr)) {
            continue;
}
        jmsTemplate.send(new MessageCreator() {

            @Override
public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(idStr);
                return textMessage;
}
        });
zyyTenderNewsMapper.isShowByPrimaryKey(Integer.parseInt(idStr));
count++;
}
    return count;
}

5.消費者的訊息監聽器

將這個訊息監聽器交個spring來管理,然後注入到activeMQ的  jmsContainer中(見上面消費者的配置),可以訊息監聽器中注入service介面來處理相應的業務需求。

/**
 * MQ消費者
*/
public class DefaultMessageQueueListener implements MessageListener {

   private static Logger _log = LoggerFactory.getLogger(DefaultMessageQueueListener.class);
@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor;
   public void onMessage(final Message message) {
      // 使用執行緒池多執行緒處理
threadPoolTaskExecutor.execute(new Runnable() {
         public void run() {
            if (message instanceof TextMessage) {
               TextMessage textMessage = (TextMessage) message;
               try {
                  _log.info("消費訊息:{}", textMessage.getText());
System.out.println("消費訊息:"+textMessage.getText());
} catch (Exception e){
                  e.printStackTrace();
}
            }
         }
      });
}

}

6.訊息模式

點對點:一個訊息只能被一個消費者消費一次。

訂閱模式:一個訊息可以被多個消費者消費一次。