MQ的使用
阿新 • • 發佈:2020-08-18
全過程
- 連線mq伺服器
- 生成template物件
- 我們的角色是admin
- 設定好幾個佇列queue
- 交換機exchange,有4種交換機
- 設定處理佇列的方法,也叫做消費者,這是自動處理的
- 通過介面往佇列新增內容
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties
#rabbitmq spring.rabbitmq.host=192.168.89.168 spring.rabbitmq.port=5672 spring.rabbitmq.username=fzb spring.rabbitmq.password=fzb2019 spring.rabbitmq.virtual-host=/ #消費者數量 spring.rabbitmq.listener.simple.concurrency=10 #最大消費者數量 spring.rabbitmq.listener.simple.max-concurrency=10 #消費者每次從佇列獲取的訊息數量。寫多了,如果長時間得不到消費,資料就一直得不到處理 spring.rabbitmq.listener.simple.prefetch=1 #消費者自動啟動 spring.rabbitmq.listener.simple.auto-startup=true #消費者消費失敗,自動重新入隊 spring.rabbitmq.listener.simple.default-requeue-rejected=true #啟用傳送重試 佇列滿了發不進去時啟動重試 spring.rabbitmq.template.retry.enabled=true #1秒鐘後重試一次 spring.rabbitmq.template.retry.initial-interval=1000 #最大重試次數 3次 spring.rabbitmq.template.retry.max-attempts=3 #最大間隔 10秒鐘 spring.rabbitmq.template.retry.max-interval=10000 #等待間隔 的倍數。如果為2 第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒 spring.rabbitmq.template.retry.multiplier=1.0
交換機模式
- direct,全匹配的點對點
@Configuration public class SenderConf { @Bean public Queue queue() { return new Queue("queue"); } } @Service public class HelloSender { @Autowired private AmqpTemplate template; public void send() { template.convertAndSend("queue", "hello,rabbit666~"); } } @Component public class MyListner @RabbitListener(queues = "queue") public void msg(String msg){ System.out.println("消費者消費訊息了:"+msg); } }
- topic,模糊匹配,需要判斷
@Configuration
public class SenderConf1 {
@Bean(name="message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞
}
}
@Service
public class HelloSender {
@Autowired
private AmqpTemplate template;
public void send() {
template.convertAndSend("exchange","topic.message","hello,rabbit~~~11");
template.convertAndSend("exchange","topic.messages","hello,rabbit~~~22");
}
}
@Component
public class MyListner{
@RabbitListener(queues="topic.message") //監聽器監聽指定的Queue
public void process1(String str) {
System.out.println("message:"+str);
}
@RabbitListener(queues="topic.messages") //監聽器監聽指定的Queue
public void process2(String str) {
System.out.println("messages:"+str);
}
}
- faout,全廣播
@Configuration
public class SenderConf2 {
@Bean(name="Amessage")
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean(name="Bmessage")
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean(name="Cmessage")
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");//配置廣播路由器
}
@Bean
Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
@Service
public class HelloSender {
@Autowired
private AmqpTemplate template;
public void send() {
template.convertAndSend("fanoutExchange","","xixi,haha");//引數2忽略
}
}
@Component
public class MyListner{
@RabbitListener(queues="fanout.A")
public void processA(String str1) {
System.out.println("ReceiveA:"+str1);
}
@RabbitListener(queues="fanout.B")
public void processB(String str) {
System.out.println("ReceiveB:"+str);
}
@RabbitListener(queues="fanout.C")
public void processC(String str) {
System.out.println("ReceiveC:"+str);
}
}
過期時間
- 設定了過期時間,過期了就沒了,有兩種方式
- 如果兩個同時設定已最早過期時間為準
// 在傳送訊息時設定過期時間
@Test
public void ttlMessageTest(){
MessageProperties messageProperties = new MessageProperties();
//設定訊息的過期時間,3秒
messageProperties.setExpiration("3000");
Message message = new Message("測試過期訊息,3秒鐘過期".getBytes(), messageProperties);
//路由鍵與佇列同名
rabbitTemplate.convertAndSend("my_ttl_queue", message);
}
// 設定整個佇列的過期時間
@Configuration
public class SenderConf3 {
// 新建業務佇列,新增死信配置,
@Bean("redirectQueue")
public Queue redirectQueue() {
Map<String, Object> args = new HashMap<>(1);
// 過期時間
args.put("x-message-ttl", 10*1000);
return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
}
// 需要普通業務交換機和繫結,這裡省略
}
死信佇列
- 上面過期了沒了,可以讓他們去到死信的佇列
@Configuration
public class SenderConf3 {
@Bean("deadLetterExchange")
public Exchange deadLetterExchange() {
return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
}
@Bean("deadLetterQueue")
public Queue deadLetterQueue() {
return QueueBuilder.durable("DL_QUEUE").build();
}
@Bean
public Binding deadLetterBinding() {
return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
}
// 新建業務佇列,新增死信配置,
@Bean("redirectQueue")
public Queue redirectQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 宣告 死信交換機
args.put("x-dead-letter-exchange", "DL_EXCHANGE");
// x-dead-letter-routing-key 宣告 死信路由鍵
args.put("x-dead-letter-routing-key", "DL_KEY");
return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
}
// 需要普通業務交換機和繫結,這裡省略
}
延遲佇列
- 需要給rabbitmq安裝外掛,放在pugins資料夾下重啟服務
1. 檢視yum 安裝的軟體路徑
查詢安裝包:rpm -qa|grep rabbitmq
查詢位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
解除安裝yum安裝:yum remove rabbitmq-server-3.6.15-1.el6.noarch
2. 上傳到plugins資料夾
3. 停止伺服器
service rabbitmq-server stop
4. 開啟外掛
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
(關閉外掛)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5. 啟動伺服器
service rabbitmq-server start
6. 檢視外掛
rabbitmq-plugins list
@Configuration
public class DelayQueueConfig {
public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
public static final String DELAY_QUEUE = "DELAY_QUEUE";
public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";
@Bean("delayExchange")
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>(1);
// x-delayed-type 宣告 延遲佇列Exchange的型別
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
}
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public Binding delayQueueBindExchange() {
return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
}
}
@Component
public class DelayQueueSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayQueue(int number) {
rabbitTemplate.convertAndSend(
"textExchange",
"textKey",
number, (message) -> {
// 設定延遲的毫秒數
message.getMessageProperties().setDelay(number);
log.info("Now : {}", ZonedDateTime.now());
return message;
});
}
}
// 監聽textKey對應的佇列等訊息就行
確認機制
- 配置
# 傳送確認
spring.rabbitmq.publisher-confirms=true
# 傳送回撥
spring.rabbitmq.publisher-returns=true
# 消費手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 生產者傳送訊息確認機制
- 其實這個也不能叫確認機制,只是起到一個監聽的作用,監聽生產者是否傳送訊息到exchange和queue。
- 生產者和消費者程式碼不改變。
- 新建配置類 MQProducerAckConfig.java 實現ConfirmCallback和ReturnCallback介面,@Component註冊成元件。
- ConfirmCallback只確認訊息是否到達exchange,已實現方法confirm中ack屬性為標準,true到達,反之進入黑洞。
- ReturnCallback訊息沒有正確到達佇列時觸發回撥,如果正確到達佇列不執行
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback
rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("訊息傳送成功" + correlationData);
} else {
System.out.println("訊息傳送失敗:" + cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 反序列化物件輸出
System.out.println("訊息主體: " + SerializationUtils.deserialize(message.getBody()));
System.out.println("應答碼: " + replyCode);
System.out.println("描述:" + replyText);
System.out.println("訊息使用的交換器 exchange : " + exchange);
System.out.println("訊息使用的路由鍵 routing : " + routingKey);
}
}
- 如果消費者訊息是預設auto
- 如果訊息成功被消費(成功的意思是在消費的過程中沒有丟擲異常),則自動確認
- 當丟擲 AmqpRejectAndDontRequeueException 異常的時候,則訊息會被拒絕,且 requeue = false(不重新入佇列)
- 當丟擲 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認
- 其他的異常,則訊息會被拒絕,且 requeue = true,此時會發生死迴圈,可以通過 setDefaultRequeueRejected(預設是true)去設定拋棄訊息
- 消費者訊息手動確認manual,一定要對訊息做出應答,否則rabbit認為當前佇列沒有消費完成,將不再繼續向該佇列傳送訊息
@Component
public class MyListner
@RabbitListener(queues = "queue")
public void msg(Channel channel,String msg) throws IOException {
System.out.println("消費者消費訊息了:"+msg);
// 多了個channel,還要監聽錯誤
// channel有三個方法,一個是成功,一個是拒絕,一個是重新入隊
try {
// 模擬執行任務
Thread.sleep(1000);
// 模擬異常
String is = null;
is.toString();
// 確認收到訊息,false只確認當前consumer一個訊息收到,true確認所有consumer獲得的訊息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
System.out.println("訊息已重複處理失敗,拒絕再次接收" + user.getName());
// 拒絕訊息,requeue=false 表示不再重新入隊,如果配置了死信佇列則進入死信佇列
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
System.out.println("訊息即將再次返回佇列處理" + user.getName());
// requeue為是否重新回到佇列,true重新入隊
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
//e.printStackTrace();
}
}
}
- 這個就是手動事務了
持久化
- 交換機持久化
@Bean
public DirectExchange testDirectExchange(){
//第二個引數就是是否持久化,第三個引數就是是否自動刪除
return new DirectExchange("direct.Exchange",true,false);
}
- 佇列持久化
@Bean
public Queue txQueue(){
//第二個引數就是durable,是否持久化
return new Queue("txQueue",true);
}
高階知識
- 叢集
- HAProxy
- KeepAlived
面試問題
- 訊息堆積
// 原因
太多入隊,消費不及時,佇列佔滿
// 解決方案
增加消費者
- 訊息丟失
// 原因一
訊息在生產者丟失
// 解決方案一
資訊被MQ接受後需要給生產者傳送一個確認訊息(確認機制)
在confirm方法裡的資訊傳送失敗後面新增重發機制
// 原因二
訊息在MQ宕機丟失
// 解決方案二
啟動持續化
// 原因三
訊息在消費者丟失
// 解決方案二
消費者確認機制,事務機制
- 有序消費
// 目的
有ABC三個訊息,想要順序執行ABC,但是有多個消費者,ABC會被瞬間平分
// 解決方案
改成多個佇列,一個佇列一個消費者,資訊由hash值放到對應佇列
- 重複消費
// 原因
為了防止訊息在消費者丟失開啟了手動回覆,但是如果在消費者執行成功了,但是回覆的時候出了問題,mq就以為訊息沒成功又給下一個消費者傳送一次,同個訊息執行多次
// 解決
每個訊息都新增id,redis也新增id,消費者接受資訊後判斷這個資訊是不是用過了,用過了直接返回成功