Spring整合activemq,實現單消費者消費
阿新 • • 發佈:2019-02-10
1.ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排,實現了JMS(Java MessageService,實際上是指JMS API)可用來實現基於訊息的RPC(過程遠端呼叫)
對於Spring和Java程式設計師來說不一定要實現JMS,還有另外一種可選方案—AMQP(advanced message queuing protocol),即高階佇列訊息協議
下面將介紹的是ActiveMQ使用方法
2.下載地址:http://http://activemq.apache.org/點選開啟連結
.解壓目錄:
4.新增Maven依賴
5.Spring配置<span style="font-size:18px;"> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.13.3</version> </dependency></span>
C名稱空間:
xmlns:c="http://www.springframework.org/schema/c"
P名稱空間:
<span style="font-size:18px;">xmlns:p="http://www.springframework.org/schema/p"</span>
Spring檔案:
<!--宣告連線工廠--> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" p:brokerURL="tcp://${activeMq.destination.adderss}:${activeMq.destination.port}" p:trustAllPackages="true" /> <!--<amd:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616"/>--> <!--宣告訊息目的地 佇列形式 topic主題--> <amd:queue id="queue" physicalName="access.control.msg"/> <!--資訊轉換器--> <bean id="messageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/> <!--JMS模板--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_0-ref="connectionFactory" p:defaultDestination-ref="queue" p:messageConverter-ref="messageConverter" /> <!--傳送資訊--> <bean id="sendActiveMqImpl" class="cn.com.reformer.communication.jmshandler.SendActiveMqImpl"/> <!--處理資訊--> <bean id="activeMqHandler" class="cn.com.reformer.communication.jmshandler.ActiveMqHandler"/> <!--監聽器容器,實現POJO訊息驅動,非同步處理資料--> <bean id="listenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer"> <!-- 訊息監聽器輸出訊息的數量 --> <property name="concurrentConsumers" value="1"/> <property name="connectionFactory" ref="connectionFactory"/> <property name="destinationName" value="access.control.msg"/> <property name="messageListener" ref="activeMqHandler"/> <property name="pubSubNoLocal" value="false"></property> </bean>
ps:訊息監聽器容器還可以以jmsTemplate來實現,如下:
以這種方式配置的時候,消費者的數量會加倍,例concurrency="1"時,消費者數量為2 ,以上面的方式配置監聽器容器時,將concurrentConsumers值設定為1真正實現單執行緒處理
6.訊息傳送<span style="font-size:18px;"> <!--監聽器容器--> <!--acknowledge屬性為"transacted",以開啟事務 concurrency:多執行緒--> <!-- <jms:listener-container connection-factory="connectionFactory" concurrency="1"> <jms:listener destination="</span><span style="font-family: Arial, Helvetica, sans-serif; font-size: 18px;">access.control.msg</span><span style="font-family: Arial, Helvetica, sans-serif;">"</span><span style="font-size:18px;"> ref="</span><span style="font-family: Arial, Helvetica, sans-serif; font-size: 18px;">messageHandler</span><span style="font-size:18px;">" method="handleMsg"/> </jms:listener-container>--></span>
public interface SendActiveMq {
void sendMessage(EntranceGuardUploadRecord message);
}
public class SendActiveMqImpl implements SendActiveMq {
private final static Logger LOGGER = Logger.getLogger(SendActiveMqImpl.class);
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private IEntranceGuardUploadRecordHandler uploadRecordHandler;
@Override
public void sendMessage(EntranceGuardUploadRecord message) {
jmsTemplate.convertAndSend(message);
LOGGER.error("######### jmsTemplate send queue message");
}
}
EntranceGuardUploadRecord為自定義POJO,必須實現Serializable介面
7.從ActiveMq服務接收訊息,必須實現MessageListener介面,當有訊息時自動呼叫onMessage方法
public class ActiveMqHandler implements MessageListener {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMqHandler.class);
@Autowired
private JmsOperations jmsTemplate;
@Autowired
private IEntranceGuardUploadRecordHandler uploadRecordHandler;
@Override
public void onMessage(Message message) {
LOG.debug("######## consumer start handle msg");
EntranceGuardUploadRecord record = null;
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
try {
record = (EntranceGuardUploadRecord) objectMessage.getObject();
} catch (Exception e) {
e.printStackTrace();
LOG.error("acivemq convert message failure");
return;
}
}
}
以jms:listener-container標籤配置監聽器容器時可用jmsTemplate來接收訊息
Card card =(Card)jmsTemplate.receiveAndConvert();