SpringBoot整合RabbitMQ(一)快速入門
MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取佇列中的訊息。訊息中介軟體最主要的作用是解耦,中介軟體最標準的用法是生產者生產訊息傳送到佇列,消費者從佇列中拿取訊息並處理,生產者不用關心是誰來消費,消費者不用關心誰在生產訊息,從而達到解耦的目的。在分散式的系統中,訊息佇列也會被用在很多其它的方面,比如:分散式事務的支援,RPC的呼叫等等。
RabbitMQ介紹
RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。RabbitMQ主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生資料時,消費者無法快速消費,那麼需要一箇中間層。儲存這個資料。
AMQP,即Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支援AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。
相關概念
通常我們談到佇列服務, 會有三個概念: 發訊息者、佇列、收訊息者,RabbitMQ 在這個基本概念之上, 多做了一層抽象, 在發訊息者和 佇列之間, 加入了交換器 (Exchange). 這樣發訊息者和佇列就沒有直接聯絡, 轉而變成發訊息者把訊息給交換器, 交換器根據排程策略再把訊息再給佇列。
那麼,其中比較重要的概念有 4 個,分別為:虛擬主機,交換機,佇列,和繫結。
- 虛擬主機:一個虛擬主機持有一組交換機、佇列和繫結。為什麼需要多個虛擬主機呢?很簡單,RabbitMQ當中,使用者只能在虛擬主機的粒度進行許可權控制。 因此,如果需要禁止A組訪問B組的交換機/佇列/繫結,必須為A和B分別建立一個虛擬主機。每一個RabbitMQ伺服器都有一個預設的虛擬主機“/”。
- 交換機:Exchange 用於轉發訊息,但是它不會做儲存 ,如果沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 傳送過來的訊息。 這裡有一個比較重要的概念:路由鍵 。訊息到交換機的時候,互動機會轉發到對應的佇列中,那麼究竟轉發到哪個佇列,就要根據該路由鍵。
- 繫結:也就是交換機需要和佇列相繫結,這其中如上圖所示,是多對多的關係。
四種交換機(Exchange)
交換機的功能主要是接收訊息並且轉發到繫結的佇列,交換機不儲存訊息,在啟用ack模式後,交換機找不到佇列會返回錯誤。交換機有四種類型:Direct, topic, Headers and Fanout
1. Direct Exchange
direct 型別的行為是"先匹配, 再投送". 即在繫結時設定一個 routing_key, 訊息的routing_key 匹配時, 才會被交換器投送到繫結的佇列中去.是RabbitMQ預設的交換機模式,也是最簡單的模式,根據key全文匹配去尋找佇列。
配置:設定一個路由鍵
public static final String QUEUE="queue";
/**
* direct 交換機模式
*/
@Bean
public Queue queue(){
return new Queue(QUEUE,true);
}
複製程式碼
傳送服務:
@Service
@Slf4j
public class MQSender {
@Autowired
AmqpTemplate amqpTemplate;
public void send(Object message){
String msg = (String) message;
log.info("send msg"+message);
amqpTemplate.convertAndSend(MQConfig.QUEUE,msg);
}
}
複製程式碼
接收服務:
@Service
@Slf4j
public class MQReceiver {
//監聽的queue
@RabbitListener(queues = MQConfig.QUEUE)
public void receive(String msg){
log.info("receive msg "+msg);
}
}
複製程式碼
測試:
@Autowired
private MQSender sender;
sender.send("hello direct Exchange");
複製程式碼
2. Topic Exchange
按規則轉發訊息(最靈活) 轉發訊息主要是根據萬用字元。 在這種交換機下,佇列和交換機的繫結會定義一種路由模式,那麼,萬用字元就要在這種路由模式和路由鍵之間匹配後交換機才能轉發訊息。
路由鍵必須是一串字元,用句號(.) 隔開,
路由模式必須包含一個 星號(*),主要用於匹配路由鍵指定位置的一個單詞, 井號(#)就表示相當於一個或者多個單詞
配置類:
public static final String TOPIC_QUEUE1="topic.queue1";
public static final String TOPIC_QUEUE2="topic.queue2";
public static final String ROUTING_KEY1="topic.key1";
public static final String ROUTING_KEY2="topic.#";
/**
* Topic 交換機模式 可以用萬用字元
*/
@Bean
public Queue topicQueue1(){
return new Queue(TOPIC_QUEUE1,true);
}
@Bean
public Queue topicQueue2(){
return new Queue(TOPIC_QUEUE2,true);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);
}
@Bean
public Binding topicBinding2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);
}
複製程式碼
傳送類:
public void sendTopic(Object message){
String msg = (String) message;
log.info("send topic message"+msg);
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+"1");
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+"2");
}
複製程式碼
接收類:
@RabbitListener(queues = MQConfig.TOPIC_QUEUE1)
public void receiveTopic1(String msg){
log.info("receive topic1 msg "+msg);
}
複製程式碼
測試:
@Autowired
private MQSender sender;
sender.sendTopic("hello topic Exchange");
複製程式碼
3. Headers Exchange
設定header attribute引數型別的交換機,相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的型別. 在佇列與交換器繫結時, 會設定一組鍵值對規則, 訊息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 訊息被投送到對應佇列.
public static final String HEADER_EXCHANGE="headerExchange";
/**
* Header 交換機模式
*/
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADER_EXCHANGE);
}
@Bean
public Queue headerQueue(){
return new Queue(HEADER_QUEUE2,true);
}
// 繫結需要指定header,如果不匹配 則不能使用
@Bean
public Binding headerBinding(){
Map<String,Object> map = new HashMap();
map.put("header1","value1");
map.put("header2","value2");
return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match();
}
複製程式碼
public void sendHeader(Object massage){
String msg = (String) massage;
log.info("send fanout message: "+msg);
MessageProperties properties = new MessageProperties();
properties.setHeader("header1","value1");
properties.setHeader("header2","value2");
Message obj = new Message(msg.getBytes(),properties);
amqpTemplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",obj);
}
複製程式碼
用MessageProperties來新增Header資訊,然後與接收者的header比對。我都設定的是"header1","value1";"header2","value2"
//監聽 header模式的queue
@RabbitListener(queues = MQConfig.HEADER_QUEUE2)
//因為傳送的是 byte 型別,所以接受也是該資料型別
public void receiveHeader(byte[] message){
log.info("header queue message"+new String(message));
}
複製程式碼
測試:
@Autowired
private MQSender sender;
sender.sendHeader("hello header Exchange");
複製程式碼
4. Fanout Exchange
轉發訊息到所有繫結佇列,訊息廣播的模式,不管路由鍵或者是路由模式,會把訊息發給繫結給它的全部佇列,如果配置了routing_key會被忽略。
public static final String FANOUT_EXCHANGE="fanoutExchange";
/**
* Fanout 交換機模式(廣播模式),不用繫結key
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding fanoutBinding1(){
return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2(){
return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
}
複製程式碼
public void sendFanout(Object massage){
String msg = (String) massage;
log.info("send fanout message: "+msg);
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg);
}
複製程式碼
測試:
@Autowired
private MQSender sender;
sender.sendFanout("hello fanout Exchange");
複製程式碼
補充
這個示例是基於springboot的示例。
pom依賴
<!--rabbbitMQ相關依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複製程式碼
配置檔案
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 這個賬號密碼只能連線本地的mq,遠端的話需要配置
virtual-host: /
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1 # 從佇列每次取一個
auto-startup: true
default-requeue-rejected: true # 失敗後重試
複製程式碼
後續會用它來實現一個小小的搶票功能。