springBoot + rabbitMQ +手動確認訊息 + 控制(介面、定時任務)消費者上下線
阿新 • • 發佈:2021-12-17
這裡只貼消費者的部分程式碼
第一部分:手動ack配置
package com.mybatis.plus.config.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/** * * 描述: rabbitMQ配置 * * @author 官昌洪 * @date 2021/12/17 11:24 * @version V1.0 */ @Configuration public class MessageListenerConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory= new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
第二部分:消費訊息
package com.mybatis.plus.config.mq; import com.alibaba.fastjson.JSONObject; import com.mybatis.plus.entity.Log; import com.mybatis.plus.utils.EurekaUtils; import com.mybatis.plus.utils.hash.ConsistentHash; import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Map; @Slf4j @Component public class Receiver { @Value("${server.port}") private String port; @Autowired RabbitTemplate rabbitTemplate; @RabbitListener(id = "testDirectQueueId1", autoStartup = "false", queues = "testDirectQueue") public void consumer(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { Thread.sleep(500); if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) { String msg = new String(message.getBody(), "UTF-8"); Log parseObject = JSONObject.parseObject(msg, Log.class); log.info("消費的訊息來自的佇列名為:" + message.getMessageProperties().getConsumerQueue()); log.info("訊息成功消費到 messageId:" + parseObject.getLogUuid() + " messageData:" + parseObject.getLogTitle() + " createTime:" + parseObject.getCreateTime()); log.info("================================"); // 收到來自主機的訊息 進行一致性hash分配 發往不同的服務 // 獲取服務節點 建立一致hash環 ConsistentHash consistentHash = InitConfig.consistentHash; List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2"); if (!allServiceAddr.isEmpty()) { for (Map<String, String> stringMap : allServiceAddr) { String instanceId = stringMap.get("routeKey"); // 新增1個物理節點和150個對應的虛擬節點 // String instanceId = stringMap.get("queueKey"); // 如果hash環中沒有該節點 才新增 ConsistentHashNode node = consistentHash.getAccurateNode(instanceId); if (null == node) { consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId), instanceId), 150); } } } else { //沒有服務提供者 將訊息返回佇列 channel.basicReject(deliveryTag, true); return; } channel.basicAck(deliveryTag, false); //第二個引數,手動確認可以被批處理,當該引數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有訊息 // 提取訊息中的某個代表來源主機的標識 然後在hash環上分配目標節點 String logUuid = parseObject.getLogUuid(); ConsistentHashNode node = consistentHash.getNode(logUuid); log.info("主機標識:{},分配節點:{}", logUuid, node.getTarget()); //向指定路由傳送訊息 // todo 問題 這裡怎麼保證佇列預先建立初始化好 解決方案 先從配置檔案獲取佇列名稱 新增服務時 需要重啟服務 rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg); // planTwo(parseObject); log.info(">>>>>>>>>>>>消費訊息成功!"); } } catch (Exception e) { log.info(">>>>>>>>>>>>消費訊息失敗!失敗訊息ID:{}, 失敗原因:{}", deliveryTag, e.getMessage()); channel.basicReject(deliveryTag, true); } } }
第三部分:控制消費者開啟,關閉
@Autowired private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("/startCustomer") public R startCustomer(){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1"); consumer.start(); return R.ok(); } @RequestMapping("/stopCustomer") public R stopCustomer(){ MessageListenerContainer consumer = rabbitListenerEndpointRegistry.getListenerContainer("testDirectQueueId1"); consumer.stop(); return R.ok(); }
主要還是指定RabbitListener 註解的ID屬性進行控制
⎛⎝官蕭何⎠⎞一隻快樂的爪哇程式猿;郵箱:[email protected]