1. 程式人生 > 實用技巧 >訊息佇列RabbitMQ

訊息佇列RabbitMQ

一、RabbitMQ簡介

1. 應用場景

(1)任務非同步處理

​ 將不需要同步處理的並且耗時長的操作由訊息佇列通知訊息接收方進行非同步處理。縮短了應用程式的響應時間。

(2)應用程式解耦合

​ MQ相當於一箇中介,生產方通過MQ與消費方互動,它將應用程式進行了解耦合。

2. 工作原理

​ 下圖是RabbitMQ的基本結構:

![](E:\部落格園\Spring Boot\img\22.png)

組成部分說明如下:

  • Broker:訊息佇列服務程序,此程序包括兩個部分:Exchange和Queue。

  • Exchange:訊息佇列交換機,按一定的規則將訊息路由轉發到某個佇列,對訊息進行過濾。

  • Queue:訊息佇列,儲存訊息的佇列,訊息到達佇列並轉發給指定的消費方。

  • Producer:訊息生產者,即生產方客戶端,生產方客戶端將訊息傳送到MQ。

  • Consumer:訊息消費者,即消費方客戶端,接收MQ轉發的訊息。

訊息釋出接收流程:

-----傳送訊息-----

(1)生產者和Broker建立TCP連線。

(2)生產者和Broker建立通道。

(3)生產者通過通道將訊息傳送給Broker,由Exchange將訊息進行轉發。

(4)Exchange將訊息轉發到指定的Queue(佇列)

----接收訊息-----

(1)消費者和Broker建立TCP連線。

(2)消費者和Broker建立通道。

(3)消費者監聽指定的Queue(佇列)。

(4)當有訊息到達Queue時Broker預設將訊息推送給消費者。

(5)消費者接收到訊息。

二、RabbitMQ的6種工作模式

1. Work queues(工作佇列模式)

![](E:\部落格園\Spring Boot\img\23.png)

(1)一條訊息只會被一個消費者接收。

(2)RabbitMQ採用輪詢的方式將訊息平均傳送給消費者。

(3)消費者在處理完某條訊息後,才會收到下一條訊息。

應用場景:

對於任務過重或任務較多的情況使用工作佇列可以提高任務處理的速度。

2. Publish/subscribe(釋出訂閱模式)

![](E:\部落格園\Spring Boot\img\24.png)

(1)每個消費者監聽自己的佇列。

(2)生產者將訊息發給broker,由交換機將訊息轉發到繫結此交換機的每個佇列,每個繫結交換機的佇列都將接收到訊息。

【注1】 publish/subscribe與work queues有什麼區別?

區別:

  • work queues不用定義交換機,而publish/subscribe需要定義交換機。

  • publish/subscribe的生產方是面向交換機發送訊息,work queues的生產方是面向佇列傳送訊息(底層使用預設交換機)。

  • publish/subscribe需要設定佇列和交換機的繫結,work queues不需要設定,實質上work queues會將佇列繫結到預設的交換機 。

相同點:

  • 兩者實現的釋出/訂閱的效果是一樣的,多個消費端監聽同一個佇列不會重複消費訊息。

【注2】實際工作中用publish/subscribe還是work queues?

​ 建議使用 publish/subscribe,釋出訂閱模式比工作佇列模式更強大,並且釋出訂閱模式可以指定自己專用的交換機。

3. Routing(路由工作模式)

![](E:\部落格園\Spring Boot\img\25.png)

(1)每個消費者監聽自己的佇列,佇列設定routingkey。

(2)生產者將訊息發給交換機,傳送訊息時需要指定routingkey,由交換機根據routingkey來轉發訊息到指定的佇列。

【注1】Routing與Publish/subscibe有什麼區別?

  • Routing要求佇列在繫結交換機時要指定routingkey,訊息會轉發到符合routingkey的佇列中。

4. Topic(萬用字元工作模式)

![](E:\部落格園\Spring Boot\img\26.png)

(1)每個消費者監聽自己的佇列,佇列設定帶萬用字元的routingkey。

(2)生產者將訊息發給broker,由交換機根據routingkey來轉發訊息到指定的佇列。

萬用字元規則:

​ 中間以“.”分隔; 符號#可以匹配多個詞,符號*可以匹配一個詞語。

【注】Topic模式更多加強大,它可以實現Routing、publish/subscirbe模式的功能。

5. Header模式

​ header模式與routing不同的地方在於,header模式取消了routingkey,使用header中的 key/value(鍵值對)匹配佇列。

6. RPC

![](E:\部落格園\Spring Boot\img\27.png)

RPC即客戶端遠端呼叫服務端的方法 ,使用MQ可以實現RPC的非同步呼叫,基於Direct交換機實現,流程如下:

(1)客戶端既是生產者也是消費者,向RPC請求佇列傳送RPC呼叫訊息,同時監聽RPC響應佇列。

(2)服務端監聽RPC請求佇列的訊息,收到訊息後執行服務端的方法,得到方法返回的結果。

(3)服務端將RPC方法返回的結果傳送到RPC響應佇列。

(4)客戶端(RPC呼叫方)監聽RPC響應佇列,接收到RPC呼叫的結果。

三、Spring Boot整合RabbitMQ

1. 新增依賴

<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐starter‐amqp</artifactId> 
</dependency> 
<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐starter‐test</artifactId> 
</dependency> 
<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐starter‐logging</artifactId> 
</dependency>

2. 配置

(1)配置application.yml (配置連線RabbitMQ的引數)

server: port: 
	44000 
spring: 
	application: 
		name: test‐rabbitmq‐producer 
	rabbitmq: 
		host: 127.0.0.1 
		port: 5672 
		username: guest 
		password: guest 
		virtualHost: /

(2)定義RabbitConfifig類,配置Exchange、Queue、及繫結交換機(本例配置Topic交換機)

package com.xuecheng.test.rabbitmq.config; 

import org.springframework.amqp.core.*; 
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration;

@Configuration 
public class RabbitmqConfig { 
	public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; 
	public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; 
	public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; 
	
	/**
	* 交換機配置 
	* ExchangeBuilder提供了fanout、direct、topic、header交換機型別的配置 
	* @return the exchange 
	*/ 
	@Bean(EXCHANGE_TOPICS_INFORM) 
	public Exchange EXCHANGE_TOPICS_INFORM() { 
		//durable(true)持久化,訊息佇列重啟後交換機仍然存在 
		return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 
	}
	//宣告佇列 
	@Bean(QUEUE_INFORM_SMS) 
	public Queue QUEUE_INFORM_SMS() { 
		Queue queue = new Queue(QUEUE_INFORM_SMS); 
		return queue; 
	}
	//宣告佇列 
	@Bean(QUEUE_INFORM_EMAIL) 
	public Queue QUEUE_INFORM_EMAIL() { 
		Queue queue = new Queue(QUEUE_INFORM_EMAIL); 
		return queue; 
	}
	/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#"); 	   * 繫結佇列到交換機 . 
	 *
	 * @param queue the queue 
	 * @param exchange the exchange 
	 * @return the binding 
	 */ 
	@Bean 
	public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 
		return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); 
	}
	@Bean 
	public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) { 
		return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); 		
	} 
}

3. 生產端

使用RarbbitTemplate傳送訊息

package com.xuecheng.test.rabbitmq; 

import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; 
import org.junit.Test; import org.junit.runner.RunWith; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.test.context.SpringBootTest; 
import org.springframework.test.context.junit4.SpringRunner; 

@SpringBootTest 
@RunWith(SpringRunner.class) 
public class Producer05_topics_springboot { 
	@Autowired 
	RabbitTemplate rabbitTemplate; 
	
	@Test 
	public void testSendByTopics(){ 
		for (int i=0;i<5;i++){ 
			String message = "sms email inform to user"+i;
       		rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message); 
          System.out.println("Send Message is:'" + message + "'"); 
		} 
	} 
}

4. 消費端

建立消費端工程,新增依賴:

<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐starter‐amqp</artifactId> 
</dependency> 
<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐starter‐test</artifactId> 
</dependency> 
<dependency> 
	<groupId>org.springframework.boot</groupId> 
	<artifactId>spring‐boot‐starter‐logging</artifactId> 
</dependency>

使用@RabbitListener註解監聽佇列:

package com.xuecheng.test.rabbitmq.mq; 

import com.rabbitmq.client.Channel; 
import com.xuecheng.test.rabbitmq.config.RabbitmqConfig; 
import org.springframework.amqp.core.Message; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.stereotype.Component; 
@Component 
public class ReceiveHandler { 
	
	//監聽email佇列 
	@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL}) 
	public void receive_email(String msg,Message message,Channel channel){ 			     	System.out.println(msg); 
	}
	
	//監聽sms佇列 
	@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS}) 
	public void receive_sms(String msg,Message message,Channel channel){ 	        		System.out.println(msg); 
	} 
}

5. 測試

![](E:\部落格園\Spring Boot\img\28.png)