1. 程式人生 > >spring boot 整合RabbitMQ

spring boot 整合RabbitMQ

首先介紹下MQ1:MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法2:MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。3:在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。4:MQ則是遵循了AMQP協議的具體實現和產品。5:AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶端/中介軟體不同產品,不同的開發語言等條件的限制.Erlang中的實現有 RabbitMQ.RabbitMQ的工作流程如下所示
前面是生產者,中間是RabbitMQ,後面是消費者。MQ還分為交換機(exchange)和佇列(queues)。兩者的關係是佇列繫結到交換機上。具體流程是:生產者生產訊息傳送到交換機,交換機負責把訊息轉發到與自己繫結的佇列上(交換機不負責儲存),消費者從自己監聽的佇列中取出訊息並且消費。這裡還存在兩個機制,一個是Confirm機制,一種是ACK機制。前一種是生產者傳送給MQ時,是否傳送成功的確認。後者是訊息被消費者拿到是否手動確認MQ刪除此訊息。兩者都可以不設定,Confirm不設定,表示無論生產者是否成功傳送訊息到MQ,都不做處理,這裡需要自己程式碼實現。這樣的缺點就是,在向某個交換機發送訊息時,由於某些原因沒成功,又沒設定Confirm,造成訊息丟失。ACK不設定,表示noack即不確認,只要消費者拿到訊息,MQ就會刪除佇列中的訊息,無論消費者是否成功消費。這樣的缺點是,當消費者拿到訊息後沒有消費成功,此訊息已經在MQ中刪除了,造成訊息丟失。這兩種機制都在後面的程式碼中有所體現。下面介紹下交換機的型別,有四種類型,分別為Direct,Topic,headers,Fanout.
1:Direct是RabbitMQ預設的交換機模式,也是最簡單的模式.即建立訊息佇列的時候,指定一個BindingKey.當傳送者傳送訊息的時候,指定對應的Key.當Key和訊息佇列的BindingKey一致的時候,訊息將會被髮送到該訊息佇列中.eg:這是一個完整key的匹配。如果一個佇列繫結到交換機上要求路由鍵位"dog",則只有被標記為"dog"的訊息才被轉發到該佇列上,不會轉發"dog.1"和"dog.1.2",只會轉發"dog"。2:Topic轉發資訊主要是依據萬用字元,佇列和交換機的繫結主要是依據一種模式(萬用字元+字串),而當傳送訊息的時候,只有指定的Key和該模式相匹配的時候,訊息才會被髮送到該訊息佇列中.
eg:佇列需要繫結在一個模式上,符號“#”匹配一個或多個詞,符號“*”只能匹配一個詞。因為"dog.#"能匹配到"dog.1.2",但是“dog.*”只能匹配到“dog.1”3:headers也是根據一個規則進行匹配,在訊息佇列和交換機繫結的時候會指定一組鍵值對規則,而傳送訊息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,訊息會被髮送到匹配的訊息佇列中.4:Fanout是路由廣播的形式,將會把訊息發給繫結它的全部佇列,即便設定了key,也會被忽略.spring boot 整合 RabbitMQ基於maven,建立兩個工程,一個生產者和一個消費者兩者的pom檔案是一樣的,看下生產者的
    <parent>
	    <groupId>org.springframework.boot</groupId>
	    <artifactId>spring-boot-starter-parent</artifactId>
	    <version>1.5.9.RELEASE</version>
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
            <scope>true</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- 新增springboot對amqp的支援 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
       
    </dependencies>
這裡注意一點是添加了springboot的對amqp的支援配置檔案application.properties如下,同樣生產者和消費者配置是一樣的。
#MQ連結相關---開始
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtualHost=/taotao
#MQ連結相關---結束

#MQ ack相關---開始
#客戶端確認模式,客戶端傳送訊息到mq,mq會非同步返回是否收到訊息。
#預設沒有確認,無論傳送是否成功,客戶端都不會知曉
spring.rabbitmq.publisher-confirms=true
#服務端確認模式,消費者消費成功向mq傳送刪除已經消費訊息的資訊。
#預設消費者接收到訊息,mq就會刪除
spring.rabbitmq.listener.acknowledge-mode=MANUAL
#MQ ack相關---結束

還有一處是一樣的RabbitMQConfig類,程式碼如下:
package cn.rabbitmq.example;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

	// 建立Topic型別交換機
	@Bean
	public TopicExchange defaultExchange() {
		// 第一個引數交換機名稱
		// 第二個引數是否交換機久化,true為持久化,建立後重啟依然存在
		// 第三個引數是在交換機不在使用的情況下,自動刪除
		return new TopicExchange("TopicExchange", true, false, null);
	}

	// 建立佇列
	@Bean
	public Queue queue1() {
		// 第一個引數佇列名稱
		// 第二個引數是否列隊持久化,true為持久化,建立後重啟依然存在
		return new Queue("queue3", true);
	}

	@Bean
	public Queue queue2() {
		return new Queue("queue4", true);
	}

	// 交換機與佇列繫結,並且設定了路由匹配
	@Bean
	public Binding binding1() {
		return BindingBuilder.bind(queue1()).to(defaultExchange()).with("item.*");
	}

	// 交換機與佇列繫結,並且設定了路由匹配
	@Bean
	public Binding binding2() {
		return BindingBuilder.bind(queue2()).to(defaultExchange()).with("item.#");
	}

}
具體的解釋在程式碼中,這裡定義了一個交換機和兩個佇列,並且兩個佇列都繫結到了交換機上,只是路由萬用字元不同。接下來就是傳送和消費了。但是其中有些問題是無法避免的。1:生產者傳送訊息到MQ然後返回,MQ接收到了後非同步去向生產者確認,但是在MQ確認接收到時,網路出現問題無法向生產者確認這時應該怎麼辦?生產者不知道MQ是否收到,這時必須重新發送。2:上一個問題可能導致MQ中的訊息是重複的,消費者消費到重複的資料,消費者應該去重。
解決問題1:在本地寫個快取,把每次要傳送的資料快取到本地,生產者接收到MQ發回的確認資訊後,刪除本地快取資料。開啟一個執行緒去處理快取中的資料,一定時間內,遍歷快取資料重新發送。解決問題2:生產者除了傳送訊息外,再為每個訊息生成個id,消費者根據id是否相同決定是否消費此訊息。用到一個map,每次拿到id先判斷map中是否存在此id,存在表明已經消費過,直接ack讓MQ刪除訊息,map不存在此id,把id存入map後消費。生產者本地快取程式碼:
public class RetrySendCache {
	private MessageSender messageSender;
	private ConcurrentHashMap<String,MessageWithTime> map=new ConcurrentHashMap();
	private String exchangeName;
	private String key;

	public int getSize(){
		return map.size();
	}
	
	public RetrySendCache(){
		startRetry();
	}
	
	private static class MessageWithTime{
		private long time;
		private Object message;
		public MessageWithTime(long time,Object message){
			this.time=time;
			this.message=message;
		}
		public long getTime() {
			return time;
		}
		public Object getMessage() {
			return message;
		}
		
	}
	public void setSenderInfo(MessageSender messageSender, String exchangeName, String key){
		this.messageSender=messageSender;
		this.exchangeName=exchangeName;
		this.key=key;
	}
	
	public void put(String id,Object message){
		map.put(id, new MessageWithTime(System.currentTimeMillis(), message));
	}
	
	public void remove(String id){
		map.remove(id);
	}
	
	public void startRetry(){
		new Thread(new Runnable() {
	           @Override
		   public void run() {
			while(true){
				try {
					Thread.sleep(30*1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				for(String key  : map.keySet()){
					long now=System.currentTimeMillis();
					MessageWithTime messageWithTime=map.get(key);
					//超過一定時間沒有ack直接刪除本地快取的message,重試兩次
					if(messageWithTime.getTime()+3*30*1000<=now){
						remove(key);
					}else if(messageWithTime.getTime()+30*1000<=now){
				SendStatusMessage message = messageSender.send(messageWithTime.getMessage(),exchangeName,key);
					if(message.isFlag()){
						remove(key);
						}
					}
				}
			}
		}
	}).start();
		
	}
}
生產者程式碼
@Component
public class MessageSender  implements ConfirmCallback,SenderInterface{
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	static RetrySendCache cache=new RetrySendCache();
	
	
	@PostConstruct
    public void init() {
		rabbitTemplate.setConfirmCallback(this);
    }

	public SendStatusMessage send(Object message,String exchangeName,String key){
		try {
			cache.setSenderInfo(this,exchangeName,key);
			String uuid = UUID.randomUUID().toString();
			cache.put(uuid, message);
			Message msg=new Message(message,uuid);
			rabbitTemplate.convertAndSend(exchangeName, key, FastJsonUtil.objectToString(msg), new CorrelationData(uuid));
		} catch (AmqpException e) {
			e.printStackTrace();
			return new SendStatusMessage(false, "");
		}
		
		return new SendStatusMessage(true, "");
		
	}

	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (!ack) { 
			System.err.println("傳送失敗------: " + cause);
		}else{
			cache.remove(correlationData.getId());
		}
		
	}
}
消費者程式碼
public abstract class BaseConsumer implements Consumer{
	@Override
	@RabbitListener(queues="#{'${rabbitmq.listener.queue.name}'.split(',')}")
	//以下固定有兩個引數,也可以只有message一個引數
	public void consume(Message s, Channel channel) {
		byte[] body = s.getBody();
		MessageDetail obj = FastJsonUtil.stringToMessage(body);
		String message = map.get(obj.getUuid());
		if (StringUtils.isBlank(message)) {
			map.put(obj.getUuid(), obj.getUuid());// 記憶體不夠怎麼辦,定期清理
			try {
				//消費具體邏輯,子類實現
				logic(new String(body));
				//Delivery Tag 用來標識通道中投遞的訊息,RabbitMQ 推送訊息給 Consumer 時,會附帶一個 Delivery Tag,
				//以便 Consumer 可以在訊息確認時告訴 RabbitMQ 到底是哪條訊息被確認了。
				//RabbitMQ 保證在每個通道中,每條訊息的 Delivery Tag 從 1 開始遞增
				//basicAck 方法的第二個引數 multiple 取值為 false 時,表示通知 RabbitMQ 當前訊息被確認;如果為 true,
				//則額外將比第一個引數指定的 delivery tag 小的訊息一併確認
				channel.basicAck(s.getMessageProperties().getDeliveryTag(), false);
			} catch (Exception e) {
				e.printStackTrace();
				try {
					//當消費訊息出現異常時,我們需要取消確認,這時我們可以使用 Channel 的 basicReject 方法。
					//第一個引數指定 delivery tag,第二個引數說明如何處理這個失敗訊息。
					//requeue 值為 true 表示該訊息重新放回佇列頭,值為 false 表示放棄這條訊息。
					//一般來說,如果是系統無法處理的異常,我們一般是將 requeue 設為 false,例如訊息格式錯誤,再處理多少次也是異常。
					//呼叫第三方介面超時這類異常 requeue 應該設為 true。
					channel.basicReject(s.getMessageProperties().getDeliveryTag(),false);
				} catch (IOException e1) {
					e1.printStackTrace();
				}
			}
		} else {
			try {
				//這裡並不是出現異常,而是重複的訊息是不會消費的,直接通知MQ刪除
				channel.basicAck(s.getMessageProperties().getDeliveryTag(), false);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
	
	protected abstract void  logic(String message);

}
大體程式碼上,後續還做了簡單封裝,見下一篇