1. 程式人生 > 實用技巧 >MQ的使用

MQ的使用

筆記來自
基礎使用
延遲佇列
確認機制

全過程

  • 連線mq伺服器
  • 生成template物件
  • 我們的角色是admin
  • 設定好幾個佇列queue
  • 交換機exchange,有4種交換機
  • 設定處理佇列的方法,也叫做消費者,這是自動處理的
  • 通過介面往佇列新增內容

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties

#rabbitmq
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=/
#消費者數量
spring.rabbitmq.listener.simple.concurrency=10
#最大消費者數量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費者每次從佇列獲取的訊息數量。寫多了,如果長時間得不到消費,資料就一直得不到處理
spring.rabbitmq.listener.simple.prefetch=1
#消費者自動啟動
spring.rabbitmq.listener.simple.auto-startup=true
#消費者消費失敗,自動重新入隊
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#啟用傳送重試 佇列滿了發不進去時啟動重試
spring.rabbitmq.template.retry.enabled=true 
#1秒鐘後重試一次
spring.rabbitmq.template.retry.initial-interval=1000 
#最大重試次數 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大間隔 10秒鐘
spring.rabbitmq.template.retry.max-interval=10000
#等待間隔 的倍數。如果為2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0

交換機模式

  • direct,全匹配的點對點
@Configuration
public class SenderConf {
    @Bean
    public Queue queue() {
        return new Queue("queue");
    }
}


@Service
public class HelloSender {
    @Autowired
    private AmqpTemplate template;
 
    public void send() {
        template.convertAndSend("queue", "hello,rabbit666~");
    }
}

@Component
public class MyListner 
    @RabbitListener(queues = "queue")
    public void msg(String msg){
        System.out.println("消費者消費訊息了:"+msg);
    }
}
  • topic,模糊匹配,需要判斷
@Configuration
public class SenderConf1 {
 
    @Bean(name="message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }
 
    @Bean(name="messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
 
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }
 
    @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞
    }
}


@Service
public class HelloSender {
    @Autowired
    private AmqpTemplate template;
 
    public void send() {
        template.convertAndSend("exchange","topic.message","hello,rabbit~~~11");
        template.convertAndSend("exchange","topic.messages","hello,rabbit~~~22");
    }
}

@Component
public class MyListner{
    @RabbitListener(queues="topic.message")    //監聽器監聽指定的Queue
    public void process1(String str) {
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="topic.messages")    //監聽器監聽指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }
}
  • faout,全廣播
@Configuration
public class SenderConf2 {
 
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }
 
    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }
 
    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
 
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置廣播路由器
    }
 
    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
}

@Service
public class HelloSender {
    @Autowired
    private AmqpTemplate template;
 
    public void send() {
        template.convertAndSend("fanoutExchange","","xixi,haha");//引數2忽略
    }
}

@Component
public class MyListner{
    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }
}

過期時間

  • 設定了過期時間,過期了就沒了,有兩種方式
  • 如果兩個同時設定已最早過期時間為準
// 在傳送訊息時設定過期時間
@Test
public void ttlMessageTest(){
   MessageProperties messageProperties = new MessageProperties();
   //設定訊息的過期時間,3秒
   messageProperties.setExpiration("3000");
   Message message = new Message("測試過期訊息,3秒鐘過期".getBytes(), messageProperties);
   //路由鍵與佇列同名
   rabbitTemplate.convertAndSend("my_ttl_queue", message);
}

// 設定整個佇列的過期時間
@Configuration
public class SenderConf3 {
    // 新建業務佇列,新增死信配置,
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        Map<String, Object> args = new HashMap<>(1);
        // 過期時間
        args.put("x-message-ttl", 10*1000);
        return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
    }
    // 需要普通業務交換機和繫結,這裡省略
}

死信佇列

  • 上面過期了沒了,可以讓他們去到死信的佇列
@Configuration
public class SenderConf3 {
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("DL_QUEUE").build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);
    }

    // 新建業務佇列,新增死信配置,
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    宣告  死信交換機
        args.put("x-dead-letter-exchange", "DL_EXCHANGE");
//       x-dead-letter-routing-key    宣告 死信路由鍵
        args.put("x-dead-letter-routing-key", "DL_KEY");
        return QueueBuilder.durable("REDIRECT_QUEUE").withArguments(args).build();
    }

    // 需要普通業務交換機和繫結,這裡省略
}

延遲佇列

  • 需要給rabbitmq安裝外掛,放在pugins資料夾下重啟服務
1. 檢視yum 安裝的軟體路徑
   查詢安裝包:rpm -qa|grep rabbitmq
   查詢位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
   解除安裝yum安裝:yum remove rabbitmq-server-3.6.15-1.el6.noarch
2. 上傳到plugins資料夾
3. 停止伺服器
   service rabbitmq-server stop
4. 開啟外掛
   rabbitmq-plugins enable rabbitmq_delayed_message_exchange
   (關閉外掛)
   rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5. 啟動伺服器
   service rabbitmq-server start
6. 檢視外掛
   rabbitmq-plugins list
@Configuration
public class DelayQueueConfig {

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
    public static final String DELAY_QUEUE = "DELAY_QUEUE";
    public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";

    @Bean("delayExchange")
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>(1);
//       x-delayed-type    宣告 延遲佇列Exchange的型別
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
    }

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }
    
    @Bean
    public Binding delayQueueBindExchange() {
        return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
    }

}


@Component
public class DelayQueueSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayQueue(int number) {
        rabbitTemplate.convertAndSend(
                "textExchange",
                "textKey",
                number, (message) -> {
                    // 設定延遲的毫秒數
                    message.getMessageProperties().setDelay(number);
                    log.info("Now : {}", ZonedDateTime.now());
                    return message;
                });
    }
}

// 監聽textKey對應的佇列等訊息就行

確認機制

  • 配置
# 傳送確認
spring.rabbitmq.publisher-confirms=true
# 傳送回撥
spring.rabbitmq.publisher-returns=true
# 消費手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 生產者傳送訊息確認機制
    • 其實這個也不能叫確認機制,只是起到一個監聽的作用,監聽生產者是否傳送訊息到exchange和queue。
    • 生產者和消費者程式碼不改變。
    • 新建配置類 MQProducerAckConfig.java 實現ConfirmCallback和ReturnCallback介面,@Component註冊成元件。
    • ConfirmCallback只確認訊息是否到達exchange,已實現方法confirm中ack屬性為標準,true到達,反之進入黑洞。
    • ReturnCallback訊息沒有正確到達佇列時觸發回撥,如果正確到達佇列不執行
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("訊息傳送成功" + correlationData);
        } else {
            System.out.println("訊息傳送失敗:" + cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化物件輸出
        System.out.println("訊息主體: " + SerializationUtils.deserialize(message.getBody()));
        System.out.println("應答碼: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("訊息使用的交換器 exchange : " + exchange);
        System.out.println("訊息使用的路由鍵 routing : " + routingKey);
    }
}
  • 如果消費者訊息是預設auto
    • 如果訊息成功被消費(成功的意思是在消費的過程中沒有丟擲異常),則自動確認
    • 當丟擲 AmqpRejectAndDontRequeueException 異常的時候,則訊息會被拒絕,且 requeue = false(不重新入佇列)
    • 當丟擲 ImmediateAcknowledgeAmqpException 異常,則消費者會被確認
    • 其他的異常,則訊息會被拒絕,且 requeue = true,此時會發生死迴圈,可以通過 setDefaultRequeueRejected(預設是true)去設定拋棄訊息
  • 消費者訊息手動確認manual,一定要對訊息做出應答,否則rabbit認為當前佇列沒有消費完成,將不再繼續向該佇列傳送訊息
@Component
public class MyListner 
    @RabbitListener(queues = "queue")
    public void msg(Channel channel,String msg) throws IOException {
        System.out.println("消費者消費訊息了:"+msg);
        // 多了個channel,還要監聽錯誤
        // channel有三個方法,一個是成功,一個是拒絕,一個是重新入隊
        try {
            // 模擬執行任務
            Thread.sleep(1000);
            // 模擬異常
            String is = null;
            is.toString();
            // 確認收到訊息,false只確認當前consumer一個訊息收到,true確認所有consumer獲得的訊息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("訊息已重複處理失敗,拒絕再次接收" + user.getName());
                // 拒絕訊息,requeue=false 表示不再重新入隊,如果配置了死信佇列則進入死信佇列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("訊息即將再次返回佇列處理" + user.getName());
                // requeue為是否重新回到佇列,true重新入隊
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            //e.printStackTrace();
        }
    }
}
  • 這個就是手動事務了

持久化

  • 交換機持久化
@Bean
public DirectExchange testDirectExchange(){
   //第二個引數就是是否持久化,第三個引數就是是否自動刪除
   return new DirectExchange("direct.Exchange",true,false);
}
  • 佇列持久化
@Bean
public Queue txQueue(){
   //第二個引數就是durable,是否持久化
   return new Queue("txQueue",true);
}

高階知識

  • 叢集
  • HAProxy
  • KeepAlived

面試問題

  • 訊息堆積
// 原因
太多入隊,消費不及時,佇列佔滿
// 解決方案
增加消費者
  • 訊息丟失
// 原因一
訊息在生產者丟失
// 解決方案一
資訊被MQ接受後需要給生產者傳送一個確認訊息(確認機制)
在confirm方法裡的資訊傳送失敗後面新增重發機制

// 原因二
訊息在MQ宕機丟失
// 解決方案二
啟動持續化

// 原因三
訊息在消費者丟失
// 解決方案二
消費者確認機制,事務機制
  • 有序消費
// 目的
有ABC三個訊息,想要順序執行ABC,但是有多個消費者,ABC會被瞬間平分
// 解決方案
改成多個佇列,一個佇列一個消費者,資訊由hash值放到對應佇列
  • 重複消費
// 原因
為了防止訊息在消費者丟失開啟了手動回覆,但是如果在消費者執行成功了,但是回覆的時候出了問題,mq就以為訊息沒成功又給下一個消費者傳送一次,同個訊息執行多次
// 解決
每個訊息都新增id,redis也新增id,消費者接受資訊後判斷這個資訊是不是用過了,用過了直接返回成功