ActiveMQ 點對點訊息傳送模型
阿新 • • 發佈:2019-02-08
點對點訊息傳送模型允許JMS客戶端通過佇列這個虛擬通道來同步和非同步傳送、接收訊息。
在點對點訊息傳送模型中,訊息生產者稱為傳送者,訊息消費者稱為接收者
消費者是基於拉取(pull)或基於輪詢(polling)來從佇列中請求訊息,佇列並不會自動地將訊息推送到客戶端
一個訊息有且只能被一個消費者接收,即使有多個消費者同時監聽了佇列
點對點模型支援負載均衡,允許多個消費者監聽同一個佇列,並以此來分配負載
Spring Framework 為JMS提供了內建支援,Spring提供了JMS模板和訊息監聽容器
JMS實現採用ActiveMQ
定義訊息生產者
public interface ProducerService {
/**
* P2P 點對點模式生產者傳送訊息.
*
* @param destination
* @param message
*/
public void sendMessage(Destination destination, String message);
}
@Component("producerService")
public class ProducerServiceImpl implements ProducerService {
//Spring JMS Template
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
/**
* P2P 點對點模式生產者傳送訊息.
*
* @param destination
* @param message
*/
@Override
public void sendMessage(Destination destination, final String message) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}
設定一個定時任務充當訊息生產者,每隔兩秒傳送一條訊息,訊息內容為當前時間
@Component(value = "poll")
public class Poll {
// ActiveMQ生產者
@Resource(name = "producerService")
private ProducerService producerService;
// ActiveMQ 預先預約運單號佇列目的地
@Resource(name = "queueDestination")
private Destination destination;
@Scheduled(cron="0/2 * * * * ?")
public void getTradeIncrement() {
producerService.sendMessage(destination, "當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
定義一個消費者監聽佇列:
public class ConsumerMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String num = textMessage.getText();
System.out.println(num);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
JMS配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory" />
<property name="maxConnections" value="10" />
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory" />
</bean>
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--這個是佇列目的地 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>queue</value>
</constructor-arg>
</bean>
<!-- 訊息監聽器 -->
<bean id="consumerMessageListener" class="com.dragon.jms.listener.ConsumerMessageListener" />
<!-- 訊息監聽容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
</bean>
</beans>
如果多個消費者同時監聽佇列,那麼訊息將在均衡分佈於每個消費者進行消費,是訊息可靠性的一種方式,稱為佇列消費者叢集(Queue consumer clusters)
public class ConsumerAMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String num = textMessage.getText();
System.out.println("A " + num);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
public class ConsumerBMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String num = textMessage.getText();
System.out.println("B " + num);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
public class ConsumerCMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String num = textMessage.getText();
System.out.println("C " + num);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
此時需為每一個訊息監聽器新增一個訊息監聽容器:
<!-- 訊息監聽器 -->
<bean id="consumerAMessageListener" class="com.dragon.jms.listener.ConsumerAMessageListener" />
<bean id="consumerBMessageListener" class="com.dragon.jms.listener.ConsumerBMessageListener" />
<bean id="consumerCMessageListener" class="com.dragon.jms.listener.ConsumerCMessageListener" />
<!-- 訊息監聽容器 -->
<bean id="jmsAContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerAMessageListener" />
</bean>
<bean id="jmsBContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerBMessageListener" />
</bean>
<bean id="jmsCContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerCMessageListener" />
</bean>
結果:
C 當前時間:2016-03-04 13:29:08
A 當前時間:2016-03-04 13:29:10
B 當前時間:2016-03-04 13:29:12
C 當前時間:2016-03-04 13:29:14
A 當前時間:2016-03-04 13:29:16
B 當前時間:2016-03-04 13:29:18
C 當前時間:2016-03-04 13:29:20
A 當前時間:2016-03-04 13:29:22
B 當前時間:2016-03-04 13:29:24
C 當前時間:2016-03-04 13:29:26
A 當前時間:2016-03-04 13:29:28
B 當前時間:2016-03-04 13:29:30
C 當前時間:2016-03-04 13:29:32
A 當前時間:2016-03-04 13:29:34
B 當前時間:2016-03-04 13:29:36
C 當前時間:2016-03-04 13:29:38
A 當前時間:2016-03-04 13:29:40
B 當前時間:2016-03-04 13:29:42
訊息均衡的由三個消費者消費,提高系統的負載和可靠性,當某個消費者無法正常工作時,也不影響佇列中訊息的消費