RabbitMQ學習(三):Spring整合RabbitMQ
1、maven依賴
<!-- rabbitMQ依賴 -->
<!-- spring相關的其他依賴請參考原始碼,此處不做過多描述 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.6.RELEASE</version>
</dependency>
2、spring-rabbit配置
2.1、配置rabbitMQ連線
(1)採用<rabbit/>方式配置連線
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="" publisher-confirms="true" publisher-returns="true" channel-cache-size="5" />
(2)採用普通方式配置連線
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="${rabbitmq.host}" /> <property name="username" value="${rabbitmq.username}" /> <property name="password" value="${rabbitmq.password}" /> <property name="port" value="${rabbitmq.port}" /> <!-- 快取中要維護的通道數 --> <property name="channelCacheSize" value="5" /> <!-- 開啟發送確認機制 --> <property name="publisherConfirms" value="true"/> <!-- 開啟結果返回機制 --> <property name="publisherReturns" value="true"/> </bean>
2.2、配置RabbitAdmin
<rabbit:admin connection-factory="connectionFactory" />
2.3、定義Queue
<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" />
常用引數解釋:
durable:是否持久化
auto-delete:是否當沒有連線時自動刪除
exclusive:是否只能由建立者使用
2.4、定義Exchange,並繫結Queue
(1)定義direct型別
<rabbit:direct-exchange name="exchangeTest"
durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 此處沒有指定RoutingKey -->
<rabbit:binding queue="queueTest" key=""></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
(2)定義topic型別
<rabbit:topic-exchange name="topicexchangetest" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="" key=""></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
(3)定義fanout型別
<rabbit:fanout-exchange name=""></rabbit:topic-exchange>
(4)定義headers型別
<rabbit:headers-exchange name=""></rabbit:topic-exchange>
常用引數解釋:
durable:是否持久化
auto-delete:是否當沒有連線時自動刪除
注意:在此例中,如果不把Exchange和queue進行繫結,傳送訊息的時候ConfirmCallback依然正常執行,因為訊息到達了Exchange。但是ReturnCallback就會執行回撥方法,傳回錯誤資訊:NO_ROUTE。Exchange沒有找到指定的Queue,丟棄這條訊息並把訊息返回給生產者。回撥順序是ReturnCallback在前,ConfirmCallback在後。
2.5、定義rabbit template
(1)普通方式配置rabbitTemplate
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate" id="rabbitTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="exchange" value="exchangeTest" />
<!-- 啟動AMQP協議層面事務機制來解決傳送確認機制,但是採用事務機制實現會降低RabbitMQ的訊息吞吐量。
RabbitMQ團隊為我們拿出了更好的方案,即採用傳送方確認模式,事務機制和confirmCallback只能二選一 -->
<!-- <property name="channelTransacted" value="true" /> -->
<!-- mandatory 監聽是否有符合的佇列 -->
<property name="mandatory" value="true"/>
<!-- 設定傳送確認回執監聽方法 -->
<property name="confirmCallback" ref="confirmcallback" />
<!-- 設定結果返回監聽方法 -->
<property name="returnCallback" ref="MyReturnCallback"/>
<!-- 設定訊息轉換 -->
<property name="messageConverter" ref="JsonMessageConverter" />
</bean>
(2)<rabbit/>方式配置rabbitTemplate和相關監聽器
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
exchange="exchangeTest"
mandatory="true"
confirm-callback="confirmcallback"
return-callback="MyReturnCallback"
encoding="UTF-8"
message-converter="JsonMessageConverter"
>
</rabbit:template>
常用引數解釋:
mandatory:監聽是否有符合的佇列
confirm-callback:訊息確認回撥類
return-callback:mandatory監聽結果回撥類
encoding:編碼
message-converter:訊息轉化類
(1)其他相關配置
<bean id="JsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassmapper" />
</bean>
<bean id="customClassmapper" class="pruducer.com.taikang.tkh.message.common.CustomClassMapper"/>
<bean id="confirmcallback" class="pruducer.com.taikang.tkh.message.common.ConfirmReturnBack"/>
<bean id="MyReturnCallback" class="pruducer.com.taikang.tkh.message.common.MyReturnCallback"/>
2.6配置Consumer
<rabbit:listener-container
prefetch="10"
connection-factory="connectionFactory"
acknowledge="manual">
<rabbit:listener queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>
<!-- 自定義訊息接收者 -->
<bean id="messageReceiver" class="consumer.com.taikang.tkh.message.listener.MessageConsumer"></bean>
常用引數解釋:
prefetch:訊息預取數目為每次接收10條。
acknowledge:acknowledge="manual":意為表示該消費者的ack方式為手動 ;acknowledge="auto"表示自動。
3、spring-rabbit常用方法介紹
3.1、訊息傳送
/**
* 傳送訊息到預設的交換機和佇列(不帶有自定義引數)
* @param messageObject 訊息物件
* @return boolean 傳送標記
*/
RabbitTemplate.convertAndSend(messageObject);
/**
* 傳送訊息到預設的交換機和佇列
* @param messageObject 訊息物件
* @param messageObject 自定義引數,在監聽器ConfirmCallback中可以取到。
* @return boolean 傳送標記
*/
RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata);
/**
* 傳送訊息到指定的佇列
* @param queue 佇列名稱
* @param messageObject 訊息物件
* @param messageObject 自定義引數,在監聽器ConfirmCallback中可以取到。
* @return boolean 傳送標記
*/
RabbitTemplate.convertAndSend(queue, messageObject,correlationdata);
/**
* 傳送訊息到指定的交換機和佇列
* @param exchange 交換機名稱
* @param queue 佇列名稱
* @param messageObject 自定義引數,在監聽器ConfirmCallback中可以取到。
* @return boolean 傳送標記
*/
RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata);
/**
* 傳送訊息到預設的交換機和佇列(不帶有自定義引數)
Send方法還有很多,此處只列舉一種
* @param Message AMQP封裝的訊息物件
* @return void
*/
RabbitTemplate.send(Message message);
注意:凡是帶有convert的系統都會自動把訊息轉換為AMQP的Message物件;
沒有convert的需要自己將傳送的物件轉換為Message物件。
3.2、Confirm監聽
public class ConfirmReturnBack implements ConfirmCallback{
/**
* Confirmation callback.
* @param correlationData 回撥的相關資料.
* @param ack true for ack, false for nack
* @param cause 專門給NACK準備的一個可選的原因,其他情況為null。
*/
public void confirm(CorrelationData correlationdata, boolean ack, String cause) {
System.out.println("Exchange接收是否成功(ack): " + ack + "。 返回的使用者引數(correlationData): " + correlationdata + "。NACK原因(cause) : " + cause);
}
}
注意:CorrelationData 是在傳送訊息時傳入回撥方法的引數,可以用於區分訊息物件。 CorrelationData物件中只有一個屬性 String id。通過這個引數,我們可以區分當前是傳送哪一條訊息時的回撥,並通過ack引數來進行失敗重發功能。
3.3、Return監聽
/**
* 實現此方法在basicpublish失敗時回撥
* 相當於 ReturnListener的功能。
* 在釋出訊息時設定mandatory等於true,
* 監聽訊息是否有相匹配的佇列,沒有時ReturnCallback將執行returnedMessage方法,訊息將返給傳送者
*/
public class MyReturnCallback implements ReturnCallback {
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
try {
System.out.println("訊息傳送進指定佇列失敗:失敗原因(+replyText):"+replyText
+";錯誤程式碼(replyCode):"+replyCode
+";訊息物件:"+new String(message.getBody(),"UTF-8")
+"exchange:"+exchange
+"routingKey:"+routingKey);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
3.4、訊息處理
/**
* 訊息到達消費者監聽類
*/
public class MessageConsumer implements ChannelAwareMessageListener {
/**
* 處理收到的rabbit訊息的回撥方法。
* @param message AMQP封裝訊息物件
* @param channel 通道物件,可以進行確認回覆
* @throws Exception Any.
*/
public void onMessage(Message message, Channel channel) throws Exception {
try {
String srt2=new String(message.getBody(),"UTF-8");
System.out.println("消費者收到訊息:"+srt2);
//成功應答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
//失敗應答
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
注意:ChannelAwareMessageListener和MessageListener兩個介面都實現對訊息到達消費者時的監聽,只不過MessageListener的onMessage方法沒有Channel引數,不能實現訊息手動應答功能。
4、Demo專案完整程式碼
【四川樂山程式設計師聯盟,歡迎大家加群相互交流學習5 7 1 8 1 4 7 4 3】
相關推薦
RabbitMQ學習(三):Spring整合RabbitMQ
1、maven依賴 <!-- rabbitMQ依賴 --> <!-- spring相關的其他依賴請參考原始碼,此處不做過多描述 --> <dependency>
學習之路-RabbitMQ(三):SpringBoot整合RabbitMQ
一:引入RabbitMQ的相關jar包: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp
Spring(三):Spring整合Hibernate
ng- checkout wait 哪些 check driver eas package class 背景: 本文主要介紹使用spring-framework-4.3.8.RELEASE與hibernate-release-5.2.9.Final項目整合搭建
rabbitmq學習(三):rabbitmq之扇形交換機、主題交換機
前言 上篇我們學習了rabbitmq的作用以及直連交換機的程式碼實現,這篇我們繼續看如何用程式碼實現扇形交換機和主題交換機 一、扇形交換機 1.生產者 /** * 生產者 */ public class LogProducer { //交換機名稱 pu
RabbitMQ學習(二):Java使用RabbitMQ要點知識
1、maven依賴 <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId>
Spring boot入門(三):SpringBoot整合結合AdminLTE(Freemarker),利用generate自動生成程式碼,利用DataTable和PageHelper進行分頁顯示
關於SpringBoot和PageHelper,前篇部落格已經介紹過Spring boot入門(二):Spring boot整合MySql,Mybatis和PageHelper外掛,前篇部落格大致講述了SpringBoot如何整合Mybatis和Pagehelper,但是沒有做出實際的範例,本篇部落格是連
SpringBoot學習筆記(三):SpringBoot整合Mybatis、SpringBoot事務管理、SpringBoot多資料來源
SpringBoot整合Mybatis 第一步我們需要在pom.xml裡面引入mybatis相關的jar包 <dependency> <groupId>org.mybatis.spring.boot</groupId> <artif
Spring Boot學習(三):定時任務
一、簡介 專案中經常會遇到使用定時任務的情況,那麼SpringBoot如何實現這種需求呢,還是簡潔到要命。 二、環境準備 eclipse + maven + Spring Boot 三、程式碼示例 pom.xml檔案不用特殊引入依賴包,保持一般初始化就可以,如下
webService學習之路(三):springMVC整合CXF後呼叫已知的wsdl介面
本篇文章將講解SpringMVC+CXF環境下,怎麼呼叫其他系統通過webService方式暴露出來的介面 ① 為避免懷疑同一個專案中呼叫本專案的介面,這裡我新開啟一個eclipse通過最原始的方式釋出了一個webservice並啟動保證可以被訪問 開啟瀏覽器確認可以被訪問 ②進入CXF/bin
Spring Boot系列(三):Spring Boot整合Mybatis原始碼解析
一、Mybatis回顧 1、MyBatis介紹 Mybatis是一個半ORM框架,它使用簡單的 XML 或註解用於配置和原始對映,將介面和Java的POJOs(普通的Java 物件)對映成資料庫中的記錄。 2、Mybatis整體架構 二、Spring Boot整合Mybatis +
RabbitMQ學習(三)訂閱/發布
cto submit actor nal chan true exec oid lsp RabbitMQ學習(三)訂閱/發布 1.RabbitMQ模型 前面所學都只用到了生產者、隊列、消費者。如上圖所示,其實生產者並不直接將信息傳輸到隊列中,在生產者和隊列
RabbitMQ學習(六):遠程結果調用
cells actor ble 隨機 get getenv all 求和 int 場景:我們需要在傳輸消息時得到結果 客服端在發送請求時會發送回調隊列,服務端處理事情完成後會將結果返回到回調隊列中,在增加關聯標誌關聯每個請求和服務返回 客戶端代碼: public
vue移動音樂app開發學習(三):輪播圖組件的開發
hub out webapp width eth reat slot utc -1 本系列文章是為了記錄學習中的知識點,便於後期自己觀看。如果有需要的同學請登錄慕課網,找到Vue 2.0 高級實戰-開發移動端音樂WebApp進行觀看,傳送門。 完成後的頁面狀態以及項目結構如
Sping Boot入門到實戰之入門篇(三):Spring Boot屬性配置
git 測試 add 禁用 rop fix ron org set 該篇為Sping Boot入門到實戰系列入門篇的第三篇。介紹Spring Boot的屬性配置。 傳統的Spring Web應用自定義屬性一般是通過添加一個demo.properties配置文件(
Spring學習(3):Spring概述(轉載)
效率 調度 jpa 源代碼 一個 維護 html www hiberna 1. Spring是什麽? Spring是一個開源的輕量級Java SE(Java 標準版本)/Java EE(Java 企業版本)開發應用框架,其目的是用於簡化企業級應用程序開發。 在面向對
PE檔案格式學習(三):匯出表
1.回顧 上篇文章中介紹過,可選頭中的資料目錄表是一個大小為0x10的陣列,匯出表就是這個陣列中的第一個元素。 我們再回顧下資料目錄表的結構體: struct _IMAGE_DATA_DIRECTORY { DWORD VirtualAddress;  
TensorFlow學習(三):tf.reduce_sum()
壓縮求和 tf.reduce_sum( input_tensor, axis=None, keepdims=None, name=None, reduction_indices=None, keep_dims=None ) Args:
網頁開發學習(三):表單
表單是網頁中提供的一種互動式操作手段,無論是提交搜尋的資訊,還是網上註冊等都需要使用表單。使用者可以通過提交表單資訊與伺服器進行動態交流。表單主要可以分為兩部分:一是HTML原始碼描述的表單;二是提交後的表單處理,需要使用伺服器端編寫好 JSP等程式碼對客戶端提交的資訊作出迴應。
ionic學習(三):建立pages頁面
1.建立命令:ionic g page 名稱 2. app.module.ts 引入宣告元件 3.如果跳轉,在跳轉的ts中引入元件 步驟: 1.新建頁面news元件 2.app.module.ts 中引入宣告 3.在home頁面中
rabbitmq學習(四):利用rabbitmq實現遠端rpc呼叫
一、rabbitmq實現rpc呼叫的原理 ·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,