SpringBoot---RabbitMQ訊息佇列_1
阿新 • • 發佈:2020-09-11
1、首先下載安裝好Rabbit MQ,使用Username : guest Password: guest登入
2、觀察目前RabbitMq中的Exchange和queue的情況。下圖是預設存在的交換器
3、 預設沒有任何訊息佇列
RabbitMQ和SpringBoot整合
Spring提供對RabbitMQ的封裝如下: (1)AmqpAdmin bean物件 管理rabbitMQ的Bean物件 建立交換器、繫結佇列等 (2)RabbitTemplate 訊息的操作物件
1、建立一個SpringBoot工程,新增RabbitMq的啟動依賴
2、配置檔案配置
#配置RabbitMQ訊息中介軟體連線配置 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #配置RabbitMQ虛擬主機路徑/,預設可以省略 spring.rabbitmq.virtual-host=/
3、在test資料夾的測試類中建立以下測試方法,用於在RabbitMq新增佇列和交換器
@SpringBootTest class Rabbitmq14ApplicationTests { @Autowired AmqpAdmin amqpAdmin;@Resource RabbitTemplate rabbitTemplate; @Test public void adminRabbitMq(){ //1、建立一個交換器 amqpAdmin.declareExchange(new FanoutExchange("my_fanout_exchange")); //2、建立佇列 amqpAdmin.declareQueue(new Queue("queue_email")); amqpAdmin.declareQueue(new Queue("queue_sms")); //3、繫結交換器和佇列 amqpAdmin.declareBinding(new Binding("queue_email",Binding.DestinationType.QUEUE,"my_fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("queue_sms",Binding.DestinationType.QUEUE,"my_fanout_exchange","",null)); System.out.println("訊息佇列建立成功!"); }}
4、執行該測試方法。觀察控制檯輸出,同時觀察RabbitMq
5、新增配置類
@Configuration public class RabbitMQConfig { /** * 定製JSON格式的訊息轉換器 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
新增User類
@Data public class User { Long id; String username; String email; int age; }User.java
6、將訊息傳送到RabbitMq交換器,在測試類中新增下面方法,傳送100個訊息。
@Test public void publishMag(){ User user=new User(); user.setAge(30); user.setId(10000L); user.setEmail("[email protected]"); user.setUsername("builderMsg"); for(int i=0;i<100;i++){ rabbitTemplate.convertAndSend("fanout_exchange","",user); //fanout_exchange自定義的交換器 } System.out.println("訊息傳送成功!"); }
觀察RabbitMq的後臺(其中Ready的訊息為100,表示佇列中沒有被消費的資料。)
7、編寫監聽類
@Service public class MessageService { int countOfEmailMsg=0; int countOfSmsMsg=0; @RabbitListener(queues = "queue_sms") public void readSmsQueue(Message message){ byte[] body=message.getBody(); String s=new String(body);//user countOfSmsMsg++; System.out.println(countOfEmailMsg+"簡訊業務接收到訊息:"+s); } @RabbitListener(queues = "queue_email") public void readEmailQueue(Message message){ byte[] body=message.getBody(); String s=new String(body); countOfEmailMsg++; System.out.println(countOfEmailMsg+"郵件業務接收到訊息:"+s); } }
啟動工程。觀察控制檯輸出