spring boot2.1整合RabbitMQ
轉載請表明出處 https://blog.csdn.net/Amor_Leo/article/details/85085697 謝謝
spring boot2.1整合RabbitMQ
簡單介紹
RabbitMQ 即訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發,削峰等作用.RabbitMQ使用的是AMQP(高階訊息佇列協議).預設啟動埠 5672,UI介面15672.
Direct是RabbitMQ預設的交換機模式,也是最簡單的模式,直連模式.即建立訊息佇列的時候,指定一個RouteKey.當傳送者傳送訊息的時候,指定對應的Key.當Key和訊息佇列的RouteKey一致的時候,訊息將會被髮送到該訊息佇列中.也可以使用rabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何繫結(binding)操作 。訊息傳遞時,RouteKey必須完全匹配,才會被佇列接收,否則該訊息會被拋棄。
topic轉發資訊主要是依據萬用字元,佇列和交換機的繫結主要是依據一種模式(萬用字元+字串),而當傳送訊息的時候,只有指定的Key和該模式相匹配的時候,訊息才會被髮送到該訊息佇列中.其中* 一個字,# 0個或多個字.
Fanout是路由廣播的模式,將會把訊息發給繫結它的全部佇列,即便設定了key,也會被忽略.
headers也是根據一個規則進行匹配,在訊息佇列和交換機繫結的時候會指定一組鍵值對規則,而傳送訊息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,訊息會被髮送到匹配的訊息佇列中.
安裝RabbitMQ
Erlang語言寫的,安裝Rabbit,必須Erlang語言的執行環境。
Docker能打包安裝環境,使用docker,很容易解決這個問題。
放行虛擬機器埠
firewall-cmd --zone=public --add-port=埠號/tcp --permanent
firewall-cmd --reload
獲取映象:
docker pull rabbitmq:management
執行例項:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:management
新增依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
簡單配置
簡單yml配置
spring:
rabbitmq:
host: 192.168.0.114
port: 5672
username: admin
password: admin
virtual-host: /
配置類
/**
* @author: LHL
* @ProjectName: rabbitmq
* @Package: com.amor.config
* @ClassName: RabbitmqConf
* @Date: 2018/11/18 21:06
* @Description:
* @Version: 1.0
*/
@Configuration
public class RabbitmqConf {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqConf.class);
/**
* 訊息交換機的名字
* */
private static final String DIRECT_EXCHANGE = "DirectExchange";
private static final String TOPIC_EXCHANGE = "TopicExchange";
private static final String FANOUT_EXCHANGE ="FanoutExchange" ;
private static final String HEADERS_EXCHANGE ="HeadersExchange" ;
/**
* 佇列的名字
* */
private static final String DIRECT_QUEUE = "DirectQueue";
private static final String TOPIC_QUEUE = "TopicQueue";
private static final String FANOUT_QUEUE = "FanoutQueue";
private static final String HEADERS_QUEUE = "HeadersQueue";
/**
* key
* */
private static final String DIRECT_KEY = "DirectKey";
private static final String TOPIC_KEY = "Topic.#";
/**
* 1.佇列名字
* 2.durable="true" 是否持久化 rabbitmq重啟的時候不需要建立新的佇列
* 3.auto-delete 表示訊息佇列沒有在使用時將被自動刪除 預設是false
* 4.exclusive 表示該訊息佇列是否只在當前connection生效,預設是false
*/
@Bean
public Queue dirctQueue() {
return new Queue(DIRECT_QUEUE,true,false,false);
}
@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE,true,false,false);
}
@Bean
public Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE,true,false,false);
}
@Bean
public Queue headersQueue() {
return new Queue(HEADERS_QUEUE,true,false,false);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE,true,false);
}
/**
* 1.交換機名字
* 2.durable="true" 是否持久化 rabbitmq重啟的時候不需要建立新的交換機
* 3.autoDelete 當所有消費客戶端連線斷開後,是否自動刪除佇列
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE,true,false);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE,true,false);
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADERS_EXCHANGE,true,false);
}
/**
* 將direct佇列和交換機進行繫結
*/
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(dirctQueue()).to(directExchange()).with(DIRECT_KEY);
}
@Bean
public Binding bindingTopic() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_KEY);
}
@Bean
public Binding bindingFanout() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding headersBinding(){
Map<String,Object> map = new HashMap<>();
map.put("headers1","value1");
map.put("headers2","value2");
return BindingBuilder.bind(headersQueue()).to(headersExchange()).whereAll(map).match();
}
/**
* 定義訊息轉換例項 轉化成 JSON 傳輸 傳輸實體就可以不用實現序列化
* */
@Bean
public MessageConverter integrationEventMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
測試controller
/**
* @author: LHL
* @ProjectName: rabbitmq
* @Package: com.amor.controller
* @ClassName: RabbitmqController
* @Date: 2018/11/18 21:33
* @Description:
* @Version: 1.0
*/
@RestController
public class RabbitmqController {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqController.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @Description: 傳送訊息
* 1.交換機
* 2.key
* 3.訊息
* 4.訊息ID
* rabbitTemplate.send(message); 發訊息;引數物件為org.springframework.amqp.core.Message
* rabbitTemplate.convertAndSend(message); 轉換併發送訊息;將引數物件轉換為org.springframework.amqp.core.Message後傳送,消費者不能有返回值
* rabbitTemplate.convertSendAndReceive(message) //轉換併發送訊息,且等待訊息者返回響應訊息.消費者可以有返回值
* @method: handleMessage
* @Param: message
* @return: void
* @auther: LHL
* @Date: 2018/11/18 21:40
*/
@GetMapping("/directSend")
public void directSend() {
String message="direct 傳送訊息";
rabbitTemplate.convertAndSend("DirectExchange","DirectKey",
message, new CorrelationData(UUID.randomUUID().toString()));
}
@GetMapping("/topicSend")
public void topicSend() {
String message="topic 傳送訊息";
rabbitTemplate.convertAndSend("TopicExchange","Topic.Key",
message, new CorrelationData(UUID.randomUUID().toString()));
}
@GetMapping("/fanoutSend")
public void fanoutSend() {
String message="fanout 傳送訊息";
rabbitTemplate.convertAndSend("FanoutExchange","",message, new CorrelationData(UUID.randomUUID().toString()));
}
@GetMapping("/headersSend")
public void headersSend(){
String msg="headers 傳送訊息";
MessageProperties properties = new MessageProperties();
properties.setHeader("headers1","value1");
properties.setHeader("headers2","value2");
Message message = new Message(msg.getBytes(),properties);
rabbitTemplate.convertAndSend("HeadersExchange","",message, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* @Description: 消費訊息
* @method: handleMessage
* @Param: message
* @return: void
* @auther: LHL
* @Date: 2018/11/18 21:41
*/
@RabbitListener(queues = "DirectQueue")
@RabbitHandler
public void directMessage(String message){
LOGGER.info("DirectConsumer {} directMessage :"+message);
}
@RabbitListener(queues = "TopicQueue")
@RabbitHandler
public void topicMessage(String message){
LOGGER.info("TopicConsumer {} topicMessage :"+message);
}
@RabbitListener(queues = "FanoutQueue")
@RabbitHandler
public void fanoutMessage(String message){
LOGGER.info("FanoutConsumer {} fanoutMessage :"+message);
}
@RabbitListener(queues = "HeadersQueue")
@RabbitHandler
public void headersMessage(Message message){
LOGGER.info("HeadersConsumer {} headersMessage :"+message);
}
}
開啟訊息傳送交換機確認和佇列與交換器是否繫結確認
yml配置
spring:
rabbitmq:
host: 192.168.0.114
port: 5672
username: admin
password: admin
virtual-host: /
# 開啟發送確認
publisher-confirms: true
# 開啟發送失敗退回
publisher-returns: true
listener:
direct:
retry:
enabled: true #消費者端的重試
simple:
retry:
enabled: true #消費者端的重試
template:
reply-timeout: 10000 #超時時間
retry:
enabled: true #設定為true的時候RabbitTemplate(生產端)能夠實現重試
配置類
/**
* @author: LHL
* @ProjectName: rabbitmq
* @Package: com.amor.config
* @ClassName: RabbitSender
* @Date: 2018/11/18 23:24
* @Description:
* @Version: 1.0
*/
@Service
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
/**
* 實現訊息傳送到RabbitMQ交換器後接收ack回撥,如果訊息傳送確認失敗就進行重試.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("訊息傳送成功,訊息ID:{}", correlationData.getId());
} else {
logger.info("訊息傳送失敗,訊息ID:{}", correlationData.getId());
}
}
/**
* 實現訊息傳送到RabbitMQ交換器,但無相應佇列與交換器繫結時的回撥.
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.error("訊息傳送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},訊息體:{}",replyCode, replyText, exchange, routingKey, new String(message.getBody()));
}
/**
* convertAndSend 非同步,訊息是否傳送成功用ConfirmCallback和ReturnCallback回撥函式類確認。
* 傳送MQ訊息
*/
public void sendMessage(String exchangeName, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
}
/**
* sendMessageAndReturn 當傳送訊息過後,該方法會一直阻塞在哪裡等待返回結果,直到請求超時,配置spring.rabbitmq.template.reply-timeout來配置超時時間。
* 傳送MQ訊息並返回結果
*/
public Object sendMessageAndReturn(String exchangeName, String routingKey, Object message) {
return rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message, new CorrelationData(UUID.randomUUID().toString()));
}
}
/**
* @author: LHL
* @ProjectName: rabbitmq
* @Package: com.amor.controller
* @ClassName: RabbitmqController
* @Date: 2018/11/18 21:33
* @Description:
* @Version: 1.0
*/
@RestController
public class RabbitmqController {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitmqController.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitSender rabbitSender;
/**
* @Description: 傳送訊息
* 1.交換機
* 2.key
* 3.訊息
* 4訊息ID
* rabbitTemplate.send(message); 發訊息;引數物件為org.springframework.amqp.core.Message
* rabbitTemplate.convertAndSend(message); 轉換併發送訊息;將引數物件轉換為org.springframework.amqp.core.Message後傳送,消費者不能有返回值
* rabbitTemplate.convertSendAndReceive(message) //轉換併發送訊息,且等待訊息者返回響應訊息.消費者可以有返回值
* @method: handleMessage
* @Param: message
* @return: void
* @auther: LHL
* @Date: 2018/11/18 21:40
*/
@GetMapping("/directSend")
public void directSend() {
String message = "direct 傳送訊息";
//rabbitTemplate.convertAndSend("DirectExchange", "DirectKey", message, new CorrelationData(UUID.randomUUID().toString()));
rabbitSender.sendMessage("DirectExchange", "DirectKey", message);
}
@GetMapping("/topicSend")
public void topicSend() {
String message = "topic 傳送訊息";
//rabbitTemplate.convertAndSend("TopicExchange", "Topic.Key", message, new CorrelationData(UUID.randomUUID().toString()));
rabbitSender.sendMessage("TopicExchange", "Topic.Key", message);
}
@GetMapping("/fanoutSend")
public void fanoutSend() {
String message = "fanout 傳送訊息";
//rabbitTemplate.convertAndSend("FanoutExchange", "", message, new CorrelationData(UUID.randomUUID().toString()));
rabbitSender.sendMessage("FanoutExchange", "", message);
}
@GetMapping("/headersSend")
public void headersSend() {
String msg = "headers 傳送訊息";
MessageProperties properties = new MessageProperties();
properties.setHeader("headers1", "value1");
properties.setHeader("headers2", "value2");
Message message = new Message(msg.getBytes(), properties);
//rabbitTemplate.convertAndSend("HeadersExchange","",message, new CorrelationData(UUID.randomUUID().toString()));
rabbitSender.sendMessage("HeadersExchange", "", message);
}
/**
* @Description: 消費訊息
* @method: handleMessage
* @Param: message
* @return: void
* @auther: LHL
* @Date: 2018/11/18 21:41
*/
@RabbitListener(queues = "DirectQueue")
@RabbitHandler
public void directMessage(String sendMessage) {
LOGGER.info("DirectConsumer {} directMessage :" + message);
}
@RabbitListener(queues = "TopicQueue")
@RabbitHandler
public void topicMessage(String message) {
LOGGER.info("TopicConsumer {} topicMessage :" + message);
}
@RabbitListener(queues = "FanoutQueue")
@RabbitHandler
public void fanoutMessage(String message) {
LOGGER.info("FanoutConsumer {} fanoutMessage :" + message);
}
@RabbitListener(queues = "HeadersQueue")
@RabbitHandler
public void headersMessage(Message message) {
LOGGER.info("HeadersConsumer {} headersMessage :" + message);
}
}
開啟ACK手動確認模式
手動確認配置
第一種直接yml配置
spring:
rabbitmq:
host: 192.168.0.114
port: 5672
username: admin
password: admin
virtual-host: /
# 開啟發送確認
publisher-confirms: true
# 開啟發送失敗退回
publisher-returns: true
listener:
direct:
acknowledge-mode: manual # 開啟ACK
retry:
enabled: true #消費者端的重試
simple:
retry:
enabled: true #消費者端的重試
acknowledge-mode: manual # 開啟ACK
concurrency: 10 #消費者的最小數量
max-concurrency: 20 #消費者的最大數量
prefetch: 5 #在單個請求中處理的訊息個數,他應該大於等於事務數量
auto-startup: true #啟動時自動啟動容器 true
default-requeue-rejected: true #投遞失敗時是否重新排隊
template:
reply-timeout
相關推薦
spring boot2.1整合RabbitMQ
轉載請表明出處 https://blog.csdn.net/Amor_Leo/article/details/85085697 謝謝
spring boot2.1整合RabbitMQ
簡單介紹
安裝RabbitMQ
新增依賴
簡單配置
spring boot2.1.1.RELEASE整合Activiti 6.0.0 實戰程式設計
下載地址:https://download.csdn.net/download/yebichao/10869999內容包括:spring boot2.1.1.RELEASE整合Activiti 6實戰程式設計:包含資料庫、源程式、activiti參考書,Readme.在開發環境中匯入專案程式及資料庫即可執行。
Spring Boot2.X整合訊息中介軟體RabbitMQ原理簡淺探析
目錄
1、簡單概述RabbitMQ重要作用
2、簡單概述RabbitMQ重要概念
3、Spring Boot整合RabbitMQ
前言
RabbitMQ是一個訊息佇列,主要是用來實現應用程式的非
新手入門教程-------Spring Boot中整合RabbitMQ
AMQP:是Advanced Message Queuing Protocol的簡稱,高階訊息佇列協議,是一個面向訊息中介軟體的開放式標準應用層協議。
定義了以下特性:
訊息方向
訊息佇列
訊息路由(包括:點到點和釋出-訂閱模式)
可靠性
安全
spring boot2.1讀取 apollo 配置中心3
上篇記錄了springboot讀取apollo的配置資訊,以及如何獲取服務端的推送更新配置。
接下來記錄一下,如何獲取公共namespace的配置。
上文中使用如下程式碼共聚公共名稱空間的配置:
@ApolloConfig("TEST1.MiddleWare")
private
【Spring Boot】--整合RabbitMQ
目錄
0、前言
1、訊息流程
2、新增依賴
3、新增配置
4、新建配置類
5、新建生產者介面
6、新建生產者實現類
7、新建生產者控制器類
8、測試
0、前言
需要已經安裝RabbitMQ,並且啟動、配置好使用者。參考《基於Cen
Spring Boot2.x 整合lettuce redis 和 redisson
前言
springboot2之前redis的連線池為jedis,2.0以後redis的連線池改為了lettuce,lettuce能夠支援redis4,需要java8及以上。
lettuce是基於netty實現的與redis進行同步和非同步的通訊。
lettuce和jedi
Spring Boot2.x 整合quartz叢集
springboot2.x支援對quartz的自動配置,引入jar
<!-- spring boot2.x + quartz -->
<dependency>
&
Spring Boot2 + Mybatis 整合(Mybatis自動生成外掛、分頁外掛)
內容:
Spring Boot2 + Mybatis 整合
Mybatis Generator自動生成程式碼
Mybatis PageHelper分頁外掛
Spring Cloud Bus整合RabbitMQ ( 17 )
轉自 https://blog.csdn.net/u012702547/article/details/77823434
這個系列我感覺真的太好了,可以一步一步的瞭解spring cloud 的搭建以及更深層次的東西,對想學這門技術的朋友真的入門特別的快,感謝這位大哥的分享,我也會持續
Spring Boot2.0整合Mybatis(自動生成註解方式,多環境配置)
本介紹Spring Boot2.0整合Mybatis,通過MyBatis Generator外掛自動生成mapper的sql註解及Provider類的過程,支援多環境的yml配置檔案首先用IDE開發工具(IDEA,STS,Eclipse)建立一個Spring Boot工程sp
手把手教你Spring Boot2.x整合kafka
#首先得自己搭建一個kafka,搭建教程請自行百度,本人是使用docker搭建了一個單機版的zookeeper+kafka作為演示,文末會有完整程式碼包提供給大家下載參考
![](https://img2020.cnblogs.com/blog/1543487/202103/1543487-202103021
手把手教你Spring Boot2.x整合Elasticsearch(ES)
#文末會附上完整的程式碼包供大家下載參考,碼字不易,如果對你有幫助請給個點贊和關注,謝謝!
#如果只是想看java對於Elasticsearch的操作可以直接看第四大點
##一、docker部署Elasticsearch(下面簡稱es)單機版教程
###1、部署es
* 拉取es映象(這裡我使用的版本
spring boot 1.5.4 整合rabbitMQ(十七)
rabbitmq springboot springboot1.5.4 springboot整合jsp springboot整合rabbitmq 上一篇:spring boot 1.5.4 整合redis、攔截器、過濾器、監聽器、靜態資源配置(十六) 關於rabbitMQ原理,請參閱博客:
Spring Boot2整合Shiro(1):身份認證
Spring Boot2整合Shiro(1):身份認證
前言
本文主要介紹了在Spring Boot2專案中整合Shiro實現登入認證。本文假設讀者已經對Shiro和基於RBAC的許可權控制系統有了基本的認識。 本專案沒有資料庫,也就沒有dao層,所有的使用者和
Spring Boot2.0.5整合ElasticSearch6.4.1 叢集
轉載請標明出處 https://blog.csdn.net/Amor_Leo/article/details/83012038 謝謝 ES叢集搭建 https://blog.csdn.net/Amor_Leo/article/details/83011372 兩個節點 ES叢集搭建
RabbitMQ (九) Spring整合RabbitMQ(1)
前面幾篇講解了如何使用rabbitMq,這一篇主要講解spring整合rabbitmq。
首先引入配置檔案org.springframework.amqp,如下
<dependency>
<groupId>com.rabbitmq&
spring mvc +Mybatis3.1 整合的時候異常
factory lec sta error .get for 1.0 character mod
在使用Mybatis3.10+spring3.10+mybatis-spring-1.0.0集成,使用spring 時發生例如以下錯誤:
嚴重: Servlet
spring boot實戰(第十二篇)整合RabbitMQ
this direct 還需要 添加屬性 創建 還需 topic start routing 前言
本篇主要講述Spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar
[html] view plain cop
spring boot2 整合(一)Mybatis (特別完整!)
one OS solver mapper ant gin enabled selectall pom
大概介紹下流程:
借助idea實現mybatis逆向工程
用xml配置實現整合
用cmd命令行實現mybatis逆向工程
用mapping.xml配置實現數據交互