使用rabbitmq手動確認訊息的,定時獲取佇列訊息實現
描述問題
最近專案中因為有些資料,需要推送到第三方系統中,因為資料會一直增加,並且需要與第三方系統做相關互動。
相關業務
本著不影響線上執行效率的思想,我們將增加的訊息放入rabbitmq,使用另一個應用獲取消費,因為資料只是推送,並且業務的資料有15分鐘左右的更新策略,對實時性不是很高所以我們需要一個定時任務來主動連結rabbit去消費,然後將資料以網路方式傳送
相關分析
網路上大致出現了相關的解決辦法,但由於實現相關資料丟失及處理、效能和效率等相關基礎業務的工作量,望而卻步。。。。。。
還好spring有相關的 org.springframework.amqp 工具包,簡化的大量麻煩>_> 讓我們開始吧
瞭解rabbit的相關幾個概念
- Spring RabbitMQ Channel理解
- 中介軟體系列二 RabbitMQ之訊息永續性、確認機制、拒絕、預取數量、分配策略
- topic
瞭解了這幾個概念的時候你可能已經關注到了我們今天的主題SimpleMessageListenerContainer
我們使用SimpleMessageListenerContainer容器設定消費佇列監聽,然後設定具體的監聽Listener進行訊息消費具體邏輯的編寫,通過SimpleRabbitListenerContainerFactory我們可以完成相關SimpleMessageListenerContainer容器的管理,
但對於使用此容器批量消費的方式,官方並沒有相關說明,網路上你可能只找到這篇SimpleMessageListenerContainer批量訊息處理對於問題描述是很清晰,但是回答只是說的比較簡單
下面我們就對這個問題的答案來個coding
解決辦法
首先我們因為需要失敗重試,使用spring的RepublishMessageRecoverer可以解決這個問題,這顯然有一個缺點,即將在整個重試期間佔用執行緒。所以我們使用了死信佇列
相關配置
1 @Bean 2 ObjectMapper objectMapper() { 3 ObjectMapper objectMapper = new ObjectMapper(); 4 DateFormat dateFormat = objectMapper.getDateFormat(); 5 JavaTimeModule javaTimeModule = new JavaTimeModule(); 6 7 SimpleModule module = new SimpleModule(); 8 module.addSerializer(new ToStringSerializer(Long.TYPE)); 9 module.addSerializer(new ToStringSerializer(Long.class)); 10 module.addSerializer(new ToStringSerializer(BigInteger.class)); 11 12 javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); 13 javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); 14 javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss"))); 15 16 objectMapper.registerModule(module); 17 objectMapper.registerModule(javaTimeModule); 18 objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化擴充套件日期格式支援 19 objectMapper.enable(SerializationFeature.INDENT_OUTPUT); 20 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); 21 return objectMapper; 22 } 23 24 25 26 @Bean 27 RabbitAdmin admin (ConnectionFactory aConnectionFactory) { 28 return new RabbitAdmin(aConnectionFactory); 29 } 30 31 @Bean 32 MessageConverter jacksonAmqpMessageConverter( ) { 33 return new Jackson2JsonMessageConverter(objectMapper()); 34 } 35 36 37 @Bean 38 Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) { 39 Queue queue = new Queue(Queues.QUEUE_BCW_PUSH); 40 rabbitAdmin.declareQueue(queue); 41 return queue; 42 } 43 @Bean 44 Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) { 45 Queue queue = new Queue(Queues.QUEUE_BCW_PAY); 46 rabbitAdmin.declareQueue(queue); 47 return queue; 48 } 49 @Bean 50 Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) { 51 Queue queue = new Queue(Queues.QUEUE_BCW_PULL); 52 rabbitAdmin.declareQueue(queue); 53 return queue; 54 } 55 /** 56 * 宣告一個交換機 57 * @return 58 */ 59 @Bean 60 TopicExchange controlExchange () { 61 return new TopicExchange(Exchanges.ExangeTOPIC); 62 } 63 64 65 /** 66 * 延時重試佇列 67 */ 68 @Bean 69 public Queue bcwPayControlRetryQueue() { 70 Map<String, Object> arguments = new HashMap<>(); 71 arguments.put("x-message-ttl", 10 * 1000); 72 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC); 73 // 如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key 74 arguments.put("x-dead-letter-routing-key", "queue_bcw.push"); 75 return new Queue("queue_bcw@pay@retry", true, false, false, arguments); 76 } 77 /** 78 * 延時重試佇列 79 */ 80 @Bean 81 public Queue bcwPushControlRetryQueue() { 82 Map<String, Object> arguments = new HashMap<>(); 83 arguments.put("x-message-ttl", 10 * 1000); 84 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC); 85 // 如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key 86 arguments.put("x-dead-letter-routing-key", "queue_bcw.push"); 87 return new Queue("queue_bcw@push@retry", true, false, false, arguments); 88 } 89 /** 90 * 延時重試佇列 91 */ 92 @Bean 93 public Queue bcwPullControlRetryQueue() { 94 Map<String, Object> arguments = new HashMap<>(); 95 arguments.put("x-message-ttl", 10 * 1000); 96 arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC); 97 // 如果設定死信會以路由鍵some-routing-key轉發到some.exchange.name,如果沒設預設為訊息傳送到本佇列時用的routing key 98 // arguments.put("x-dead-letter-routing-key", "queue_bcw"); 99 return new Queue("queue_bcw@pull@retry", true, false, false, arguments); 100 } 101 @Bean 102 public Binding bcwPayControlRetryBinding() { 103 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry"); 104 } 105 @Bean 106 public Binding bcwPushControlRetryBinding() { 107 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry"); 108 } 109 @Bean 110 public Binding bcwPullControlRetryBinding() { 111 return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry"); 112 } 113 114 /** 115 * 佇列繫結並關聯到RoutingKey 116 * 117 * @param queueMessages 佇列名稱 118 * @param exchange 交換機 119 * @return 繫結 120 */ 121 @Bean 122 Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) { 123 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push"); 124 } 125 /** 126 * 佇列繫結並關聯到RoutingKey 127 * 128 * @param queueMessages 佇列名稱 129 * @param exchange 交換機 130 * @return 繫結 131 */ 132 @Bean 133 Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) { 134 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay"); 135 } 136 /** 137 * 佇列繫結並關聯到RoutingKey 138 * 139 * @param queueMessages 佇列名稱 140 * @param exchange 交換機 141 * @return 繫結 142 */ 143 @Bean 144 Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) { 145 return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull"); 146 } 147 148 @Bean 149 @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") 150 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( 151 SimpleRabbitListenerContainerFactoryConfigurer configurer, 152 ConnectionFactory connectionFactory) { 153 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 154 configurer.configure(factory, connectionFactory); 155 factory.setMessageConverter(jacksonAmqpMessageConverter()); 156 return factory; 157 }
下面就是我們的主題,定時任務使用的是org.springframework.scheduling
1 /** 2 * 手動確認訊息的,定時獲取佇列訊息實現 3 */ 4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer { 5 protected final Logger logger = LoggerFactory.getLogger(getClass()); 6 private List<Message> body = new LinkedList<>(); 7 public long start_time; 8 private Channel channel; 9 @Autowired 10 private ObjectMapper objectMapper; 11 @Autowired 12 private RabbitTemplate rabbitTemplate; 13 14 public QuartzSimpleMessageListenerContainer() { 15 // 手動確認 16 this.setAcknowledgeMode(AcknowledgeMode.MANUAL); 17 18 this.setMessageListener((ChannelAwareMessageListener) (message,channel) -> { 19 long current_time = System.currentTimeMillis(); 20 int time = (int) ((current_time - start_time)/1000); 21 logger.info("====接收到{}佇列的訊息=====",message.getMessageProperties().getConsumerQueue()); 22 Long retryCount = getRetryCount(message.getMessageProperties()); 23 if (retryCount > 3) { 24 logger.info("====此訊息失敗超過三次{}從佇列的訊息刪除=====",message.getMessageProperties().getConsumerQueue()); 25 try { 26 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); 27 } catch (IOException ex) { 28 ex.printStackTrace(); 29 } 30 return; 31 } 32 33 this.body.add(message); 34 /** 35 * 判斷陣列資料是否滿了,判斷此監聽器時間是否大於執行時間 36 * 如果在最後延時時間段內沒有業務訊息,此監聽器會一直開著 37 */ 38 if(body.size()>=3 || time>60){ 39 this.channel = channel; 40 callback(); 41 } 42 }); 43 44 45 46 } 47 private void callback(){ 48 // channel = getChannel(getTransactionalResourceHolder()); 49 if(body.size()>0 && channel !=null && channel.isOpen()){ 50 try { 51 callbackWork(); 52 }catch (Exception e){ 53 logger.error("推送資料出錯:{}",e.getMessage()); 54 55 body.stream().forEach(message -> { 56 Long retryCount = getRetryCount(message.getMessageProperties()); 57 if (retryCount <= 3) { 58 logger.info("將訊息置入延時重試佇列,重試次數:" + retryCount); 59 rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message); 60 } 61 }); 62 63 } finally{ 64 65 logger.info("flsher too data"); 66 67 body.stream().forEach(message -> { 68 //手動acknowledge 69 try { 70 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 71 } catch (IOException e) { 72 logger.error("手動確認訊息失敗!"); 73 e.printStackTrace(); 74 } 75 }); 76 77 body.clear(); 78 this.stop(); 79 80 } 81 } 82 83 } 84 abstract void callbackWork() throws Exception; 85 /** 86 * 獲取訊息失敗次數 87 * @param properties 88 * @return 89 */ 90 private long getRetryCount(MessageProperties properties){ 91 long retryCount = 0L; 92 Map<String,Object> header = properties.getHeaders(); 93 if(header != null && header.containsKey("x-death")){ 94 List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death"); 95 if(deaths.size()>0){ 96 Map<String,Object> death = deaths.get(0); 97 retryCount = (Long)death.get("count"); 98 } 99 } 100 return retryCount; 101 } 102 103 @Override 104 @Scheduled(cron = "0 0/2 * * * ? ") 105 public void start() { 106 logger.info("start push data scheduled!"); 107 //初始化資料,將未處理的呼叫stop方法,返還至rabbit 108 body.clear(); 109 super.stop(); 110 start_time = System.currentTimeMillis(); 111 super.start(); 112 113 logger.info("end push data scheduled!"); 114 } 115 116 public List<WDNJPullOrder> getBody() { 117 118 List<WDNJPullOrder> collect = body.stream().map(data -> { 119 byte[] body = data.getBody(); 120 WDNJPullOrder readValue = null; 121 try { 122 readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() { 123 }); 124 } catch (IOException e) { 125 logger.error("處理資料出錯{}",e.getMessage()); 126 } 127 return readValue; 128 } 129 ).collect(Collectors.toList()); 130 131 return collect; 132 133 134 } 135 136 }
後續
當然定時任務的啟動,你可以寫到相關rabbit容器實現的裡面,但是這裡並不是很需要,所以對於這個的小改動,同學你可以自己實現
@Scheduled(cron = "0 0/2 * * * ? ") public void start()