淺析GridView中顯示時間日期格式的問題
阿新 • • 發佈:2020-09-01
pom:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml:
spring:
  rabbitmq:
    host: ip
    port: 5672
    virtual-host: /
    username: 賬號
    password: 密碼
1.直連型交換機
根據訊息攜帶的路由鍵將訊息投遞給對應佇列。流程:有一個佇列繫結到一個直連交換機上,同時設定一個路由鍵。然後當一個訊息攜帶著路由值X,這個訊息通過生產者傳送給交換機,交換機就會根據這個路由值X去尋找繫結值也是X的佇列。
生產者:
//直連型交換機 @Configuration public class DirectRabbitConfig { //佇列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { // durable:是否持久化,預設是false,持久化佇列:會被儲存在磁碟上,當訊息代理重啟時仍然存在,暫存佇列:當前連線有效。// exclusive:預設是false,只能被當前建立的連線使用,而且當連線關閉後佇列即被刪除。 // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此佇列,該佇列會自動刪除。//一般設定一下佇列的持久化就好,其餘兩個就是預設false return new Queue("TestDirectQueue",true, false, false); } //Direct交換機 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { returnnew DirectExchange("TestDirectExchange",true,false); } //繫結 將佇列和交換機繫結, 並設定用於匹配鍵:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting4"); } }
package com.demo.business.controller;

import com.demo.common.msg.BaseResponse;
import com.demo.common.util.ResponseMsgUtil;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * REST entry point under "/mq" that publishes test messages to RabbitMQ.
 */
@RestController
@RequestMapping("/mq")
public class RabbitMQController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Builds a small map payload (id, text, creation time) and publishes it to
     * the exchange "TestDirectExchange" with routing key "TestDirectRouting".
     *
     * @return the success response produced by {@code ResponseMsgUtil.success()}
     */
    @GetMapping("/send")
    public BaseResponse send() {
        DateTimeFormatter timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        Map<String, Object> payload = new HashMap<>();
        payload.put("messageId", String.valueOf(UUID.randomUUID()));
        payload.put("messageData", "test message, hello!");
        payload.put("createTime", LocalDateTime.now().format(timestampFormat));

        // Publish with routing key "TestDirectRouting" to exchange "TestDirectExchange".
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", payload);
        return ResponseMsgUtil.success();
    }
}
消費者:
package com.demo.business.controller;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * Consumer for the queue "TestDirectQueue"; prints every message it receives.
 */
@Component
@RabbitListener(queues = "TestDirectQueue") // name of the queue being listened to
public class DirectReceiver {

    /**
     * Handles one message delivered from the queue.
     *
     * @param testMessage the received message body as a map
     */
    @RabbitHandler
    public void process(Map testMessage) {
        String rendered = testMessage.toString();
        System.out.println("DirectReceiver消費者收到訊息  : " + rendered);
    }
}
2.主題交換機
主題交換機與直連交換機流程相似,區別在於繫結鍵有規則(*表示一個單詞,#表示零個或多個單詞)。
生產者:
package com.demo.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { //繫結鍵 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //將firstQueue和topicExchange繫結,而且繫結的鍵值為topic.man //這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //將secondQueue和topicExchange繫結,而且繫結的鍵值為用上通配路由鍵規則topic.# // 這樣只要是訊息攜帶的路由鍵是以topic.開頭,都會分發到該佇列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
@GetMapping("/send2") public BaseResponse send2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將訊息攜帶繫結鍵值:TestDirectRouting 傳送到交換機TestDirectExchange rabbitTemplate.convertAndSend("topicExchange", "topic.woman22222", map); return ResponseMsgUtil.success(); }
消費者:
import java.util.Map; @Component @RabbitListener(queues = "topic.woman")//佇列名稱 public class TopicManReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicManReceiver消費者收到訊息 : " + testMessage.toString()); } }
3.扇形交換機
沒有路由鍵的概念,接收到訊息後,會直接轉發到繫結到它上面的所有佇列。
生產者:
package com.demo.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Fanout exchange setup: three queues, each bound to "fanoutExchange" without
 * a routing key.
 */
@Configuration
public class FanoutRabbitConfig {

    /** Queue named "fanout.A". */
    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }

    /** Queue named "fanout.B". */
    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }

    /** Queue named "fanoutC". */
    @Bean
    public Queue queueC() {
        return new Queue("fanoutC");
    }

    /** The fanout exchange all three queues are bound to. */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /** Binds queueA to the fanout exchange. */
    @Bean
    Binding bindingExchangeA() {
        Queue target = queueA();
        return BindingBuilder.bind(target).to(fanoutExchange());
    }

    /** Binds queueB to the fanout exchange. */
    @Bean
    Binding bindingExchangeB() {
        Queue target = queueB();
        return BindingBuilder.bind(target).to(fanoutExchange());
    }

    /** Binds queueC to the fanout exchange. */
    @Bean
    Binding bindingExchangeC() {
        Queue target = queueC();
        return BindingBuilder.bind(target).to(fanoutExchange());
    }
}
@GetMapping("/send3") public BaseResponse send3() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將訊息攜帶繫結鍵值:TestDirectRouting 傳送到交換機TestDirectExchange rabbitTemplate.convertAndSend("fanoutExchange", null, map); return ResponseMsgUtil.success(); }
消費者:
package com.demo.business.controller;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * Consumer for the queue "fanout.A"; prints every message it receives.
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    /**
     * Handles one message delivered from the queue.
     *
     * @param testMessage the received message body as a map
     */
    @RabbitHandler
    public void process(Map testMessage) {
        System.out.println("FanoutReceiverA消費者收到訊息  : " + testMessage.toString());
    }
}
package com.demo.business.controller;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * Consumer for the queue "fanoutC"; prints every message it receives.
 */
@Component
@RabbitListener(queues = "fanoutC")
public class FanoutReceiverC {

    /**
     * Handles one message delivered from the queue.
     *
     * @param testMessage the received message body as a map
     */
    @RabbitHandler
    public void process(Map testMessage) {
        String rendered = testMessage.toString();
        System.out.println("FanoutReceiverC消費者收到訊息  : " + rendered);
    }
}
另外,還有Header Exchange 頭交換機、Default Exchange 預設交換機、Dead Letter Exchange 死信交換機。
4.訊息回撥
生產者傳送訊息結束後回撥。
yml增加配置:
spring:
  rabbitmq:
    #確認訊息已傳送到交換機(Exchange)
    publisher-confirms: true
    #確認訊息已傳送到佇列(Queue)
    publisher-returns: true
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //設定開啟Mandatory,才能觸發回撥函式,無論訊息推送結果怎麼樣都強制呼叫回撥函式 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"資料:"+correlationData); System.out.println("ConfirmCallback: "+"結果:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"訊息:"+message); System.out.println("ReturnCallback: "+"狀態碼:"+replyCode); System.out.println("ReturnCallback: "+"資訊:"+replyText); System.out.println("ReturnCallback: "+"交換機:"+exchange); System.out.println("ReturnCallback: "+"鍵值:"+routingKey); } }); return rabbitTemplate; } }
呼叫規則:
找不到交換機,呼叫:ConfirmCallback;
找到交換機,沒找到佇列,呼叫:ConfirmCallback和ReturnCallback;
交換機和佇列都沒找到,呼叫:ConfirmCallback;
訊息推送成功,呼叫:ConfirmCallback;
5.訊息確認
消費者收到訊息後進行操作。
手動確認:消費者收到訊息後手動呼叫basic.ack/basic.nack/basic.reject後,RabbitMQ收到這些訊息後,才認為本次投遞成功。
basicAck用於肯定確認
basicNack用於否定確認
basicReject用於否定確認,與basic.nack相比,一次只能拒絕單條訊息。
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private MyListenerConfig myListenerConfig;//訊息接收處理類 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//RabbitMQ預設是自動確認,這裡改為手動確認訊息 container.setQueueNames("TestDirectQueue"); container.setMessageListener(myListenerConfig); return container; } }
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; @Component public class MyListenerConfig implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(訊息:"+message.toString()); channel.basicAck(deliveryTag, true); //channel.basicReject(deliveryTag, true);//為true會重新放回佇列 } catch (Exception e) { channel.basicReject(deliveryTag, false); e.printStackTrace(); } } }