1. 程式人生 > 其它 >springBoot + rabbitMQ +手動確認訊息 + 控制(介面、定時任務)消費者上下線

Spring Boot + RabbitMQ + 手動確認訊息 + 控制(介面、定時任務)消費者上下線

這裡只貼消費者的部分程式碼

第一部分:手動ack配置

package com.mybatis.plus.config.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ listener configuration.
 *
 * <p>Declares the listener container factory bean with manual acknowledgement enabled,
 * so listener methods must ack/reject each delivery themselves.
 *
 * @author 官昌洪
 * @date 2021/12/17 11:24
 * @version V1.0
 */
@Configuration
public class MessageListenerConfig {

    /**
     * Builds the listener container factory.
     *
     * @param connectionFactory the connection factory handed to the container factory
     * @return a factory using a {@link Jackson2JsonMessageConverter} and
     *         {@link AcknowledgeMode#MANUAL}
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        // Manual mode: the container never acks on behalf of the listener.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }
}

第二部分:消費訊息

package com.mybatis.plus.config.mq;

import com.alibaba.fastjson.JSONObject;
import com.mybatis.plus.entity.Log;
import com.mybatis.plus.utils.EurekaUtils;
import com.mybatis.plus.utils.hash.ConsistentHash;
import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
import com.rabbitmq.client.Channel;
import lombok.
extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; @Slf4j @Component public class Receiver { @Value("${server.port}") private String port; @Autowired RabbitTemplate rabbitTemplate; @RabbitListener(id = "testDirectQueueId1", autoStartup = "false", queues = "testDirectQueue") public void consumer(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Thread.sleep(500); if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) { String msg = new String(message.getBody(), "UTF-8"); Log parseObject = JSONObject.parseObject(msg, Log.class); log.info("消費的訊息來自的佇列名為:" + message.getMessageProperties().getConsumerQueue()); log.info("訊息成功消費到 messageId:" + parseObject.getLogUuid() + " messageData:" + parseObject.getLogTitle() + " createTime:" + parseObject.getCreateTime()); log.info("================================"); // 收到來自主機的訊息 進行一致性hash分配 發往不同的服務 // 獲取服務節點 建立一致hash環 ConsistentHash consistentHash = InitConfig.consistentHash; List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2"); if (!allServiceAddr.isEmpty()) { for (Map<String, String> stringMap : allServiceAddr) { String instanceId = stringMap.get("routeKey"); // 新增1個物理節點和150個對應的虛擬節點 // String instanceId = stringMap.get("queueKey"); // 如果hash環中沒有該節點 才新增 ConsistentHashNode node = consistentHash.getAccurateNode(instanceId); if (null == node) { consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId), instanceId), 150); } } } else { 
//沒有服務提供者 將訊息返回佇列 channel.basicReject(deliveryTag, true); return; } channel.basicAck(deliveryTag, false); //第二個引數,手動確認可以被批處理,當該引數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有訊息 // 提取訊息中的某個代表來源主機的標識 然後在hash環上分配目標節點 String logUuid = parseObject.getLogUuid(); ConsistentHashNode node = consistentHash.getNode(logUuid); log.info("主機標識:{},分配節點:{}", logUuid, node.getTarget()); //向指定路由傳送訊息 // todo 問題 這裡怎麼保證佇列預先建立初始化好 解決方案 先從配置檔案獲取佇列名稱 新增服務時 需要重啟服務 rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg); // planTwo(parseObject); log.info(">>>>>>>>>>>>消費訊息成功!"); } } catch (Exception e) { log.info(">>>>>>>>>>>>消費訊息失敗!失敗訊息ID:{}, 失敗原因:{}", deliveryTag, e.getMessage()); channel.basicReject(deliveryTag, true); } } }

第三部分:控制消費者開啟,關閉

// Registry used by the endpoints below to look up a listener container by its @RabbitListener id.
@Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    /**
     * Brings the consumer online: looks up the listener container registered under
     * the id {@code testDirectQueueId1} and starts it.
     *
     * @return the result of {@code R.ok()}
     */
    @RequestMapping("/startCustomer")
    public R startCustomer() {
        rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1").start();
        return R.ok();
    }

    /**
     * Takes the consumer offline: looks up the listener container registered under
     * the id {@code testDirectQueueId1} and stops it.
     *
     * @return the result of {@code R.ok()}
     */
    @RequestMapping("/stopCustomer")
    public R stopCustomer() {
        rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1").stop();
        return R.ok();
    }

主要還是透過 @RabbitListener 註解的 id 屬性,從 RabbitListenerEndpointRegistry 取得對應的監聽容器,再呼叫 start() / stop() 來控制消費者上下線

⎛⎝官蕭何⎠⎞一隻快樂的爪哇程式猿;郵箱:[email protected]