SpringBoot2.0系列--09--訊息佇列(Rabbit)
SpringBoot2.0系列–09–訊息佇列(Rabbit)
前言
JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記
SpringBoot2.0系列–00–目錄
介紹
當專案需要拆分,分散式的時候一般需要使用訊息佇列,Rabbit作為一個訊息中介軟體,在實際專案中使用的比重還是挺大的。
訊息中介軟體最主要的作用是解耦,中介軟體最標準的用法是生產者生產訊息傳送到佇列,消費者從佇列中拿取訊息並處理,生產者不用關心是誰來消費,消費者不用關心誰在生產訊息,從而達到解耦的目的。
這邊主要springboot和RabbitMQ的結合使用。具體介紹可以檢視這篇:
// todo 某連結
總流程
- 安裝rabbit,啟動
- 配置引用
- 簡單(一對一下訊息)例項
- 一對多訊息
- 多對多訊息
- 傳送物件
- Topic Exchange
- Fanout Exchange
具體操作
安裝rabbit,啟動
可以檢視
// todo 某連結
配置引用
pom檔案
<!-- rabbitmq需要的包--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.properties檔案
# 這幾個是預設的配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
簡單(一對一下訊息)例項
建立佇列配置
@Configuration public class RabbitConfig { // 測試一對一 @Bean public Queue helloQueue() { return new Queue("hello"); } @Bean public Queue fooQueue() { return new Queue("foo"); }
建立生產者
/*
* Copyright (C), 2015-2018
* FileName: Sender
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 傳送者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.easy;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 〈一句話功能簡述〉<br>
* 〈傳送者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
rabbitTemplate.convertAndSend("hello", context);
}
public void sendFoo() {
String context = "foo " + new Date();
System.out.println("Foo Sender : " + context);
rabbitTemplate.convertAndSend("foo", context);
}
}
建立消費者
/*
* Copyright (C), 2015-2018
* FileName: Receiver
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.easy;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈消費者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "hello")
public class Receiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
測試類及結果
這邊發出去一條,接收到了一條
// 測試一對一
@Test
public void easy() {
// 結果
// Sender : hello Wed Nov 14 19:33:16 GMT+08:00 2018
// Receiver : hello Wed Nov 14 19:33:16 GMT+08:00 2018
sender.send();
// sender.sendFoo();
}
一對多訊息
建立佇列配置
// 測試一對多
@Bean
public Queue multimapQueue1() {
return new Queue("OneToMany");
}
建立生產者
/*
* Copyright (C), 2015-2018
* FileName: one2ManySender
* Author: zhao
* Date: 2018/11/14 16:57
* Description: 一對多翻譯
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 〈一句話功能簡述〉<br>
* 〈一對多翻譯〉
*
* @author zhao
* @date 2018/11/14 16:57
* @since 1.0.1
*/
@Component
public class OneToManySender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "OneToMany " + new Date();
System.out.println("OneToManySender : " + context);
rabbitTemplate.convertAndSend("OneToMany", context);
}
}
建立消費者
/*
* Copyright (C), 2015-2018
* FileName: OneToManyReceiver1
* Author: zhao
* Date: 2018/11/14 16:59
* Description: 一對多接收者1
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈一對多接收者1〉
*
* @author zhao
* @date 2018/11/14 16:59
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "OneToMany")
public class OneToManyReceiver1 {
@RabbitHandler
public void process(String foo) {
System.out.println("OneToManyReceiver1 : " + foo);
}
}
/*
* Copyright (C), 2015-2018
* FileName: OneToManyReceiver2
* Author: zhao
* Date: 2018/11/14 16:59
* Description: 一對多接收者2
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈一對多接收者2〉
*
* @author zhao
* @date 2018/11/14 16:59
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "OneToMany")
public class OneToManyReceiver2 {
@RabbitHandler
public void process(String foo) {
System.out.println("OneToManyReceiver2 : " + foo);
}
}
測試類、說明及結果
一個傳送端,2個接收端,可以看到結果是平均分佈的
// 測試一對多訊息
@Test
public void testOneToMany() throws Exception {
// 一個傳送端,2個接收端,可以看到結果是平均分佈的
// 結果
// OneToManySender : OneToMany Wed Nov 14 19:37:17 GMT+08:00 2018
//OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:17 GMT+08:00 2018
//OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManyReceiver1 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManyReceiver1 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManyReceiver1 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManySender : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
//OneToManyReceiver2 : OneToMany Wed Nov 14 19:37:18 GMT+08:00 2018
for (int i = 0; i < 100; i++) {
oneToManySender.send();
Thread.sleep(100);
}
}
多對多訊息
建立佇列配置
// 測試多對多
@Bean
public Queue multimapQueue3() {
return new Queue("manyToMany");
}
建立生產者
/*
* Copyright (C), 2015-2018
* FileName: ManyToManySender
* Author: zhao
* Date: 2018/11/14 16:57
* Description: 多對多翻譯
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 〈一句話功能簡述〉<br>
* 〈多對多翻譯〉
*
* @author zhao
* @date 2018/11/14 16:57
* @since 1.0.1
*/
@Component
public class ManyToManySender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "ManyToManySender1 " + new Date();
System.out.println("ManyToManySender1 : " + context);
rabbitTemplate.convertAndSend("manyToMany", context);
}
}
/*
* Copyright (C), 2015-2018
* FileName: ManyToManySender
* Author: zhao
* Date: 2018/11/14 16:57
* Description: 多對多翻譯
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 〈一句話功能簡述〉<br>
* 〈多對多翻譯〉
*
* @author zhao
* @date 2018/11/14 16:57
* @since 1.0.1
*/
@Component
public class ManyToManySender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "ManyToManySender2 " + new Date();
System.out.println("ManyToManySender2 : " + context);
rabbitTemplate.convertAndSend("manyToMany", context);
}
}
建立消費者
/*
* Copyright (C), 2015-2018
* FileName: ManyToManyReceiver1
* Author: zhao
* Date: 2018/11/14 16:59
* Description: 多對多接收者1
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈多對多接收者1〉
*
* @author zhao
* @date 2018/11/14 16:59
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "manyToMany")
public class ManyToManyReceiver1 {
public int count = 0;
@RabbitHandler
public void process(String foo) {
System.out.println("ManyToManyReceiver1 : " + foo);
count++;
}
}
/*
* Copyright (C), 2015-2018
* FileName: ManyToManyReceiver2
* Author: zhao
* Date: 2018/11/14 16:59
* Description: 多對多接收者2
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈多對多接收者2〉
*
* @author zhao
* @date 2018/11/14 16:59
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "manyToMany")
public class ManyToManyReceiver2 {
public int count = 0;
@RabbitHandler
public void process(String foo) {
System.out.println("ManyToManyReceiver2 : " + foo);
count++;
}
}
/*
* Copyright (C), 2015-2018
* FileName: ManyToManyReceiver3
* Author: zhao
* Date: 2018/11/14 16:59
* Description: 多對多接收者1
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.multimap;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈多對多接收者3〉
*
* @author zhao
* @date 2018/11/14 16:59
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "manyToMany")
public class ManyToManyReceiver3 {
public int count = 0;
@RabbitHandler
public void process(String foo) {
System.out.println("ManyToManyReceiver3 : " + foo);
count++;
}
}
測試類、說明及結果
這裡是2對3的關係,結果看上去好像不是平均的,我們加上一個count,來統計各個接收者執行的次數,最後發現是66.67.67,所以是平均的
// 測試多對多訊息
@Test
public void testManyToMany() throws Exception {
//這裡是2對3的關係,結果看上去好像不是平均的,
// 我們加上一個count,來統計各個接收者執行的次數,最後發現是66.67.67,
// 所以是平均的
// 結果
// manyToManyReceiver1.count: 67
// manyToManyReceiver2.count: 66
// manyToManyReceiver3.count: 67
// ManyToManyReceiver3 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver1 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManySender1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManySender2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver2 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver3 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManySender1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManySender2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManySender1 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManySender2 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver1 : ManyToManySender2 Wed Nov 14 19:40:31 GMT+08:00 2018
// ManyToManyReceiver3 : ManyToManySender1 Wed Nov 14 19:40:31 GMT+08:00 2018
for (int i = 0; i < 100; i++) {
manyToManySender.send();
manyToManySender2.send();
Thread.sleep(100);
}
System.out.println(
"manyToManyReceiver1.count: " + manyToManyReceiver1.count + "\n" + "manyToManyReceiver2.count: "
+ manyToManyReceiver2.count + "\n" + "manyToManyReceiver3.count: " + manyToManyReceiver3.count
+ "\n");
}
傳送物件
建立佇列配置
// 測試傳送物件
@Bean
public Queue entityQueue() {
return new Queue("entity");
}
物件
/*
* Copyright (C), 2015-2018
* FileName: User
* Author: zhao
* Date: 2018/11/14 18:25
* Description: 實體--使用者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.entity;
import java.io.Serializable;
/**
* 〈一句話功能簡述〉<br>
* 〈實體--使用者〉
*
* @author zhao
* @date 2018/11/14 18:25
* @since 1.0.1
*/
public class User implements Serializable {
private int id;
private String name;
public User(int id, String name) {
this.id = id;
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
@Override
public String toString() {
return "User{" + "id=" + id + ", name='" + name + '\'' + '}';
}
}
建立生產者
/*
* Copyright (C), 2015-2018
* FileName: Sender
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 傳送者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.entity;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 〈一句話功能簡述〉<br>
* 〈傳送者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
public class EntitySender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
User user = new User(1, "小王");
System.out.println("Sender : " + user);
rabbitTemplate.convertAndSend("entity", user);
}
}
建立消費者
/*
* Copyright (C), 2015-2018
* FileName: Receiver
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.entity;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈消費者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "entity")
public class EntityReceiver {
@RabbitHandler
public void process(User user) {
System.out.println("Receiver : " + user.toString());
}
}
測試類、說明及結果
讓實體實現Serializable介面,就能直接傳送了
// 測試傳送實體
@Test
public void entity() {
// 讓實體實現Serializable介面,就能直接傳送了
// 結果
// Sender : User{id=1, name='小王'}
// Receiver : User{id=1, name='小王'}
entitySender.send();
}
Topic Exchange
建立佇列配置
// 測試topic
final static String message = "topic.message";
final static String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(RabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(RabbitConfig.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
建立生產者
/*
* Copyright (C), 2015-2018
* FileName: Sender
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 傳送者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.topic;
import com.lizhaobolg.message.rabbit.advance.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈傳送者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}
}
建立消費者
/*
* Copyright (C), 2015-2018
* FileName: Receiver
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈消費者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("TopicReceiver1 : " + hello);
}
}
/*
* Copyright (C), 2015-2018
* FileName: Receiver
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈消費者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("TopicReceiver2 : " + hello);
}
}
測試類及結果
// topic -- 測試根據key來繫結佇列
@Test
public void topic() {
// 這裡queueMessages這個佇列,可以被2個key匹配,
// 本身topic.messages會繫結過一個佇列
// 所以會執行2次topic.messages的訊息
// 結果
// Sender : hi, i am message 1
// Sender : hi, i am messages 2
// TopicReceiver2 : hi, i am message 1
// TopicReceiver1 : hi, i am message 1
// TopicReceiver2 : hi, i am messages 2
topicSender.send1();
topicSender.send2();
}
Fanout Exchange
建立佇列配置
// 測試fanout
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
建立生產者
/*
* Copyright (C), 2015-2018
* FileName: Sender
* Author: zhao
* Date: 2018/11/14 15:28
* Description: 傳送者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈傳送者〉
*
* @author zhao
* @date 2018/11/14 15:28
* @since 1.0.1
*/
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
建立消費者
/*
* Copyright (C), 2015-2018
* FileName: FanoutReceiver
* Author: zhao
* Date: 2018/11/14 18:58
* Description: fanout消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.fanout;
import com.lizhaobolg.message.rabbit.advance.entity.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈fanout消費者〉
*
* @author zhao
* @date 2018/11/14 18:58
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA : " + msg);
}
}
/*
* Copyright (C), 2015-2018
* FileName: FanoutReceiver
* Author: zhao
* Date: 2018/11/14 18:58
* Description: fanout消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈fanout消費者〉
*
* @author zhao
* @date 2018/11/14 18:58
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB : " + msg);
}
}
/*
* Copyright (C), 2015-2018
* FileName: FanoutReceiver
* Author: zhao
* Date: 2018/11/14 18:58
* Description: fanout消費者
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改時間 版本號 描述
*/
package com.lizhaobolg.message.rabbit.advance.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 〈一句話功能簡述〉<br>
* 〈fanout消費者〉
*
* @author zhao
* @date 2018/11/14 18:58
* @since 1.0.1
*/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC : " + msg);
}
}
測試類及結果
這裡屬於廣播,所以傳送一次,三個客戶端都能接收到
// fanout -- 廣播機制
@Test
public void fanout() {
// 這裡屬於廣播,所以傳送一次,三個客戶端都能接收到
// 結果
// Sender : hi, fanout msg
// FanoutReceiverB : hi, fanout msg
// FanoutReceiverC : hi, fanout msg
// FanoutReceiverA : hi, fanout msg
fanoutSender.send1();
}
參考連結
聯絡方式
聯絡方式:QQ3060507060
檢視下一篇或者其他文章,可點選目錄或者專欄檢視
相關推薦
SpringBoot2.0系列--09--訊息佇列(Rabbit)
SpringBoot2.0系列–09–訊息佇列(Rabbit) 前言 JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記 SpringBoot2.0系列–00–目錄 介紹 當專案需要拆
訊息佇列(MQ)系列1.0 為啥需要訊息佇列?
什麼場景會需要訊息佇列(MQ)? message queue: 主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達資料庫, 直接導致無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many
大型網站架構系列:訊息佇列(二)(轉)
本文是大型網站架構系列:訊息佇列(二),主要分享JMS訊息服務,常用訊息中介軟體(Active MQ,Rabbit MQ,Zero MQ,Kafka)。【第二篇的內容大部分為網路資源的整理和彙總,供大家學習總結使用,最後有文章來源】 本次分享大綱 訊息佇列概述(見第一篇:大型網站架構系列:分散式訊息
SpringBoot2.0系列--07--熱部署
SpringBoot2.0系列–07–熱部署 文章目錄 SpringBoot2.0系列--07--熱部署 前言 介紹 總流程 具體操作 聯絡方式 前言 JDK出11了,SpringBoot
SpringBoot2.0系列--08--打包jar和war包
SpringBoot2.0系列–08–打包jar和war包 文章目錄 SpringBoot2.0系列--08--打包jar和war包 前言 介紹 總流程 具體操作 聯絡方式 前言 JDK出
Cris 玩轉大資料系列之訊息佇列神器 Kafka
Cris 玩轉大資料系列之訊息佇列神器 Kafka Author:Cris 文章目錄 Cris 玩轉大資料系列之訊息佇列神器 Kafka Author:Cris 1. Kafka 概述
SpringBoot2.0系列--01--HelloWorld
SpringBoot2.0系列–01–HelloWorld 前言 JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記吧 準備工具 IntelliJ IDEA 2018.2.4 Mav
SpringBoot2.0系列--04--最簡單的Mybatis連線資料庫
SpringBoot2.0系列–04–最簡單的Mybatis連線資料庫 前言 JDK出11了,SpringBoot出2.0了,還沒有系統的學習過,剛好最近專案中有使用到,就把一些關鍵的東西列出來,避免忘記 SpringBoot2.0系列–00–目錄 介紹 寫完
叢集與負載均衡系列(4)——訊息佇列之Rabbitmq的搭建
前面的三篇文章介紹了共享session,從這篇文章開始介紹訊息佇列,這裡用的是Rabbitmq。對於Rabbitmq的一些基本概念,不打算在這裡總結了。因為網上有大把總結的不錯的文章,比如點選開啟連結 這篇文章介紹Rabbitmq的安裝。
SpringBoot2.0整合MQTT訊息推送功能
這幾天在弄後端管理系統向指定的Android客戶端推送訊息的功能模組,查閱了網上很多部落格介紹的許多方式,最終選擇基於MQTT協議來實現,MQTT是一個輕量級的訊息釋出/訂閱協議,它是實現基於手機客戶端的訊息推送伺服器的理想解決方案。 實現M
叢集與負載均衡系列(7)——訊息佇列之分散式事務
XA協議: 為了解決分散式事務,各大廠家資料庫都提供了xa協議介面。什麼是XA協議,就是通過多階段提交,確保資料一致性。以兩階段提交為例 第一階段為準備階段,
大型網站架構系列——分散式訊息佇列
轉載自:https://www.cnblogs.com/itfly8/p/5155983.html 訊息佇列概述 訊息佇列使用場景 1、訊息佇列概述: 訊息佇列中介軟體是分散式系統中重要的元件,主要解決 應用耦合,非同步訊息,流量削鋒 等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型
SpringBoot熱部署devtool和配置檔案自動注入(SpringBoot2.0系列-二)
1、SpringBoot2.x使用Dev-tool熱部署 簡介:什麼是熱部署,使用springboot結合dev-tool工具,快速載入啟動應用 核心依賴包: <dependency> <groupId>o
XXL-MQ v1.2.0,分散式訊息佇列
Release Notes 1、client端與Broker長鏈初始化優化,防止重複建立連線。 2、POM多項依賴升級; 3、UI元件升級; 4、規範專案目錄結構; 6、超時控制; 5、通訊遷移至 xxl-rpc; 6、除了springboot型別示例;新增無框架
網際網路面試開小灶系列之訊息佇列(一)
目錄 背景 為什麼使用訊息佇列 訊息佇列有什麼優缺點 訊息佇列的選型 重複消費你們是怎麼解決的? @(目錄) 背景 程式設計師不懂點訊息佇列的知識,怎麼能證明你
高併發架構系列:如何從0到1設計一個MQ訊息佇列
訊息佇列作為系統解耦,流量控制的利器,成為分散式系統核心元件之一。 如果你對訊息佇列背後的實現原理關注不多,其實瞭解訊息佇列背後的實現非常重要。 不僅知其然還要知其所以然,這才是一個優秀的工程師需要具備的特徵。 今天,我們就一起來探討設計一個訊息佇列背後的技術。 訊息佇列整體設計思路 主要是設計
SpringBoot2.0高階案例(07) 整合:Redis叢集 ,實現訊息佇列場景
本文原始碼 GitHub地址:知了一笑 https://github.com/cicadasmile/middle-ware-pa
大型網站架構系列:分散式訊息佇列(一)(轉)
以下是訊息佇列以下的大綱,本文主要介紹訊息佇列概述,訊息佇列應用場景和訊息中介軟體示例(電商,日誌系統)。 本次分享大綱 訊息佇列概述 訊息佇列應用場景 訊息中介軟體示例 JMS訊息服務(見第二篇:大型網站架構系列:分散式訊息佇列(二)) 常用訊息佇列(見第二篇:大型網站架構系列:分
springboot2.0 快速使用教程系列
1 springboot2.0專案搭建篇 (一)springboot2.0快速專案搭建和專案的配置(Spring Tool Suite(STS)) (二)springboot2.0快速專案搭建和專案的配置(IntellJ IDEA) (三)springboot2.0快速專案搭建和專
springboot2.x簡單詳細教程--訊息佇列介紹及整合ActiveMQ (第十三章)
一、JMS介紹和使用場景及基礎程式設計模型 簡介:講解什麼是小寫佇列,JMS的基礎知識和使用場景 1、什麼是JMS: Java訊息服務(Java Message Service),Java平臺中關於面向訊息中介