1. 程式人生 > >ActiveMQ訂閱模式持久化實現

ActiveMQ訂閱模式持久化實現

我的訴求是,建一個訂閱通道,然後多個客戶端監聽,當某個客戶端掉線後,再上線的時候可以收到它沒有接收到的訊息。

本文主要參考了《使用Spring配置ActiveMQ的釋出訂閱模式》(http://nettm.iteye.com/blog/1828268),將他們複製貼上過來,基本上就ok了。
在找到這篇文章前,《如何實現ActiveMq的Topic的持久訂閱》(http://www.mytju.com/classcode/news_readNews.asp?newsID=486)給了我提示。要設定clientID啊~


我根據參考的文章編寫了工程,測試後發現是ok的。並且發現了一個細節。
比如,有一個clientID=ID_1,有個通道是topic_channel。往topic_channel中傳送一批訊息(A批)後,首次啟動ID_1的監聽,ID_1沒有收到。關掉ID_1的監聽,再往topic_channel中傳送一批訊息(B批)後,再次啟動ID_1的監聽,收到了B批訊息。這個現象說明了什麼呢?說明你不僅要有自己唯一的ID(clientID)還要事先跟這個通道報告一聲,說你要監聽它,訂閱的訊息來了之後給它留一份,等著它來拿。在你報告之前產生的訊息,你是拿不到的,因為沒有你的份。


為了體現我真的動了腦筋,思考了。所以我發表的工程,是做了一些修改的,比如,通道名啊,ID名啊,檔名啊,還調整了一下檔案結構,哈哈哈~
當然我還做了一個質的突破,我在一個工程裡面做了兩個接收者,監聽同一個訂閱通道。


實現步驟:
1、配置傳送xml,applicationContext-send.xml
<?xml version="1.0" encoding="UTF-8"?>  
  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans    
         http://www.springframework.org/schema/beans/spring-beans-2.5.xsd    
         http://www.springframework.org/schema/context    
         http://www.springframework.org/schema/context/spring-context-2.5.xsd">  
  <context:property-placeholder location="classpath:/properties/jms.properties" />
  
    <!-- 配置JMS連線工廠 -->  
    <bean id="myConnectionFactory"  
        class="org.springframework.jms.connection.CachingConnectionFactory">  
        <!-- Session快取數量 -->  
        <property name="sessionCacheSize" value="10" />  
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 -->  
                <property name="brokerURL" value="${brokerUrl}" />  
                 <!-- 是否非同步傳送 -->  
                <property name="useAsyncSend" value="true" />  
            </bean>  
        </property>  
    </bean>  
  
    <!-- 傳送訊息的目的地(一個主題) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 設定訊息主題的名字 -->  
        <constructor-arg index="0" value="${send.name}" />  
    </bean>  
  
    <!-- 配置JMS模版 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="myConnectionFactory" />  
        <property name="defaultDestination" ref="myDestination" />  
        <!-- 訂閱釋出模式 -->  
        <property name="pubSubDomain" value="true" />  
        <property name="receiveTimeout" value="10000" />  
    </bean>  
</beans> 


2、編寫傳送java,ActiveMQsender.java

package com.by.activeMQ;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ActiveMQsender {
	public static void main(String[] args) {
		@SuppressWarnings("resource")
		ApplicationContext ctx = new ClassPathXmlApplicationContext(
				"ApplicationContext/applicationContext-send.xml");

		JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");

		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage msg = session.createTextMessage();
				// 設定訊息屬性
				msg.setStringProperty("mood", "happy");
				// 設定訊息內容
				msg.setText("Hello World!");
				return msg;
			}
		});

		System.out.println("send end");
	}
}


3、配置接收xml,applicationContext-receive.xml
<?xml version="1.0" encoding="UTF-8"?>  
  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans    
         http://www.springframework.org/schema/beans/spring-beans-2.5.xsd    
         http://www.springframework.org/schema/context    
         http://www.springframework.org/schema/context/spring-context-2.5.xsd">  
  <context:property-placeholder location="classpath:/properties/jms.properties" />
  
  <!-- 第一個接收者 -->
    <!-- 配置JMS連線工廠 -->  
    <bean id="myConnectionFactory"  
        class="org.springframework.jms.connection.CachingConnectionFactory">  
        <!-- Session快取數量 -->  
        <property name="sessionCacheSize" value="10" />  
        <!-- 接收者ID -->  
        <property name="clientId" value="${topic.clientId}" />  
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 -->  
                <property name="brokerURL" value="${brokerUrl}" />  
            </bean>  
        </property>  
    </bean>  
  
    <!-- 傳送訊息的目的地(一個主題) -->  
    <bean id="myDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 設定訊息主題的名字 -->  
        <constructor-arg index="0" value="${topic.name}" />  
    </bean>  
  
    <!-- 生產訊息配置 (自己定義)-->  
    <bean id="myTopicConsumer" class="com.by.activeMQ.ActiveMQreceiver" />  
  
    <!-- 訊息監聽器 -->  
    <bean id="myTopicListener"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer" />  
        <!-- 接收訊息的方法名稱 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不進行訊息轉換 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
  
    <!-- 訊息監聽容器 -->  
    <bean id="myListenerContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="myConnectionFactory" />  
        <!-- 釋出訂閱模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 訊息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="${topic.clientId}" />  
        <property name="durableSubscriptionName" value="${topic.clientId}"/>  
        <property name="destination" ref="myDestination" />  
        <property name="messageListener" ref="myTopicListener" />  
    </bean>  
    
      
  <!-- 第二個接收者 -->
  
         <!-- 配置JMS連線工廠 -->  
    <bean id="myConnectionFactory2"  
        class="org.springframework.jms.connection.CachingConnectionFactory">  
        <!-- Session快取數量 -->  
        <property name="sessionCacheSize" value="10" />  
        <!-- 接收者ID -->  
        <property name="clientId" value="${topic2.clientId}" />  
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 -->  
                <property name="brokerURL" value="${brokerUrl}" />  
            </bean>  
        </property>  
    </bean>  
  
    <!-- 傳送訊息的目的地(一個主題) -->  
    <bean id="myDestination2" class="org.apache.activemq.command.ActiveMQTopic">  
        <!-- 設定訊息主題的名字 -->  
        <constructor-arg index="0" value="${topic2.name}" />  
    </bean>  
  
    <!-- 生產訊息配置 (自己定義)-->  
    <bean id="myTopicConsumer2" class="com.by.activeMQ.ActiveMQreceiver2" />  
  
    <!-- 訊息監聽器 -->  
    <bean id="myTopicListener2"  
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="myTopicConsumer2" />  
        <!-- 接收訊息的方法名稱 -->  
        <property name="defaultListenerMethod" value="receive" />  
        <!-- 不進行訊息轉換 -->  
        <property name="messageConverter"><null/></property>  
    </bean>  
  
    <!-- 訊息監聽容器 -->  
    <bean id="myListenerContainer2"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="myConnectionFactory2" />  
        <!-- 釋出訂閱模式 -->  
        <property name="pubSubDomain" value="true"/>  
        <!-- 訊息持久化 -->  
        <property name="subscriptionDurable" value="true"/>  
        <property name="receiveTimeout" value="10"/>  
        <!-- 接收者ID -->  
        <property name="clientId" value="${topic2.clientId}" />  
        <property name="durableSubscriptionName" value="${topic2.clientId}"/>  
        <property name="destination" ref="myDestination2" />  
        <property name="messageListener" ref="myTopicListener2" />  
    </bean> 
  
</beans>  


4、編寫接收java,ActiveMQreceiver.java
package com.by.activeMQ;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.springframework.jms.JmsException;

public class ActiveMQreceiver {
	public void receive(TextMessage message) throws JmsException, JMSException { 
		String info = "this is receiver, "
				+ " mood is " + message.getStringProperty("mood") + ","
				+ "say " + message.getText();
        System.out.println(info);
    } 
}



5、編寫另一個接收java,ActiveMQreceiver.java
package com.by.activeMQ;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.springframework.jms.JmsException;

public class ActiveMQreceiver2 {
	public void receive(TextMessage message) throws JmsException, JMSException { 
		String info = "this is receiver2,"
				+ " mood is " + message.getStringProperty("mood") + ","
				+ "say " + message.getText();
        System.out.println(info);
    } 
}



6、編寫一個main,開啟接收監聽,openReceive.java
package com.by.activeMQ;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class openReceive {

	public static void main(String[] args) {
		@SuppressWarnings({ "unused", "resource" })
		ApplicationContext ctx = new ClassPathXmlApplicationContext("ApplicationContext/applicationContext-receive.xml");  
        while(true) {  
        } 
	}

}



7、編寫一個配置檔案,jms.properties

#send
send.name=Topic_Mood

#receive
topic.name=Topic_Mood
topic.clientId=client_LiLei

topic2.name=Topic_Mood
topic2.clientId=client_HanMei

#url
brokerUrl=failover:(tcp://10.0.0.232:61616)?initialReconnectDelay=1000

8、pom裡面新增activeMQ的依賴

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.11.1</version>
</dependency>
<dependency>
	<groupId>org.apache.commons</groupId>
	<artifactId>commons-pool2</artifactId>
	<version>2.3</version>
</dependency>
<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.0.0.RELEASE</version>
</dependency>

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.1</version>
</dependency>



耶,執行就ok了。
傳送訊息的輸出是這樣的:
2016-08-05 11:27:19 [ main:0 ] - [ INFO ] Refreshing org[email protected]16011db4: startup date [Fri Aug 05 11:27:19 CST 2016]; root of context hierarchy
2016-08-05 11:27:19 [ main:31 ] - [ INFO ] Loading XML bean definitions from class path resource [ApplicationContext/applicationContext-send.xml]
2016-08-05 11:27:19 [ main:187 ] - [ INFO ] Loading properties file from class path resource [properties/jms.properties]
2016-08-05 11:27:19 [ main:392 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60542-1470367639797-1:1,clientId=null,started=false}
2016-08-05 11:27:19 [ ActiveMQ Task-1:467 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616
send end


接收訊息的輸出是這樣的:
2016-08-05 11:28:04 [ ActiveMQ Task-1:490 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616
2016-08-05 11:28:04 [ main:498 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-1:1,clientId=client_LiLei,started=false}
2016-08-05 11:28:04 [ ActiveMQ Task-1:504 ] - [ INFO ] Successfully connected to tcp://10.0.0.232:61616
2016-08-05 11:28:04 [ main:509 ] - [ INFO ] Established shared JMS Connection: ActiveMQConnection {id=ID:MDG42V9PSU28IKA-60544-1470367684739-3:1,clientId=client_HanMei,started=false}
this is receiver2, mood is happy,say Hello World!
this is receiver,  mood is happy,say Hello World!


啦啦啦,不知道大家注意了沒有,配置另一個接收者就是,把第一個接收者的配置複製,然後添加個2,再把接收類複製,添加個2,就搞定了。哈哈哈~這種方式也適用於mongodb啊這種配置。在一個工程裡面操作兩個mongodb資料庫。

注:我將程式碼上傳到了csdn。


相關推薦

ActiveMQ訂閱模式持久化實現

我的訴求是,建一個訂閱通道,然後多個客戶端監聽,當某個客戶端掉線後,再上線的時候可以收到它沒有接收到的訊息。 本文主要參考了《使用Spring配置ActiveMQ的釋出訂閱模式》(http://nettm.iteye.com/blog/1828268),將他們複製貼上過來

基於spring-redis釋出訂閱模式實現

redis配置:  Java程式碼  <?xml version="1.0" encoding="UTF-8"?>   <beans xmlns="http://www.springframework.org/schema/beans"       

釋出訂閱模式簡單實現

程式碼 export default class Oberver { // 定義一個事件容器 event = {} subscribe (type, fn) { // 訊息型

activemq訂閱模式以及訊息時長和確認機制

程式碼如下: package com.activemq;   import org.apache.activemq.ActiveMQConnectionFactory;   import ja

SpringBoot+ActiveMq實現訂閱模式(Topic)訊息佇列

上文已經詳細介紹了點對點模式(Queue)下的訊息佇列,今天就來再介紹一下訊息佇列的另一種模式:訂閱模式。 一、訂閱模式的流程 生產者產生一條訊息message放入一個topic中,該topic已經三個消費者訂閱了,那麼被放入topic中的這條訊息,就會同時被這三個消費者取走(當然他們必

activeMQ公布訂閱模式中中經常使用工具類

模式 .text boolean ava cer ttext tex conn contain package com.jms; import java.util.Map; import java.util.concurrent.ConcurrentHashMap

JMS消息隊列ActiveMQ(發布/訂閱模式)

jms activemq 消息隊列 (發布/訂閱模式) 消費者1(Consumer)--訂閱(subcribe)-->主題(Topic)package com.java1234.activemq2; import javax.jms.Connection; import javax.j

發布訂閱模式源碼實現

參數 console 發布 === turn shift lis 定義 pan var shoeObj = {}; // 定義發布者 shoeObj.list = []; // 緩存列表 存放訂閱者回調函數 // 增加訂閱者 shoeObj.listen

js 實現釋出訂閱模式

/* Pubsub */ function Pubsub(){   //存放事件和對應的處理方法   this.handles = {}; } Pubsub.prototype = {   //傳入事件型別type和事件處理handle   on: function (type, handle) {     

SpringBoot2.0之整合ActiveMQ(釋出訂閱模式

釋出訂閱模式與前面的點對點模式很類似,簡直一毛一樣   注意:釋出訂閱模式 先啟動消費者 公用pom: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XML

RabbitMQ釋出訂閱模式實現

 pom檔案: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://mav

通過釋出訂閱模式實現的事件委託

關於這篇文章的背景 之前瞭解到的事件代理不多,就像是一個dom將事件委託給另一個dom,又叫事件委託。後來做了個題目,要實現一個類似jquery的事件委託方法,然後認真的瞭解了一下。然後專注於實現,其實並沒有去看jquery的原始碼,hhh。 釋出訂閱模式大概是目前前端框架使用的一種最常見的設計模式了,而

ActiveMQ(三):ActiveMQ的安全機制、api及訂閱模式demo

一、ActiveMQ安全機制 ActiveMQ是使用jetty部署的,修改密碼需要到相應的配置檔案  配置檔案是這個: 在其第123行新增使用者名稱和密碼,新增配置如下: <plugins> <simpleAuthenti

學習筆記-js釋出/訂閱模式的簡單實現

Publish/Subscribe(釋出/訂閱模式): 在《Javascript設計模式》一書中,這兩種模式還是有些區別的。書中原話如下: * Observer模式要求希望接收到主題通知者的觀察者必須訂閱內容改變的事件 * Subscribe/Publis

使用Spring配置ActiveMQ的釋出訂閱模式

通過Spring對ActiveMQ進行配置開發,釋出訂閱模式,支援訊息的持久化。 需要Spring2.5版本以上,如果有多個訂閱者,每個訂閱者需要指定不同的 clientId 。 釋出者的配置 <?xml version="1.0" encoding="UTF-8"

SpringBoot2整合ActiveMQ實戰之釋出訂閱模式

1、預設消費者並不會消費訂閱釋出型別的訊息,這是由於springboot預設採用的是p2p模式進行訊息的監聽      在配置檔案裡面,註釋掉 #spring.jms.pub-sub-domain=true   2.  //需要給topi

訂閱釋出模式——C++實現

只是自己的一個記錄過程,不做詳細解說 這是這次寫的一個流程 Observer.h #ifndef _OBSERVER_H_ #define _OBSERVER_H_ #include <map> #include <mutex> using

activemq中的訂閱模式以及訊息時長和確認機制

直接上程式碼 釋出主題 package com.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicPub {

使用MQTT協議的訂閱釋出模式實現從伺服器推送訊息到客戶端功能。

3、手機端設定相同的連線主題引數,訂閱訊息。關鍵程式碼:PC端//伺服器埠 賬號 密碼 private String host = "tcp://127.0.0.1:61613"; private String userName = "admin"; private Stri

簡單實現發布訂閱模式

push efi lis 類型 n) 自定義事件 undefined scribe 簡單 發布訂閱模式,基於一個主題/事件通道,希望接收通知的對象(subscriber)通過自定義事件訂閱主題,被激活事件對象(publisher)通過發布主題事件的方式被通知。 js中的事