1. 程式人生 > >RabbitMQ學習(三):Spring整合RabbitMQ

RabbitMQ學習(三):Spring整合RabbitMQ

1、maven依賴

<!-- rabbitMQ依賴 -->
<!-- spring相關的其他依賴請參考原始碼,此處不做過多描述 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.6.RELEASE</version>
</dependency>

2spring-rabbit配置

2.1、配置rabbitMQ連線

(1)採用<rabbit/>方式配置連線

<rabbit:connection-factory id="connectionFactory" 
host="${rabbitmq.host}" 
port="${rabbitmq.port}" 
        username="${rabbitmq.username}" 
        password="${rabbitmq.password}" 
        virtual-host=""
        publisher-confirms="true"
        publisher-returns="true"
        channel-cache-size="5"
        />


(2)採用普通方式配置連線

<bean id="connectionFactory"
	class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
		<constructor-arg value="${rabbitmq.host}" />
		<property name="username" value="${rabbitmq.username}" />
		<property name="password" value="${rabbitmq.password}" />
		<property name="port" value="${rabbitmq.port}" />
		<!-- 快取中要維護的通道數 -->
		<property name="channelCacheSize" value="5" />
		<!-- 開啟發送確認機制 -->
		<property name="publisherConfirms" value="true"/>
		<!-- 開啟結果返回機制 -->
		<property name="publisherReturns" value="true"/>
	</bean>

2.2、配置RabbitAdmin

<rabbit:admin connection-factory="connectionFactory" />

2.3、定義Queue

<rabbit:queue name="queueTest" durable="true" auto-delete="false"
exclusive="false" />

常用引數解釋:

durable:是否持久化

auto-delete:是否當沒有連線時自動刪除

exclusive:是否只能由建立者使用

2.4、定義Exchange,並繫結Queue

(1)定義direct型別

<rabbit:direct-exchange name="exchangeTest"
		durable="true" auto-delete="false">
	<rabbit:bindings>
<!-- 此處沒有指定RoutingKey -->
			<rabbit:binding queue="queueTest" key=""></rabbit:binding>
		</rabbit:bindings>
</rabbit:direct-exchange>

(2)定義topic型別

<rabbit:topic-exchange name="topicexchangetest" durable="true" auto-delete="false">
		<rabbit:bindings>
			<rabbit:binding queue="" key=""></rabbit:binding>
		</rabbit:bindings>
</rabbit:topic-exchange>

(3)定義fanout型別

<rabbit:fanout-exchange name=""></rabbit:topic-exchange>

(4)定義headers型別

<rabbit:headers-exchange name=""></rabbit:topic-exchange>

常用引數解釋:

durable:是否持久化

auto-delete:是否當沒有連線時自動刪除

注意:在此例中,如果不把Exchange和queue進行繫結,傳送訊息的時候ConfirmCallback依然正常執行,因為訊息到達了Exchange。但是ReturnCallback就會執行回撥方法,傳回錯誤資訊:NO_ROUTEExchange沒有找到指定的Queue,丟棄這條訊息並把訊息返回給生產者。回撥順序是ReturnCallback在前,ConfirmCallback在後。

2.5、定義rabbit template

(1)普通方式配置rabbitTemplate

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate" id="rabbitTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="exchange" value="exchangeTest" />
		
		<!-- 啟動AMQP協議層面事務機制來解決傳送確認機制,但是採用事務機制實現會降低RabbitMQ的訊息吞吐量。
		RabbitMQ團隊為我們拿出了更好的方案,即採用傳送方確認模式,事務機制和confirmCallback只能二選一 -->
		<!-- <property name="channelTransacted" value="true" /> -->

		<!-- mandatory 監聽是否有符合的佇列 -->
		<property name="mandatory" value="true"/>
		
		<!-- 設定傳送確認回執監聽方法 -->
		<property name="confirmCallback" ref="confirmcallback" /> 
		<!-- 設定結果返回監聽方法 -->
		<property name="returnCallback" ref="MyReturnCallback"/>
		<!-- 設定訊息轉換 -->
		<property name="messageConverter" ref="JsonMessageConverter" />
	</bean>

(2)<rabbit/>方式配置rabbitTemplate和相關監聽器

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" 
		exchange="exchangeTest" 
		mandatory="true" 
		confirm-callback="confirmcallback"
		return-callback="MyReturnCallback" 
		encoding="UTF-8"
		message-converter="JsonMessageConverter"
		>
	</rabbit:template>

常用引數解釋:

mandatory:監聽是否有符合的佇列

confirm-callback:訊息確認回撥類

return-callback:mandatory監聽結果回撥類

encoding:編碼

message-converter:訊息轉化類

(1)其他相關配置

<bean id="JsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">
    <!-- if necessary, override the DefaultClassMapper -->
    <property name="classMapper" ref="customClassmapper" />
</bean>
<bean id="customClassmapper" class="pruducer.com.taikang.tkh.message.common.CustomClassMapper"/>

<bean id="confirmcallback" class="pruducer.com.taikang.tkh.message.common.ConfirmReturnBack"/>

<bean id="MyReturnCallback" class="pruducer.com.taikang.tkh.message.common.MyReturnCallback"/>


2.6配置Consumer

<rabbit:listener-container  
	prefetch="10" 
	connection-factory="connectionFactory"  
	acknowledge="manual">
	<rabbit:listener  queues="queueTest" ref="messageReceiver" />
</rabbit:listener-container>

<!-- 自定義訊息接收者 -->
<bean id="messageReceiver" class="consumer.com.taikang.tkh.message.listener.MessageConsumer"></bean>

常用引數解釋:

prefetch:訊息預取數目為每次接收10條。

acknowledge:acknowledge="manual":意為表示該消費者的ack方式為手動 ;acknowledge="auto"表示自動。

3、spring-rabbit常用方法介紹

3.1、訊息傳送

/**
* 傳送訊息到預設的交換機和佇列(不帶有自定義引數)
* @param messageObject 訊息物件
* @return boolean 傳送標記 
*/
RabbitTemplate.convertAndSend(messageObject);
 
/**
* 傳送訊息到預設的交換機和佇列
* @param messageObject 訊息物件
* @param messageObject 自定義引數,在監聽器ConfirmCallback中可以取到。
* @return boolean 傳送標記 
*/
RabbitTemplate.correlationConvertAndSend(messageObject,correlationdata);
 
/**
* 傳送訊息到指定的佇列
* @param queue           佇列名稱
* @param messageObject   訊息物件
* @param messageObject 自定義引數,在監聽器ConfirmCallback中可以取到。
* @return boolean 傳送標記 
*/
RabbitTemplate.convertAndSend(queue, messageObject,correlationdata);
 
/**
* 傳送訊息到指定的交換機和佇列
* @param exchange       交換機名稱
* @param queue          佇列名稱
* @param messageObject 自定義引數,在監聽器ConfirmCallback中可以取到。
* @return boolean 傳送標記 
*/
RabbitTemplate.convertAndSend(exchange,queue,messageObject,correlationdata);
 
/**
* 傳送訊息到預設的交換機和佇列(不帶有自定義引數)
Send方法還有很多,此處只列舉一種
* @param Message AMQP封裝的訊息物件
* @return void 
*/
RabbitTemplate.send(Message message);

注意凡是帶有convert的系統都會自動把訊息轉換為AMQP的Message物件;

沒有convert的需要自己將傳送的物件轉換為Message物件。

3.2、Confirm監聽

public class ConfirmReturnBack implements ConfirmCallback{
	/**
	 * Confirmation callback.
	 * @param correlationData 回撥的相關資料.
	 * @param ack true for ack, false for nack
	 * @param cause 專門給NACK準備的一個可選的原因,其他情況為null。
	 */
	public void confirm(CorrelationData correlationdata, boolean ack, String cause) {
		System.out.println("Exchange接收是否成功(ack): " + ack + "。 返回的使用者引數(correlationData): " + correlationdata + "。NACK原因(cause) : " + cause);
	}
}

注意CorrelationData 是在傳送訊息時傳入回撥方法的引數,可以用於區分訊息物件。 CorrelationData物件中只有一個屬性 String id。通過這個引數,我們可以區分當前是傳送哪一條訊息時的回撥,並通過ack引數來進行失敗重發功能。

3.3、Return監聽

/**
 * 實現此方法在basicpublish失敗時回撥
 * 相當於 ReturnListener的功能。
 * 在釋出訊息時設定mandatory等於true,
 * 監聽訊息是否有相匹配的佇列,沒有時ReturnCallback將執行returnedMessage方法,訊息將返給傳送者 
 */
public class MyReturnCallback implements ReturnCallback {

	public void returnedMessage(Message message, int replyCode, String replyText, 
			String exchange, String routingKey) {
		try {
			System.out.println("訊息傳送進指定佇列失敗:失敗原因(+replyText):"+replyText
					+";錯誤程式碼(replyCode):"+replyCode
					+";訊息物件:"+new String(message.getBody(),"UTF-8")
					+"exchange:"+exchange
					+"routingKey:"+routingKey);
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
	}
}

3.4、訊息處理

/**
 * 訊息到達消費者監聽類
 */
public class MessageConsumer implements ChannelAwareMessageListener {
	/**
	 * 處理收到的rabbit訊息的回撥方法。
	 * @param message AMQP封裝訊息物件
	 * @param channel 通道物件,可以進行確認回覆
	 * @throws Exception Any.
	 */
	public void onMessage(Message message, Channel channel) throws Exception {
		try {
			String srt2=new String(message.getBody(),"UTF-8");
			System.out.println("消費者收到訊息:"+srt2);
			//成功應答
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		} catch (Exception e) {
			e.printStackTrace();
			//失敗應答
			channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
		}
	}
}

注意ChannelAwareMessageListenerMessageListener兩個介面都實現對訊息到達消費者時的監聽,只不過MessageListeneronMessage方法沒有Channel引數,不能實現訊息手動應答功能。

4、Demo專案完整程式碼

【四川樂山程式設計師聯盟,歡迎大家加群相互交流學習5 7 1 8 1 4 7 4 3】

相關推薦

RabbitMQ學習Spring整合RabbitMQ

1、maven依賴 <!-- rabbitMQ依賴 --> <!-- spring相關的其他依賴請參考原始碼,此處不做過多描述 --> <dependency>

學習之路-RabbitMQSpringBoot整合RabbitMQ

一:引入RabbitMQ的相關jar包: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp

SpringSpring整合Hibernate

ng- checkout wait 哪些 check driver eas package class 背景:   本文主要介紹使用spring-framework-4.3.8.RELEASE與hibernate-release-5.2.9.Final項目整合搭建

rabbitmq學習rabbitmq之扇形交換機、主題交換機

 前言 上篇我們學習了rabbitmq的作用以及直連交換機的程式碼實現,這篇我們繼續看如何用程式碼實現扇形交換機和主題交換機 一、扇形交換機   1.生產者    /** * 生產者 */ public class LogProducer { //交換機名稱 pu

RabbitMQ學習Java使用RabbitMQ要點知識

1、maven依賴 <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId>

Spring boot入門SpringBoot整合結合AdminLTE(Freemarker),利用generate自動生成程式碼,利用DataTable和PageHelper進行分頁顯示

  關於SpringBoot和PageHelper,前篇部落格已經介紹過Spring boot入門(二):Spring boot整合MySql,Mybatis和PageHelper外掛,前篇部落格大致講述了SpringBoot如何整合Mybatis和Pagehelper,但是沒有做出實際的範例,本篇部落格是連

SpringBoot學習筆記SpringBoot整合Mybatis、SpringBoot事務管理、SpringBoot多資料來源

SpringBoot整合Mybatis 第一步我們需要在pom.xml裡面引入mybatis相關的jar包 <dependency> <groupId>org.mybatis.spring.boot</groupId> <artif

Spring Boot學習定時任務

一、簡介 專案中經常會遇到使用定時任務的情況,那麼SpringBoot如何實現這種需求呢,還是簡潔到要命。 二、環境準備 eclipse + maven + Spring Boot 三、程式碼示例 pom.xml檔案不用特殊引入依賴包,保持一般初始化就可以,如下

webService學習之路springMVC整合CXF後呼叫已知的wsdl介面

本篇文章將講解SpringMVC+CXF環境下,怎麼呼叫其他系統通過webService方式暴露出來的介面 ① 為避免懷疑同一個專案中呼叫本專案的介面,這裡我新開啟一個eclipse通過最原始的方式釋出了一個webservice並啟動保證可以被訪問 開啟瀏覽器確認可以被訪問 ②進入CXF/bin

Spring Boot系列Spring Boot整合Mybatis原始碼解析

一、Mybatis回顧   1、MyBatis介紹   Mybatis是一個半ORM框架,它使用簡單的 XML 或註解用於配置和原始對映,將介面和Java的POJOs(普通的Java 物件)對映成資料庫中的記錄。   2、Mybatis整體架構  二、Spring Boot整合Mybatis +

RabbitMQ學習訂閱/發布

cto submit actor nal chan true exec oid lsp RabbitMQ學習(三)訂閱/發布 1.RabbitMQ模型 前面所學都只用到了生產者、隊列、消費者。如上圖所示,其實生產者並不直接將信息傳輸到隊列中,在生產者和隊列

RabbitMQ學習遠程結果調用

cells actor ble 隨機 get getenv all 求和 int 場景:我們需要在傳輸消息時得到結果 客服端在發送請求時會發送回調隊列,服務端處理事情完成後會將結果返回到回調隊列中,在增加關聯標誌關聯每個請求和服務返回 客戶端代碼: public

vue移動音樂app開發學習輪播圖組件的開發

hub out webapp width eth reat slot utc -1 本系列文章是為了記錄學習中的知識點,便於後期自己觀看。如果有需要的同學請登錄慕課網,找到Vue 2.0 高級實戰-開發移動端音樂WebApp進行觀看,傳送門。 完成後的頁面狀態以及項目結構如

Sping Boot入門到實戰之入門篇Spring Boot屬性配置

git 測試 add 禁用 rop fix ron org set   該篇為Sping Boot入門到實戰系列入門篇的第三篇。介紹Spring Boot的屬性配置。   傳統的Spring Web應用自定義屬性一般是通過添加一個demo.properties配置文件(

Spring學習3Spring概述轉載

效率 調度 jpa 源代碼 一個 維護 html www hiberna 1. Spring是什麽?   Spring是一個開源的輕量級Java SE(Java 標準版本)/Java EE(Java 企業版本)開發應用框架,其目的是用於簡化企業級應用程序開發。   在面向對

PE檔案格式學習匯出表

1.回顧 上篇文章中介紹過,可選頭中的資料目錄表是一個大小為0x10的陣列,匯出表就是這個陣列中的第一個元素。 我們再回顧下資料目錄表的結構體: struct _IMAGE_DATA_DIRECTORY {     DWORD VirtualAddress;    

TensorFlow學習tf.reduce_sum()

壓縮求和 tf.reduce_sum( input_tensor, axis=None, keepdims=None, name=None, reduction_indices=None, keep_dims=None ) Args:

網頁開發學習表單

表單是網頁中提供的一種互動式操作手段,無論是提交搜尋的資訊,還是網上註冊等都需要使用表單。使用者可以通過提交表單資訊與伺服器進行動態交流。表單主要可以分為兩部分:一是HTML原始碼描述的表單;二是提交後的表單處理,需要使用伺服器端編寫好 JSP等程式碼對客戶端提交的資訊作出迴應。

ionic學習建立pages頁面

1.建立命令:ionic g page 名稱 2. app.module.ts 引入宣告元件 3.如果跳轉,在跳轉的ts中引入元件 步驟: 1.新建頁面news元件 2.app.module.ts 中引入宣告 3.在home頁面中

rabbitmq學習利用rabbitmq實現遠端rpc呼叫

一、rabbitmq實現rpc呼叫的原理 ·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,