1. 程式人生 > 其它 >Ribbit Mq 實現延遲訊息

Ribbit Mq 實現延遲訊息

--------------------好記性不如爛筆頭---------------------------

windows 環境,使用 rabbit Mq 需要安裝,erl 和rabbit Mq

1.erl 安裝完需要配置環境變數

2.查詢erl 是否安裝好,cmd-->erl -version

erl -version
3.MQ 安裝目錄下
D:\anzhuang\rabbitmq_server-3.8.9\sbin
啟動 :cmd-->rabbitmq-server

訪問:http://localhost:15672/#/

guest/guest (使用者名稱/密碼)

--以上可以登入,進行下面操作,配置檔案

spring:


rabbitmq:
host: localhost
port: 5672
virtualHost: /
username : guest
password : guest
listener:
simple:
acknowledge-mode: manual

pom檔案
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置類

package com.example.servicebuy.config;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
* Created by qiuzhijie.
* Date: 2019-01-07
* 備註: mq config information
*/
@Configuration
public class RabbitMQConfig {

private Logger log = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}")
private String HOST;
@Value("${spring.rabbitmq.port}")
private Integer PORT;
@Value("${spring.rabbitmq.virtualHost}")
private String VIRTUALHOST;
@Value("${spring.rabbitmq.username}")
private String USERNAME;
@Value("${spring.rabbitmq.password}")
private String PASSWORD;

@Bean
public CachingConnectionFactory connectionFactory() {
log.info("RabbitMQ連結資訊:{},{},{},{}", HOST, PORT, USERNAME, PASSWORD);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.HOST, this.PORT);

connectionFactory.setUsername(this.USERNAME);
connectionFactory.setPassword(this.PASSWORD);
connectionFactory.setVirtualHost(this.VIRTUALHOST);
log.info("RabbitMQ連線成功");

return connectionFactory;


}

/**
* 死信佇列交換機識別符號 屬性值不能改,寫死
*/
private static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信佇列交換機繫結鍵 識別符號 屬性值不能改,寫死
*/
private static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";


/**
* deadLetterExchange(direct型別交換機)
*
* @return
*/
@Bean("deadLetterExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DEAD_LETTER_EXCHANGE").durable(true).build();
}

/**
* 宣告一個死信佇列
* x-dead-letter-exchange 對應 死信交換機
* x-dead-letter-routing-key 對應 死信佇列
*/
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
//應該像個普通佇列,裡面多設定了兩個引數,這個佇列沒有被消費或者超時 則通過x-dead-letter-exchange 指明重新回到死信交換機 TEST_SIGN_EXCHANGE
//交換機
// 引數
Map<String, Object> args = new HashMap<>(2);
// 出現dead letter之後將dead letter重新發送到指定exchange
args.put(DEAD_LETTER_QUEUE_KEY, "DEAD_LETTER_EXCHANGE");
// 出現dead letter之後將dead letter重新按照指定的routing-key傳送
args.put(DEAD_LETTER_ROUTING_KEY, "REDIRECT_KEY");
// name佇列名字 durable是否持久化,true保證訊息的不丟失, exclusive是否排他佇列,如果一個佇列被宣告為排他佇列,該佇列僅對首次申明它的連線可見,並在連線斷開時自動刪除, autoDelete如果該佇列沒有任何訂閱的消費者的話,該佇列是否會被自動刪除, arguments引數map
return new Queue("DEAD_LETTER_QUEUE", true, false, false, args);
}


/**
* 死信路由通過 DEAD_LETTER_KEY 繫結到死信佇列上.
*/
@Bean
public Binding deadLetterBinding() {
return new Binding("DEAD_LETTER_QUEUE", Binding.DestinationType.QUEUE, "DEAD_LETTER_EXCHANGE", "DEAD_LETTER_KEY", null);

}

/**
* 死信路由通過 REDIRECT_KEY 繫結到轉發佇列上. 這個佇列繫結的是當出現死信訊息後 重新轉發給的佇列
*/
@Bean
public Binding redirectBinding() {
return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DEAD_LETTER_EXCHANGE", "REDIRECT_KEY", null);
}
/**
* 定義死信佇列轉發佇列. (和普通佇列一樣,這個佇列是為了原有的訊息沒有被消費重新轉發給一個新的佇列)
*/
@Bean("redirectQueue")
public Queue redirectQueue() {
return new Queue("REDIRECT_QUEUE", true, false, false);
}

}

2.訊息生產者 --》傳送訊息

import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 延遲訊息 ribbit MQ栗子
*
* @param messAge
* @throws Exception
*/
@ApiOperation("死信傳送訊息")
@GetMapping("creatMessageDear")
public void creatMessageDear(@RequestParam("messAge") String messAge) throws Exception {
//宣告訊息處理器 設定訊息的編碼以及訊息的過期時間 時間毫秒值為字串
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString().replaceAll("-", ""));
messageProperties.setContentEncoding("utf-8");
//超時時間10秒 (我這裡是延遲10秒,根據業務需要設定時間)
messageProperties.setExpiration(String.valueOf(1000 * 10));
return message;
};
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("msg", messAge);
rabbitTemplate.convertAndSend("DEAD_LETTER_EXCHANGE", "DEAD_LETTER_KEY", dataMap, messagePostProcessor);
}

3.接受死信訊息
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class TestMQConsumer {

/**
* 監聽轉發佇列 死信佇列重新轉發回這裡
*
*/
@RabbitListener(queues = {"REDIRECT_QUEUE"})
public void redirect(HashMap<String,Object> dataMap) throws IOException {
System.out.println(dataMap.get("msg"));
System.out.println("我是轉發佇列,這裡執行邏輯業務");
}
}
------------------------------