spring boot 整合RabbitMQ
阿新 • • 發佈:2019-01-05
首先介紹下MQ1:MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法2:MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。3:在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。4:MQ則是遵循了AMQP協議的具體實現和產品。5:AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制.Erlang中的實現有 RabbitMQ.RabbitMQ的工作流程如下所示 前面是生產者,中間是RabbitMQ,後面是消費者。MQ還分為交換機(exchange)和佇列(queues)。兩者的關係是佇列繫結到交換機上。具體流程是:生產者生產訊息傳送到交換機,交換機負責把訊息轉發到與自己繫結的佇列上(交換機不負責儲存),消費者從自己監聽的佇列中取出訊息並且消費。這裡還存在兩個機制,一個是Confirm機制,一種是ACK機制。前一種是生產者傳送給MQ時,是否傳送成功的確認。後者是訊息被消費者拿到是否手動確認MQ刪除此訊息。兩者都可以不設定,Confirm不設定,表示無論生產者是否成功傳送訊息到MQ,都不做處理,這裡需要自己程式碼實現。這樣的缺點就是,在向某個交換機發送訊息時,由於某些原因沒成功,又沒設定Confirm,造成訊息丟失。ACK不設定,表示noack即不確認,只要消費者拿到訊息,MQ就會刪除佇列中的訊息,無論消費者是否成功消費。這樣的缺點是,當消費者拿到訊息後沒有消費成功,此訊息已經在MQ中刪除了,造成訊息丟失。這兩種機制都在後面的程式碼中有所體現。下面介紹下交換機的型別,有四種類型,分別為Direct,Topic,headers,Fanout. 1:Direct是RabbitMQ預設的交換機模式,也是最簡單的模式.即建立訊息佇列的時候,指定一個BindingKey.當傳送者傳送訊息的時候,指定對應的Key.當Key和訊息佇列的BindingKey一致的時候,訊息將會被髮送到該訊息佇列中.eg:這是一個完整key的匹配。如果一個佇列繫結到交換機上要求路由鍵位"dog",則只有被標記為"dog"的訊息才被轉發到該佇列上,不會轉發"dog.1"和"dog.1.2",只會轉發"dog"。2:Topic轉發資訊主要是依據萬用字元,佇列和交換機的繫結主要是依據一種模式(萬用字元+字串),而當傳送訊息的時候,只有指定的Key和該模式相匹配的時候,訊息才會被髮送到該訊息佇列中. eg:佇列需要繫結在一個模式上,符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞。因為"dog.#"能匹配到"dog.1.2",但是“dog.*”只能匹配到“dog.1”3:headers也是根據一個規則進行匹配,在訊息佇列和交換機繫結的時候會指定一組鍵值對規則,而傳送訊息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,訊息會被髮送到匹配的訊息佇列中.4:Fanout是路由廣播的形式,將會把訊息發給繫結它的全部佇列,即便設定了key,也會被忽略.spring boot 整合 RabbitMQ基於maven,建立兩個工程,一個生產者和一個消費者兩者的pom檔案是一樣的,看下生產者的
還有一處是一樣的RabbitMQConfig類,程式碼如下:
解決問題1:在本地寫個快取,把每次要傳送的資料快取到本地,生產者接收到MQ發回的確認資訊後,刪除本地快取資料。開啟一個執行緒去處理快取中的資料,一定時間內,遍歷快取資料重新發送。解決問題2:生產者除了傳送訊息外,再為每個訊息生成個id,消費者根據id是否相同決定是否消費此訊息。用到一個map,每次拿到id先判斷map中是否存在此id,存在表明已經消費過,直接ack讓MQ刪除訊息,map不存在此id,把id存入map後消費。生產者本地快取程式碼:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
<scope>true</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- 新增springboot對amqp的支援 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
這裡注意一點是添加了springboot的對amqp的支援配置檔案application.properties如下,同樣生產者和消費者配置是一樣的。#MQ連結相關---開始 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.virtualHost=/taotao #MQ連結相關---結束 #MQ ack相關---開始 #客戶端確認模式,客戶端傳送訊息到mq,mq會非同步返回是否收到訊息。 #預設沒有確認,無論傳送是否成功,客戶端都不會知曉 spring.rabbitmq.publisher-confirms=true #服務端確認模式,消費者消費成功向mq傳送刪除已經消費訊息的資訊。 #預設消費者接收到訊息,mq就會刪除 spring.rabbitmq.listener.acknowledge-mode=MANUAL #MQ ack相關---結束
還有一處是一樣的RabbitMQConfig類,程式碼如下:
package cn.rabbitmq.example; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { // 建立Topic型別交換機 @Bean public TopicExchange defaultExchange() { // 第一個引數交換機名稱 // 第二個引數是否交換機久化,true為持久化,建立後重啟依然存在 // 第三個引數是在交換機不在使用的情況下,自動刪除 return new TopicExchange("TopicExchange", true, false, null); } // 建立佇列 @Bean public Queue queue1() { // 第一個引數佇列名稱 // 第二個引數是否列隊持久化,true為持久化,建立後重啟依然存在 return new Queue("queue3", true); } @Bean public Queue queue2() { return new Queue("queue4", true); } // 交換機與佇列繫結,並且設定了路由匹配 @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(defaultExchange()).with("item.*"); } // 交換機與佇列繫結,並且設定了路由匹配 @Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(defaultExchange()).with("item.#"); } }具體的解釋在程式碼中,這裡定義了一個交換機和兩個佇列,並且兩個佇列都繫結到了交換機上,只是路由萬用字元不同。接下來就是傳送和消費了。但是其中有些問題是無法避免的。1:生產者傳送訊息到MQ然後返回,MQ接收到了後非同步去向生產者確認,但是在MQ確認接收到時,網路出現問題無法向生產者確認這時應該怎麼辦?生產者不知道MQ是否收到,這時必須重新發送。2:上一個問題可能導致MQ中的訊息是重複的,消費者消費到重複的資料,消費者應該去重。
解決問題1:在本地寫個快取,把每次要傳送的資料快取到本地,生產者接收到MQ發回的確認資訊後,刪除本地快取資料。開啟一個執行緒去處理快取中的資料,一定時間內,遍歷快取資料重新發送。解決問題2:生產者除了傳送訊息外,再為每個訊息生成個id,消費者根據id是否相同決定是否消費此訊息。用到一個map,每次拿到id先判斷map中是否存在此id,存在表明已經消費過,直接ack讓MQ刪除訊息,map不存在此id,把id存入map後消費。生產者本地快取程式碼:
public class RetrySendCache {
private MessageSender messageSender;
private ConcurrentHashMap<String,MessageWithTime> map=new ConcurrentHashMap();
private String exchangeName;
private String key;
public int getSize(){
return map.size();
}
public RetrySendCache(){
startRetry();
}
private static class MessageWithTime{
private long time;
private Object message;
public MessageWithTime(long time,Object message){
this.time=time;
this.message=message;
}
public long getTime() {
return time;
}
public Object getMessage() {
return message;
}
}
public void setSenderInfo(MessageSender messageSender, String exchangeName, String key){
this.messageSender=messageSender;
this.exchangeName=exchangeName;
this.key=key;
}
public void put(String id,Object message){
map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
}
public void remove(String id){
map.remove(id);
}
public void startRetry(){
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
Thread.sleep(30*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(String key : map.keySet()){
long now=System.currentTimeMillis();
MessageWithTime messageWithTime=map.get(key);
//超過一定時間沒有ack直接刪除本地快取的message,重試兩次
if(messageWithTime.getTime()+3*30*1000<=now){
remove(key);
}else if(messageWithTime.getTime()+30*1000<=now){
SendStatusMessage message = messageSender.send(messageWithTime.getMessage(),exchangeName,key);
if(message.isFlag()){
remove(key);
}
}
}
}
}
}).start();
}
}
生產者程式碼@Component
public class MessageSender implements ConfirmCallback,SenderInterface{
@Autowired
private RabbitTemplate rabbitTemplate;
static RetrySendCache cache=new RetrySendCache();
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
public SendStatusMessage send(Object message,String exchangeName,String key){
try {
cache.setSenderInfo(this,exchangeName,key);
String uuid = UUID.randomUUID().toString();
cache.put(uuid, message);
Message msg=new Message(message,uuid);
rabbitTemplate.convertAndSend(exchangeName, key, FastJsonUtil.objectToString(msg), new CorrelationData(uuid));
} catch (AmqpException e) {
e.printStackTrace();
return new SendStatusMessage(false, "");
}
return new SendStatusMessage(true, "");
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
System.err.println("傳送失敗------: " + cause);
}else{
cache.remove(correlationData.getId());
}
}
}
消費者程式碼public abstract class BaseConsumer implements Consumer{
@Override
@RabbitListener(queues="#{'${rabbitmq.listener.queue.name}'.split(',')}")
//以下固定有兩個引數,也可以只有message一個引數
public void consume(Message s, Channel channel) {
byte[] body = s.getBody();
MessageDetail obj = FastJsonUtil.stringToMessage(body);
String message = map.get(obj.getUuid());
if (StringUtils.isBlank(message)) {
map.put(obj.getUuid(), obj.getUuid());// 記憶體不夠怎麼辦,定期清理
try {
//消費具體邏輯,子類實現
logic(new String(body));
//Delivery Tag 用來標識通道中投遞的訊息,RabbitMQ 推送訊息給 Consumer 時,會附帶一個 Delivery Tag,
//以便 Consumer 可以在訊息確認時告訴 RabbitMQ 到底是哪條訊息被確認了。
//RabbitMQ 保證在每個通道中,每條訊息的 Delivery Tag 從 1 開始遞增
//basicAck 方法的第二個引數 multiple 取值為 false 時,表示通知 RabbitMQ 當前訊息被確認;如果為 true,
//則額外將比第一個引數指定的 delivery tag 小的訊息一併確認
channel.basicAck(s.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
try {
//當消費訊息出現異常時,我們需要取消確認,這時我們可以使用 Channel 的 basicReject 方法。
//第一個引數指定 delivery tag,第二個引數說明如何處理這個失敗訊息。
//requeue 值為 true 表示該訊息重新放回佇列頭,值為 false 表示放棄這條訊息。
//一般來說,如果是系統無法處理的異常,我們一般是將 requeue 設為 false,例如訊息格式錯誤,再處理多少次也是異常。
//呼叫第三方介面超時這類異常 requeue 應該設為 true。
channel.basicReject(s.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e1) {
e1.printStackTrace();
}
}
} else {
try {
//這裡並不是出現異常,而是重複的訊息是不會消費的,直接通知MQ刪除
channel.basicAck(s.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
protected abstract void logic(String message);
}
大體程式碼上,後續還做了簡單封裝,見下一篇