Spring Boot -- RabbitMQ
阿新 • • 發佈:2018-11-10
Spring Boot -- RabbitMQ
- 1. pom.xml
- 2. application.properties
- 3. config
- 4. 生產者
- 5. 消費者
- 6. 測試
- 發現物件訊息
- 一個生產者多個消費者
- 多個生產者多個消費者
- 一個交換機繫結多個路由鍵
- 扇形交換機,廣播模式,釋出訂閱模式
1. pom.xml
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. application.properties
#RabbitMQ spring.rabbitmq.host=192.168.1.199 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=localhost spring.rabbitmq.username=root spring.rabbitmq.password=root
3. config
@Configuration
public class RabbitMQConfig {
@Bean
public Queue getQueue(){
return new Queue("queue.demo");
}
}
4. 生產者
@Component
public class Producer {
@Resource
private AmqpTemplate amqpTemplate;
public void send(){
String msg = "msg: " + new Date();
amqpTemplate.convertAndSend("queue.demo" ,msg);
System.out.println("Producer send "+ msg);
}
}
5. 消費者
@Component
@RabbitListener(queues = "queue.demo")
public class Consumer {
@RabbitHandler
public void accept(String msg){
System.out.println("Receiver = [" + msg + "]");
}
}
6. 測試
@Test
public void testMQ() throws InterruptedException {
producer.send();
Thread.sleep(20000);
}
發現物件訊息
public void send(){
String msg = "msg: " + new Date();
Map<String, Object> map = new HashMap<>();
map.put("name", "xubo");
map.put("age", 12);
amqpTemplate.convertAndSend("queue.demo", map.toString());
System.out.println("Producer send "+ map.toString());
}
一個生產者多個消費者
消費者2
@Component
@RabbitListener(queues = "queue.demo")
public class Consumer2 {
@RabbitHandler
public void accept(String msg){
System.out.println("Consumer2 = [" + msg + "]");
}
}
生產者傳送多個訊息
public void send(){
for (int i = 0; i < 10; i++){
String msg = "msg: " + new Date();
amqpTemplate.convertAndSend("queue.demo", msg);
System.out.println("Producer send "+ msg);
}
}
一個生產者多個消費者屬於佇列模式,多個消費者瓜分訊息
多個生產者多個消費者
生產者2
@Component
public class Producer2 {
@Resource
private AmqpTemplate amqpTemplate;
public void send(){
for (int i = 0; i < 5; i++){
String msg = "msg: " + i;
amqpTemplate.convertAndSend("queue.demo", msg);
System.out.println("Producer2 send "+ msg);
}
}
}
@Test
public void testMQ() throws InterruptedException {
producer.send();
producer2.send();
Thread.sleep(20000);
}
一個交換機繫結多個路由鍵
QueueConfig
@Configuration
public class QueueConfig {
@Bean(name = "masterQueue")
@Primary
public Queue masterQueue(){
return new Queue("queue.master");
}
@Bean(name = "clusterQueue")
public Queue clusterQueue(){
return new Queue("queue.cluster");
}
}
TopicRabbitConfig
@Configuration
public class TopicRabbitConfig {
@Bean
public TopicExchange exchange(){
return new TopicExchange("demo.amq.topic");
}
@Bean
public Binding masterBindingExchangeMessage(@Qualifier("masterQueue")Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("master");
}
@Bean
public Binding clusterBindingExchangeMessage(@Qualifier("clusterQueue")Queue queue, TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("cluster");
}
}
Producer
生產者傳送訊息需要指定交換機和路由鍵
public void send(){
for (int i = 0; i < 10; i++){
String msg = "msg: " + i;
if(i % 2 == 0){
amqpTemplate.convertAndSend("demo.amq.topic", "master", msg);
System.out.println("Producer to master send "+ msg);
}else {
amqpTemplate.convertAndSend("demo.amq.topic", "cluster", msg);
System.out.println("Producer to cluster send "+ msg);
}
}
}
Consumer
不同消費者要訂閱對應的不同的佇列
@Component
@RabbitListener(queues = "queue.master")
public class Consumer {
@RabbitHandler
public void accept(String msg){
System.out.println("Consumer1 master = [" + msg + "]");
}
}
@Component
@RabbitListener(queues = "queue.cluster")
public class Consumer2 {
@RabbitHandler
public void accept(String msg){
System.out.println("Consumer2 cluster = [" + msg + "]");
}
}
扇形交換機,廣播模式,釋出訂閱模式
所有繫結扇形交換機的佇列都會收到訊息
FanoutRabbitConfig
@Configuration
public class FanoutRabbitConfig {
@Bean
public FanoutExchange exchange(){
return new FanoutExchange("demo.amq.fanout");
}
@Bean
public Binding masterBindingExchangeMessage(@Qualifier("masterQueue")Queue queue, FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding clusterBindingExchangeMessage(@Qualifier("clusterQueue")Queue queue, FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
Producer
生產者傳送訊息針對於扇形交換機不需要路由鍵
for (int i = 0; i < 10; i++){
String msg = "msg: " + i;
amqpTemplate.convertAndSend("demo.amq.fanout", "", msg);
System.out.println("Producer all send "+ msg);
}