1. 程式人生 > >Spring整合activemq,實現單消費者消費

Spring整合activemq,實現單消費者消費

1.ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排,實現了JMS(Java MessageService,實際上是指JMS API)可用來實現基於訊息的RPC(過程遠端呼叫)

對於Spring和Java程式設計師來說不一定要實現JMS,還有另外一種可選方案—AMQP(advanced message queuing protocol),即高階佇列訊息協議

下面將介紹的是ActiveMQ使用方法

2.下載地址:http://http://activemq.apache.org/點選開啟連結

.解壓目錄:


4.新增Maven依賴

<span style="font-size:18px;">        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.13.3</version>
        </dependency></span>
5.Spring配置

C名稱空間:

  xmlns:c="http://www.springframework.org/schema/c"

P名稱空間:

<span style="font-size:18px;">xmlns:p="http://www.springframework.org/schema/p"</span>
Spring檔案:
    <!--宣告連線工廠-->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"
          p:brokerURL="tcp://${activeMq.destination.adderss}:${activeMq.destination.port}"
          p:trustAllPackages="true"
            />
    <!--<amd:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616"/>-->
    <!--宣告訊息目的地 佇列形式 topic主題-->
    <amd:queue id="queue" physicalName="access.control.msg"/>

    <!--資訊轉換器-->
    <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

    <!--JMS模板-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
          c:_0-ref="connectionFactory"
          p:defaultDestination-ref="queue"
          p:messageConverter-ref="messageConverter"
            />
    <!--傳送資訊-->
    <bean id="sendActiveMqImpl" class="cn.com.reformer.communication.jmshandler.SendActiveMqImpl"/>
    <!--處理資訊-->
    <bean id="activeMqHandler" class="cn.com.reformer.communication.jmshandler.ActiveMqHandler"/>

    <!--監聽器容器,實現POJO訊息驅動,非同步處理資料-->
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <!-- 訊息監聽器輸出訊息的數量 -->
        <property name="concurrentConsumers" value="1"/>
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="access.control.msg"/>
        <property name="messageListener" ref="activeMqHandler"/>
        <property name="pubSubNoLocal" value="false"></property>
    </bean>


ps:訊息監聽器容器還可以以jmsTemplate來實現,如下:

以這種方式配置的時候,消費者的數量會加倍,例concurrency="1"時,消費者數量為2 ,以上面的方式配置監聽器容器時,將concurrentConsumers值設定為1真正實現單執行緒處理

<span style="font-size:18px;">    <!--監聽器容器-->  <!--acknowledge屬性為"transacted",以開啟事務   concurrency:多執行緒-->
    <!--    <jms:listener-container connection-factory="connectionFactory" concurrency="1">
            <jms:listener destination="</span><span style="font-family: Arial, Helvetica, sans-serif; font-size: 18px;">access.control.msg</span><span style="font-family: Arial, Helvetica, sans-serif;">"</span><span style="font-size:18px;">
                          ref="</span><span style="font-family: Arial, Helvetica, sans-serif; font-size: 18px;">messageHandler</span><span style="font-size:18px;">" method="handleMsg"/>
        </jms:listener-container>--></span>
6.訊息傳送
public interface SendActiveMq {
    void sendMessage(EntranceGuardUploadRecord message);
}
public class SendActiveMqImpl implements SendActiveMq {
    private final static Logger LOGGER = Logger.getLogger(SendActiveMqImpl.class);
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    private IEntranceGuardUploadRecordHandler uploadRecordHandler;
    @Override
    public void sendMessage(EntranceGuardUploadRecord message) {
        jmsTemplate.convertAndSend(message);
        LOGGER.error("######### jmsTemplate send queue message");
    }
}
EntranceGuardUploadRecord為自定義POJO,必須實現Serializable介面

7.從ActiveMq服務接收訊息,必須實現MessageListener介面,當有訊息時自動呼叫onMessage方法

public class ActiveMqHandler implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMqHandler.class);
    @Autowired
    private JmsOperations jmsTemplate;
    @Autowired
    private IEntranceGuardUploadRecordHandler uploadRecordHandler;

    @Override
    public void onMessage(Message message) {
        LOG.debug("######## consumer start handle msg");
        EntranceGuardUploadRecord record = null;
        if (message instanceof ObjectMessage) {
            ObjectMessage objectMessage = (ObjectMessage) message;
            try {
                record = (EntranceGuardUploadRecord) objectMessage.getObject();
            } catch (Exception e) {
                e.printStackTrace();
                LOG.error("acivemq convert message failure");
                return;
            }
        }
      
    }
以jms:listener-container標籤配置監聽器容器時可用jmsTemplate來接收訊息
Card card =(Card)jmsTemplate.receiveAndConvert();