ActiveMQ 釋出訂閱模式及 Spring 整合
阿新 • • 發佈:2019-05-12
Topic主題釋出和訂閱訊息
前面講的案例都是點對點(Queue)的訊息,即一個生產者傳送的一條訊息只能被一個消費者消費,消費之後訊息就從佇列中移除了。而 topic(釋出/訂閱)模式下,一條訊息可以被多個消費者訂閱,關係如下:生產者把訊息釋出到 Topic,所有在釋出時已訂閱該 Topic 的消費者都會各自收到一份訊息。
定義生產者
package com.sxt.demo; import java.net.URI; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQTextMessage; import com.sxt.bean.User; /** * 定義生產者 * @author Administrator * */ public class ActivemqdemoProducer { private static String userName="admin"; private static String password="admin"; private static String brokerURL="tcp://192.168.119.12:61616"; public static void main(String[] args) throws Exception { TopicConnectionFactory factory = new ActiveMQConnectionFactory(userName,password,brokerURL); TopicConnection connection = factory.createTopicConnection(); connection.start(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic-hello"); TopicPublisher publisher = session.createPublisher(topic); MapMessage message = session.createMapMessage(); message.setString("name", "lisi"); message.setString("age", "18"); publisher.send(message ); publisher.close(); session.close(); connection.close(); } }
定義消費者
package com.sxt.demo; import java.net.URI; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQTextMessage; import com.sxt.bean.User; /** * 定義消費者 * @author Administrator * */ public class ActivemqdemoConsumer { private static String userName="admin"; private static String password="admin"; private static String brokerURL="tcp://192.168.119.12:61616"; public static void main(String[] args) throws Exception { TopicConnectionFactory factory = new ActiveMQConnectionFactory(userName,password,brokerURL); TopicConnection connection = factory.createTopicConnection(); connection.start(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic-hello"); TopicPublisher publisher = session.createPublisher(topic); TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(message instanceof MapMessage){ MapMessage mapMessage=(MapMessage)message; try { String name = mapMessage.getString("name"); String age = mapMessage.getString("age"); System.out.println(name+"+"+age); } catch (JMSException e) { // TODO Auto-generated catch block System.out.println("錯誤資訊"); } } } }); Thread.sleep(30000); publisher.close(); session.close(); 
connection.close(); } }
測試
先啟動消費者,再啟動生產者。Topic 的非持久訂閱者只能收到訂閱之後才釋出的訊息,所以消費者必須先啟動,否則會錯過生產者已經發出的訊息。
ActiveMQ整合Spring框架
整合Spring框架
匯入jar包
<dependencies> <!-- Complete ActiveMQ client jar --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> <!-- Handles the ActiveMQ tags used in the Spring configuration file --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.5</version> </dependency> <!-- Spring JMS support --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- Spring application context --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> </dependencies>
定義訊息載體物件
package com.sxt.bean;
import java.io.Serializable;
/**
 * Serializable bean holding a user's id, name and password.
 */
public class User implements Serializable {

    private static final long serialVersionUID = 1L;

    private String id;

    private String name;

    private String password;

    /** @return the user id, or {@code null} if it was never set */
    public String getId() {
        return this.id;
    }

    /** @param id the user id to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the user name, or {@code null} if it was never set */
    public String getName() {
        return this.name;
    }

    /** @param name the user name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the password, or {@code null} if it was never set */
    public String getPassword() {
        return this.password;
    }

    /** @param password the password to store */
    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * Renders all three fields as {@code User [id=..., name=..., password=...]}.
     *
     * @return the textual form of this user
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [id=");
        text.append(this.id);
        text.append(", name=").append(this.name);
        text.append(", password=").append(this.password);
        text.append("]");
        return text.toString();
    }
}
定義生產者
package com.sxt.producer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import com.sxt.bean.User;
/**
 * Sends {@link User} objects to a JMS destination through a {@link JmsTemplate}.
 */
public class OrderProducer {

    /** Template used for sending; supplied through {@link #setTemplate(JmsTemplate)}. */
    private JmsTemplate template;

    /** @return the template currently used for sending */
    public JmsTemplate getTemplate() {
        return this.template;
    }

    /** @param template the template to use for sending */
    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    /**
     * Sends the given user as an object message.
     *
     * @param destinationName name of the destination to send to
     * @param user            payload wrapped in the object message
     */
    public void sendOrder(String destinationName, User user) {
        // The template supplies the session; this callback only builds the message.
        MessageCreator objectMessageCreator = session -> session.createObjectMessage(user);
        template.send(destinationName, objectMessageCreator);
    }
}
定義消費者
package com.sxt.consumer;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQObjectMessage;
/**
 * Message listener that prints the payload of every object message it receives.
 */
public class OrderConsumer implements MessageListener {

    /**
     * Prints the object carried by the message to standard output.
     *
     * <p>Messages that are not {@link ActiveMQObjectMessage} instances are
     * reported on standard error and skipped, so an unexpected message type
     * does not cause a {@link ClassCastException}.
     *
     * @param message the message delivered to this listener
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof ActiveMQObjectMessage)) {
            String type = (message == null) ? "null" : message.getClass().getName();
            System.err.println("Ignoring unsupported message type: " + type);
            return;
        }
        ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message;
        try {
            System.out.println(msg.getObject());
        } catch (JMSException e) {
            // The payload could not be read; report it and keep the listener running.
            e.printStackTrace();
        }
    }
}
Spring配置檔案配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">
<!-- ActiveMQ connection factory -->
<!-- The ConnectionFactory that really creates connections; supplied by the JMS provider (ActiveMQ) -->
<!-- Needs the broker URL (tcp://ip:61616) plus user name and password -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.119.12:61616" userName="admin" password="admin" />
<!-- Spring caching connection factory -->
<!-- Spring-side ConnectionFactory that wraps and manages the real ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- targetConnectionFactory: the real factory that produces JMS connections -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Number of sessions to cache -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Message producer: start -->
<!-- JmsTemplate, provided by spring-jms, used to talk to ActiveMQ -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- Refers to the Spring caching ConnectionFactory bean defined above -->
<constructor-arg ref="connectionFactory" />
<!-- Queue (non pub/sub) mode; false is the default, so this property may be omitted -->
<!-- <property name="pubSubDomain" value="false" /> -->
</bean>
<!-- Producer bean -->
<bean id="orderProducer" class="com.sxt.producer.OrderProducer">
<!-- Inject the template property -->
<property name="template" ref="jmsQueueTemplate"></property>
</bean>
<!-- Message producer: end -->
<!-- Message consumer: start -->
<!-- Listener container defined by spring-jms; several listeners can be registered in one container.
Attributes:
destination-type - kind of destination the listeners consume from
values used in this tutorial: queue | topic | durableTopic
queue - default; point-to-point queue, each message is consumed by one consumer
topic - publish/subscribe topic, each message is delivered to every active subscriber
durableTopic - topic with a durable subscription; the broker keeps messages published
while the subscriber is offline and delivers them when it reconnects
container-type - listener container implementation
values: default | simple
default - default; DefaultMessageListenerContainer
simple - SimpleMessageListenerContainer
connection-factory - the connection factory bean to use; here the Spring caching factory
acknowledge - acknowledgement mode
values: auto | client | dups-ok | transacted
auto - default; messages are acknowledged automatically
client - client acknowledge mode
dups-ok - lazy automatic acknowledgement; the same message may be delivered more than once
transacted - messages are received in a locally transacted JMS session
-->
<jms:listener-container destination-type="queue"
container-type="default" connection-factory="connectionFactory"
acknowledge="auto">
<!-- Register a listener; repeat this element to register several -->
<jms:listener destination="spring-MQ" ref="orderReciver" />
</jms:listener-container>
<!-- Listener implementation bean managed by the Spring container -->
<bean id="orderReciver" class="com.sxt.consumer.OrderConsumer"/>
<!-- Message consumer: end -->
</beans>
測試
package com.sxt.test;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.ApplicationContext;
import com.sxt.bean.User;
import com.sxt.producer.OrderProducer;
/**
 * Manual smoke test: loads the Spring context and sends one {@link User} to
 * the {@code spring-MQ} destination.
 */
public class Test {

    /**
     * Boots the context from {@code application.xml} and sends a sample user.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("application.xml");
        OrderProducer producer = context.getBean(OrderProducer.class);
        producer.sendOrder("spring-MQ", sampleUser());
        System.out.println("訊息傳送完成");
    }

    /** Builds the sample payload sent by {@link #main(String[])}. */
    private static User sampleUser() {
        User sample = new User();
        sample.setId("2");
        sample.setName("lisi");
        sample.setPassword("123456");
        return sample;
    }
}
<