1. 程式人生 > >MQ系列3 使用Spring傳送,消費topic和queue訊息 activeMQ

MQ系列3 使用Spring傳送,消費topic和queue訊息 activeMQ

簡介

實戰一 , 實戰二 介紹了ActiveMQ的基本概念和配置方式.

本篇將通過一個例項介紹使用spring傳送,消費topic, queue型別訊息的方法. 不懂topic和queue的google 之.

如圖示, TOPIC和QUEUE分別代表一個topic和一個queue訊息通道.

  1. TopicMessageProducer向topic傳送訊息, TopicConsumerA和TopicConsumerB則從topic消費訊息.
  2. QueueMessageProducer向Queue傳送訊息, QueueConsumer從Queue中消費訊息

Spring整合JMS

就像對orm, web的支援一樣, spring同樣支援jms, 為整合jms到已有的專案提供了很多便利的方法. 本篇主要講實戰, 是所以先從配置開始, spring配置jms基本上需要8個部分.

  1. ConnectionFactory. 和jms伺服器的連線, 可以是外部的jms server, 也可以使用embedded ActiveMQ Broker.
  2. Destination. 有topic和queue兩種方式.
  3. JmsTemplate. spring提供的jms模板.
  4. MessageConverter. 訊息轉換器.
  5. MessageProducer. 訊息生產者.
  6. MessageConsumer. 訊息消費者.
  7. MessageListener. 訊息監聽器
  8. MessageListenerContainer. 訊息監聽容器

下面以例項的方式介紹上面8個部分.

1. ConnectionFactory

Xml程式碼  收藏程式碼
  1. <amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://localhost" />  

 brokerURL是指要連線的activeMQ server的地址, activeMQ提供了多種brokerURL, 集體可參見文件.一般我們使用巢狀的ActiveMQ server. 配置如下, 這個配置使用訊息的儲存機制, 伺服器重啟也不會丟失訊息.

Xml程式碼  收藏程式碼
  1. <!--  embedded ActiveMQ Broker -->  
  2.     <amq:broker
     useJmx="false" persistent="true">  
  3.         <amq:persistenceAdapter>  
  4.             <amq:amqPersistenceAdapter directory="d:/amq"/>  
  5.         </amq:persistenceAdapter>  
  6.         <amq:transportConnectors>  
  7.             <amq:transportConnector uri="tcp://localhost:61616" />  
  8.                        <amq:transportConnector uri="vm://localhost:0" />  
  9.         </amq:transportConnectors>  
  10.     </amq:broker>  

 2. Destination

 在例項中我們使用了兩種destination

Xml程式碼  收藏程式碼
  1. <!--  ActiveMQ destinations  -->  
  2. <!--  使用topic方式-->  
  3. <amq:topic name="TOPIC" physicalName="JMS-TEST-TOPIC" />  
  4. <!--  使用Queue方式-->  
  5. <amq:queue name="QUEUE" physicalName="JMS-TEST-QUEUE" />  

 3. JmsTemplate

Xml程式碼  收藏程式碼
  1. <!--  Spring JmsTemplate config -->  
  2.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  3.         <property name="connectionFactory">  
  4.             <!--  lets wrap in a pool to avoid creating a connection per send -->  
  5.             <bean class="org.springframework.jms.connection.SingleConnectionFactory">  
  6.                 <property name="targetConnectionFactory" ref="jmsConnectionFactory" />  
  7.             </bean>  
  8.         </property>  
  9.         <!-- custom MessageConverter -->  
  10.         <property name="messageConverter" ref="defaultMessageConverter" />  
  11.     </bean>  

  4. MessageConverter

   MessageConverter實現的是org.springframework.jms.support.converter.MessageConverter介面, 提供訊息的轉換功能. DefaultMessageConverter的實現見附件.

Xml程式碼  收藏程式碼
  1. <bean id="defaultMessageConverter" class="com.andyao.activemq.DefaultMessageConverter" />  

  5. MessageProducer

   例項擁有兩個訊息生產者, 訊息生產者都是POJO, 實現見附件.

Xml程式碼  收藏程式碼
  1. <!-- POJO which send Message uses  Spring JmsTemplate -->  
  2.     <bean id="topicMessageProducer" class="com.andyao.activemq.TopicMessageProducer">  
  3.         <property name="template" ref="jmsTemplate" />  
  4.         <property name="destination" ref="TOPIC" />  
  5.     </bean>  
  6.     <bean id="queueMessageProducer" class="com.andyao.activemq.QueuMessageProducer">  
  7.         <property name="template" ref="jmsTemplate" />  
  8.         <property name="destination" ref="QUEUE" />  
  9.     </bean>  

 6. MessageConsumer

 TOPIC通道有兩個訊息消費者, QUEUE有一個訊息消費者

Xml程式碼  收藏程式碼
  1. <!--  Message Driven POJO (MDP) -->  
  2.     <!-- consumer1 for topic a -->  
  3.     <bean id="topicConsumerA" class="com.andyao.activemq.TopicConsumerA" />  
  4.     <!-- consumer2 for topic a -->  
  5.     <bean id="topicConsumerB" class="com.andyao.activemq.TopicConsumerB" />  
  6.     <!-- consumer for queue -->  
  7.     <bean id="queueConsumer" class="com.andyao.activemq.QueueConsumer" />  

  7. MessageListener

每一個訊息消費者都對應一個MessageListener

Xml程式碼  收藏程式碼
  1. <bean id="topicListenerA" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  2.         <constructor-arg ref="topicConsumerA" />  
  3.         <!--  may be other method -->  
  4.         <property name="defaultListenerMethod" value="receive" />  
  5.         <!-- custom MessageConverter define -->  
  6.         <property name="messageConverter" ref="defaultMessageConverter" />  
  7.     </bean>  
  8.     <bean id="topicListenerB" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  9.         <constructor-arg ref="topicConsumerB" />  
  10.         <!--  may be other method -->  
  11.         <property name="defaultListenerMethod" value="receive" />  
  12.         <!-- custom MessageConverter define -->  
  13.         <property name="messageConverter" ref="defaultMessageConverter" />  
  14.     </bean>  
  15.     <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  
  16.         <constructor-arg ref="queueConsumer" />  
  17.         <!--  may be other method -->  
  18.         <property name="defaultListenerMethod" value="receive" />  
  19.         <!-- custom MessageConverter define -->  
  20.         <property name="messageConverter" ref="defaultMessageConverter" />  
  21.     </bean>  

 8. MessageListenerContainer

 有幾個MessageListener既有幾個MessageListenerContainer

Xml程式碼  收藏程式碼
  1. <bean id="topicListenerContainerA" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  2.         <property name="connectionFactory" ref="jmsConnectionFactory" />  
  3.         <property name="destination" ref="TOPIC" />  
  4.         <property name="messageListener" ref="topicListenerA" />  
  5.     </bean>  
  6.     <bean id="topicListenerContainerB" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  7.         <property name="connectionFactory" ref="jmsConnectionFactory" />  
  8.         <property name="destination" ref="TOPIC" />  
  9.         <property name="messageListener" ref="topicListenerB" />  
  10.     </bean>  
  11.     <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  12.         <property name="connectionFactory" ref="jmsConnectionFactory" />  
  13.         <property name="destination" ref="QUEUE" />  
  14.         <property name="messageListener" ref="queueListener" />  
  15.     </bean>  

  Summary

寫spring配置檔案的時候, 要把MessageProducer, MessageConsumer,MessageListener,MessageListenerContainer幾個地方弄清楚:

  1. 可以有一個或者多個訊息生產者向同一個destination傳送訊息.
  2. queue型別的只能有一個訊息消費者.
  3. topic型別的可以有多個訊息消費者.
  4. 每個消費者對應一個MessageListener和一個MessageListenerContainer.

 這裡幫同事打個廣告:家裡種芒果,提前找批發零售渠道  可以以批發價個人預定

http://9community.com

相關推薦

MQ系列3 使用Spring傳送,消費topicqueue訊息 activeMQ

簡介 實戰一 , 實戰二 介紹了ActiveMQ的基本概念和配置方式. 本篇將通過一個例項介紹使用spring傳送,消費topic, queue型別訊息的方法. 不懂topic和queue的google 之. 如圖示, TOPIC和QUEUE分別代表一個top

ActiveMQ5.0實戰三:使用Spring傳送,消費topicqueue訊息

簡介 上一篇http://www.iteye.com/topic/15317介紹了ActiveMQ5.0的安裝,這一篇將介紹的配置。ActiveMQ包含了很多features(詳見http://activemq.apache.org/features.html ),  

ActiveMQ5.0實戰三:使用Spring發送,消費topicqueue消息

cme 擁有 支持 add gin logs ges .get sum 實戰一 , 實戰二 介紹了ActiveMQ的基本概念和配置方式. 本篇將通過一個實例介紹使用spring發送,消費topic, queue類型消息的方法. 不懂topic和queue的google 之

Android 系列 3.8使用Log.dLogCat進行除錯

3.8使用Log.d和LogCat進行除錯 問題 通常Java程式碼編譯沒有錯誤,但有時執行的應用程式崩潰,給出“強制關閉”(或類似)錯誤訊息。 解 使用LogCat訊息除錯程式碼對於發現自己處於這種情況的開發人員是一個有用的技術。 討論 熟悉Java程式設計的人可能在除錯

Kafka系列3-python版本producer生產者consumer消費者例項

直接上程式碼了: # -*- coding: utf-8 -*- ''' 使用kafka-Python 1.3.3模組 ''' import sys import time import json from kafka import KafkaProduce

topic queue的區別應用 activeMQ

剛接觸activeMQ,在這裡分析一下topic和queue的區別,在我這的理解,最基本的區別便是queue是佇列,訊息可以存在佇列裡面,只要觀察者去接收,這個訊息就被接收到了,就消失了,就像生產啤酒,生產了很多,看你什麼時候運走,然後topic就相當是廣播功能,在某個時刻

springcloud系列—Stream—第8章-3: Spring Cloud Stream 訊息驅動(消費組)

使用消費組實現訊息消費的負載均衡 通常在生產環境,我們的每個服務都不會以單節點的方式執行在生產環境,當同一個服務啟動多個例項的時候,這些例項都會繫結到同一個訊息通道的目標主題(Topic)上。 預設情況下,當生產者發出一條訊息到繫結通道上,這條訊息會產生多個副本被每個消費者例項接收和處理,但

Spring整合JMS、IBM MQ傳送接收訊息

最近才接觸到MQ,由於之前完全不知道是幹嘛用的,還是很花了一點時間研究的~先來簡單解釋一下名詞啦 一、名詞解釋 MQ MQ(message queue)指訊息佇列,是應用程式對應用程式的通訊方法。可以利用訊息佇列暫存資料報文。 MQ的原理其實就是生產者

SpringMVC系列(十五)Spring MVC與Spring整合時實例被創建兩次的解決方案以及Spring 的 IOC 容器 SpringMVC 的 IOC 容器的關系

問題 nbsp frame ota 展示 not als pri exc 一、Spring MVC與Spring整合時實例被創建兩次的解決方案 1.問題產生的原因 Spring MVC的配置文件和Spring的配置文件裏面都使用了掃描註解<context:compon

3.Spring系列之IOC&DI

java程序 xmlns 配置文件 string main 設計 instance 什麽是 資源獲取 一、什麽是IOC? 1.概念 IOC—Inversion of Control,即“控制反轉”,不是新的技術,而是一種設計思想。Java開發中,IOC意味著將你設計好的對

c#基礎系列3---深入理解ref out

ref 聲明 函數的參數 .... -- 新增 tel struct 結果 “大菜”:源於自己剛踏入猿途混沌時起,自我感覺不是一般的菜,因而得名“大菜”,於自身共勉。 擴展閱讀 c#基礎系列1---深入理解 值類型和引用類型 c#基礎系列2---深入理解 Str

2.3用Options建立配置實體的映射「深入淺出ASP.NET Core系列

實體 add 謝謝 lar 化學 機制 失效 github tar 希望給你3-5分鐘的碎片化學習,可能是坐地鐵、等公交,積少成多,水滴石穿,謝謝關註。 Startup.cs中創建MVC中間件 關鍵代碼:services.AddMvc();app.

C#知識點總結系列3、C#中DelegateEvent以及它們的區別

的區別 sent () exit 功能 final 通知 bsp t對象 1.Monitor.Enter(object)方法是獲取鎖,Monitor.Exit(object)方法是釋放鎖,這就是Monitor最常用的兩個方法,當然在使用過程中為了避免獲取鎖之後因為異常,致鎖

C#系列 ----- 3 值引用物件引用

值型別和引用型別(Value Types Versus Reference Types) 上一篇對於type的定義其實不準確,在此給出更準確的定義。 所有的C#型別包括: Value types Reference types Generic type par

Spring Boot實戰系列(3)AOP面向切面程式設計

AOP是一種與語言無關的程式思想、程式設計正規化。專案業務邏輯中,將通用的模組以水平切割的方式進行分離統一處理,常用於日誌、許可權控制、異常處理等業務中。 快速導航 引入AOP依賴 AOP常用註解解析 實現日誌分割功能 @Pointcut 新增切入點 @Be

python爬蟲系列(3.4-使用xpathlxml爬取伯樂線上)

一、爬取的程式碼 1、網站地址 2、具體實現程式碼 import requests from lxml import etree class JobBole(object):     def __init__(self):     &

JAVA學習筆記系列3-JVM、JREJDK的區別

JVM(Java Virtual Machine)就是一個虛擬的用於執行bytecode位元組碼的“虛擬計算機”。它和os打交道 JRE(Java Runtime Environment)包含:Java虛擬機器、庫函式、執行java應用程式所必須的檔案。它包含了JVM JDK(Java Developme

資料結構算法系列3--複雜度分析(下)

複雜度分析的4個概念 1.最壞情況時間複雜度:程式碼在最理想情況下執行的時間複雜度。 2.最好情況時間複雜度:程式碼在最壞情況下執行的時間複雜度。 3.平均時間複雜度:用程式碼在所有情況下執行的次數的加權平均值表示。 4.均攤時間複雜度:在程式碼執行的所有複雜度情況中絕大部分是低級別的複

pytorch系列 --3 Variable,Tensor Gradient

Variable & Automatic Gradient Calculation Tensor vs Variable graph and gradient 注意,在pytorch0.4中,tensor和pytorch合併了。 https://pytorch.

RabbitMQ系列(四)RabbitMQ事務Confirm傳送訊息確認——深入解讀(轉載)

原文地址:https://yq.aliyun.com/articles/629858   RabbitMQ事務和Confirm傳送方訊息確認——深入解讀 RabbitMQ系列文章 RabbitMQ在Ubuntu上的環境搭建 深入瞭解RabbitMQ工作原理及簡單使用 Rabbi