ActiveMQ訂閱模式持久化實現
我的訴求是,建一個訂閱通道,然後多個客戶端監聽,當某個客戶端掉線後,再上線的時候可以收到它沒有接收到的訊息。
本文主要參考了《使用Spring配置ActiveMQ的釋出訂閱模式》(http://nettm.iteye.com/blog/1828268),將他們複製貼上過來,基本上就ok了。在找到這篇文章前,《如何實現ActiveMq的Topic的持久訂閱》(http://www.mytju.com/classcode/news_readNews.asp?newsID=486)給了我提示。要設定clientID啊~
我根據參考的文章編寫了工程,測試後發現是ok的。並且發現了一個細節。
比如,有一個clientID=ID_1,有個通道是topic_channel。往topic_channel中傳送一批訊息(A批)後,首次啟動ID_1的監聽,ID_1沒有收到。關掉ID_1的監聽,再往topic_channel中傳送一批訊息(B批)後,再次啟動ID_1的監聽,收到了B批訊息。這個現象說明了什麼呢?說明你不僅要有自己唯一的ID(clientID)還要事先跟這個通道報告一聲,說你要監聽它,訂閱的訊息來了之後給它留一份,等著它來拿。在你報告之前產生的訊息,你是拿不到的,因為沒有你的份。
為了體現我真的動了腦筋,思考了。所以我發表的工程,是做了一些修改的,比如,通道名啊,ID名啊,檔名啊,還調整了一下檔案結構,哈哈哈~
當然我還做了一個質的突破,我在一個工程裡面做了兩個接收者,監聽同一個訂閱通道。
實現步驟:
1、配置傳送xml,applicationContext-send.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <context:property-placeholder location="classpath:/properties/jms.properties" /> <!-- 配置JMS連線工廠 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session快取數量 --> <property name="sessionCacheSize" value="10" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="${brokerUrl}" /> <!-- 是否非同步傳送 --> <property name="useAsyncSend" value="true" /> </bean> </property> </bean> <!-- 傳送訊息的目的地(一個主題) --> <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 設定訊息主題的名字 --> <constructor-arg index="0" value="${send.name}" /> </bean> <!-- 配置JMS模版 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="defaultDestination" ref="myDestination" /> <!-- 訂閱釋出模式 --> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> </beans>
2、編寫傳送java,ActiveMQsender.java
package com.by.activeMQ; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; public class ActiveMQsender { public static void main(String[] args) { @SuppressWarnings("resource") ApplicationContext ctx = new ClassPathXmlApplicationContext( "ApplicationContext/applicationContext-send.xml"); JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate"); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage msg = session.createTextMessage(); // 設定訊息屬性 msg.setStringProperty("mood", "happy"); // 設定訊息內容 msg.setText("Hello World!"); return msg; } }); System.out.println("send end"); } }
3、配置接收xml,applicationContext-receive.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <context:property-placeholder location="classpath:/properties/jms.properties" /> <!-- 第一個接收者 --> <!-- 配置JMS連線工廠 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session快取數量 --> <property name="sessionCacheSize" value="10" /> <!-- 接收者ID --> <property name="clientId" value="${topic.clientId}" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="${brokerUrl}" /> </bean> </property> </bean> <!-- 傳送訊息的目的地(一個主題) --> <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 設定訊息主題的名字 --> <constructor-arg index="0" value="${topic.name}" /> </bean> <!-- 生產訊息配置 (自己定義)--> <bean id="myTopicConsumer" class="com.by.activeMQ.ActiveMQreceiver" /> <!-- 訊息監聽器 --> <bean id="myTopicListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="myTopicConsumer" /> <!-- 接收訊息的方法名稱 --> <property name="defaultListenerMethod" value="receive" /> <!-- 不進行訊息轉換 --> <property name="messageConverter"><null/></property> </bean> <!-- 訊息監聽容器 --> <bean id="myListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <!-- 釋出訂閱模式 --> <property name="pubSubDomain" value="true"/> <!-- 訊息持久化 --> <property name="subscriptionDurable" value="true"/> <property name="receiveTimeout" value="10"/> <!-- 接收者ID --> <property name="clientId" value="${topic.clientId}" /> <property name="durableSubscriptionName" value="${topic.clientId}"/> <property name="destination" ref="myDestination" /> <property name="messageListener" ref="myTopicListener" /> </bean> <!-- 第二個接收者 --> <!-- 配置JMS連線工廠 --> <bean id="myConnectionFactory2" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- Session快取數量 --> <property name="sessionCacheSize" value="10" /> <!-- 接收者ID --> <property name="clientId" value="${topic2.clientId}" /> <property name="targetConnectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- MQ地址 --> <property name="brokerURL" value="${brokerUrl}" /> </bean> </property> </bean> <!-- 傳送訊息的目的地(一個主題) --> <bean id="myDestination2" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 設定訊息主題的名字 --> <constructor-arg index="0" value="${topic2.name}" /> </bean> <!-- 生產訊息配置 (自己定義)--> <bean id="myTopicConsumer2" class="com.by.activeMQ.ActiveMQreceiver2" /> <!-- 訊息監聽器 --> <bean id="myTopicListener2" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg ref="myTopicConsumer2" /> <!-- 接收訊息的方法名稱 --> <property name="defaultListenerMethod" value="receive" /> <!-- 不進行訊息轉換 --> <property name="messageConverter"><null/></property> </bean> <!-- 訊息監聽容器 --> <bean id="myListenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory2" /> <!-- 釋出訂閱模式 --> <property name="pubSubDomain" value="true"/> <!-- 訊息持久化 --> <property name="subscriptionDurable" value="true"/> <property name="receiveTimeout" value="10"/> <!-- 接收者ID --> <property name="clientId" value="${topic2.clientId}" /> <property name="durableSubscriptionName" value="${topic2.clientId}"/> <property name="destination" ref="myDestination2" /> <property name="messageListener" ref="myTopicListener2" /> </bean> </beans>
4、編寫接收java,ActiveMQreceiver.java
package com.by.activeMQ;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.jms.JmsException;
public class ActiveMQreceiver {
public void receive(TextMessage message) throws JmsException, JMSException {
String info = "this is receiver, "
+ " mood is " + message.getStringProperty("mood") + ","
+ "say " + message.getText();
System.out.println(info);
}
}
5、編寫另一個接收java,ActiveMQreceiver.java
package com.by.activeMQ;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.jms.JmsException;
public class ActiveMQreceiver2 {
public void receive(TextMessage message) throws JmsException, JMSException {
String info = "this is receiver2,"
+ " mood is " + message.getStringProperty("mood") + ","
+ "say " + message.getText();
System.out.println(info);
}
}
6、編寫一個main,開啟接收監聽,openReceive.java
package com.by.activeMQ;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class openReceive {
public static void main(String[] args) {
@SuppressWarnings({ "unused", "resource" })
ApplicationContext ctx = new ClassPathXmlApplicationContext("ApplicationContext/applicationContext-receive.xml");
while(true) {
}
}
}
7、編寫一個配置檔案,jms.properties
#send
send.name=Topic_Mood
#receive
topic.name=Topic_Mood
topic.clientId=client_LiLei
topic2.name=Topic_Mood
topic2.clientId=client_HanMei
#url
brokerUrl=failover:(tcp://10.0.0.232:61616)?initialReconnectDelay=1000
8、pom裡面新增activeMQ的依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.1</version>
</dependency>
耶,執行就ok了。
傳送訊息的輸出是這樣的:
2016-08-05 11:27:19 [ main:0 ] - [ INFO ] Refreshing org[email protected]16011db4: startup date [Fri Aug 05 11:27:19 CST 2016]; root of context hierarchy
2016-08-05 11:27:19 [ main:31 ] - [ INFO ] Loading XML bean definitions from class path resource [ApplicationContext/applicationContext-send.xml]
2016-08-05 11:27:19 [ main:187 ] - [ INFO ] Loading properties file from class path resource [properties/jms.properties]
2016-08-05 11:27:19 [ main:392 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60542-1470367639797-1:1,clientId=null,started=false}
2016-08-05 11:27:19 [ ActiveMQ Task-1:467 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616
send end
接收訊息的輸出是這樣的:
2016-08-05 11:28:04 [ ActiveMQ Task-1:490 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616
2016-08-05 11:28:04 [ main:498 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-1:1,clientId=client_LiLei,started=false}
2016-08-05 11:28:04 [ ActiveMQ Task-1:504 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616
2016-08-05 11:28:04 [ main:509 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-3:1,clientId=client_HanMei,started=false}
this is receiver2, mood is happy,say Hello World!
this is receiver, mood is happy,say Hello World!
啦啦啦,不知道大家注意了沒有,配置另一個接收者就是,把第一個接收者的配置複製,然後添加個2,再把接收類複製,添加個2,就搞定了。哈哈哈~這種方式也適用於mongodb啊這種配置。在一個工程裡面操作兩個mongodb資料庫。
注:我將程式碼上傳到了csdn。
相關推薦
ActiveMQ訂閱模式持久化實現
我的訴求是,建一個訂閱通道,然後多個客戶端監聽,當某個客戶端掉線後,再上線的時候可以收到它沒有接收到的訊息。 本文主要參考了《使用Spring配置ActiveMQ的釋出訂閱模式》(http://nettm.iteye.com/blog/1828268),將他們複製貼上過來
基於spring-redis釋出訂閱模式的實現
redis配置: Java程式碼 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"
釋出訂閱模式簡單實現
程式碼 export default class Oberver { // 定義一個事件容器 event = {} subscribe (type, fn) { // 訊息型
activemq訂閱模式以及訊息時長和確認機制
程式碼如下: package com.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import ja
SpringBoot+ActiveMq實現訂閱模式(Topic)訊息佇列
上文已經詳細介紹了點對點模式(Queue)下的訊息佇列,今天就來再介紹一下訊息佇列的另一種模式:訂閱模式。 一、訂閱模式的流程 生產者產生一條訊息message放入一個topic中,該topic已經三個消費者訂閱了,那麼被放入topic中的這條訊息,就會同時被這三個消費者取走(當然他們必
activeMQ公布訂閱模式中中經常使用工具類
模式 .text boolean ava cer ttext tex conn contain package com.jms; import java.util.Map; import java.util.concurrent.ConcurrentHashMap
JMS消息隊列ActiveMQ(發布/訂閱模式)
jms activemq 消息隊列 (發布/訂閱模式) 消費者1(Consumer)--訂閱(subcribe)-->主題(Topic)package com.java1234.activemq2; import javax.jms.Connection; import javax.j
發布訂閱模式源碼實現
參數 console 發布 === turn shift lis 定義 pan var shoeObj = {}; // 定義發布者 shoeObj.list = []; // 緩存列表 存放訂閱者回調函數 // 增加訂閱者 shoeObj.listen
js 實現釋出訂閱模式
/* Pubsub */ function Pubsub(){ //存放事件和對應的處理方法 this.handles = {}; } Pubsub.prototype = { //傳入事件型別type和事件處理handle on: function (type, handle) {
SpringBoot2.0之整合ActiveMQ(釋出訂閱模式)
釋出訂閱模式與前面的點對點模式很類似,簡直一毛一樣 注意:釋出訂閱模式 先啟動消費者 公用pom: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XML
RabbitMQ釋出訂閱模式實現
pom檔案: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://mav
通過釋出訂閱模式實現的事件委託
關於這篇文章的背景 之前瞭解到的事件代理不多,就像是一個dom將事件委託給另一個dom,又叫事件委託。後來做了個題目,要實現一個類似jquery的事件委託方法,然後認真的瞭解了一下。然後專注於實現,其實並沒有去看jquery的原始碼,hhh。 釋出訂閱模式大概是目前前端框架使用的一種最常見的設計模式了,而
ActiveMQ(三):ActiveMQ的安全機制、api及訂閱模式demo
一、ActiveMQ安全機制 ActiveMQ是使用jetty部署的,修改密碼需要到相應的配置檔案 配置檔案是這個: 在其第123行新增使用者名稱和密碼,新增配置如下: <plugins> <simpleAuthenti
學習筆記-js釋出/訂閱模式的簡單實現
Publish/Subscribe(釋出/訂閱模式): 在《Javascript設計模式》一書中,這兩種模式還是有些區別的。書中原話如下: * Observer模式要求希望接收到主題通知者的觀察者必須訂閱內容改變的事件 * Subscribe/Publis
使用Spring配置ActiveMQ的釋出訂閱模式
通過Spring對ActiveMQ進行配置開發,釋出訂閱模式,支援訊息的持久化。 需要Spring2.5版本以上,如果有多個訂閱者,每個訂閱者需要指定不同的 clientId 。 釋出者的配置 <?xml version="1.0" encoding="UTF-8"
SpringBoot2整合ActiveMQ實戰之釋出訂閱模式
1、預設消費者並不會消費訂閱釋出型別的訊息,這是由於springboot預設採用的是p2p模式進行訊息的監聽 在配置檔案裡面,註釋掉 #spring.jms.pub-sub-domain=true 2. //需要給topi
訂閱釋出模式——C++實現
只是自己的一個記錄過程,不做詳細解說 這是這次寫的一個流程 Observer.h #ifndef _OBSERVER_H_ #define _OBSERVER_H_ #include <map> #include <mutex> using
activemq中的訂閱模式以及訊息時長和確認機制
直接上程式碼 釋出主題 package com.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicPub {
使用MQTT協議的訂閱釋出模式,實現從伺服器推送訊息到客戶端功能。
3、手機端設定相同的連線主題引數,訂閱訊息。關鍵程式碼:PC端//伺服器埠 賬號 密碼 private String host = "tcp://127.0.0.1:61613"; private String userName = "admin"; private Stri
簡單實現發布訂閱模式
push efi lis 類型 n) 自定義事件 undefined scribe 簡單 發布訂閱模式,基於一個主題/事件通道,希望接收通知的對象(subscriber)通過自定義事件訂閱主題,被激活事件對象(publisher)通過發布主題事件的方式被通知。 js中的事