Docker下RabbitMQ延時佇列實戰兩部曲之二:細說開發
本章是《Docker下RabbitMQ延時佇列實戰兩部曲》的終篇,上一章《Docker下RabbitMQ延時佇列實戰兩部曲之一:極速體驗》我們快速體驗了延時佇列的生產和消費,今天來實戰整個開發過程;
SpringBoot框架下進行RabbitMQ開發,相關知識連結
本章涉及的指令碼和原始碼下載
本章會開發一個yml指令碼,三個基於SpringBoot的應用,功能如下:
1. docker-compose.yml:啟動所有容器的docker-compose指令碼;
2. delayrabbitmqconsumer:SpringBoot框架的應用,連線RabbitMQ的兩個佇列,消費訊息;
3. messagettlproducer:SpringBoot框架的應用,收到web請求後向RabbitMQ傳送訊息,訊息中帶有過期時間(TTL);
4. queuettlproducer:SpringBoot框架的應用,收到web請求後向RabbitMQ傳送訊息,訊息中不帶過期時間(TTL),但是對應的訊息佇列已經設定了過期時間;
整體部署情況如下:
上述指令碼和工程的原始碼都可以在github下載,地址和連結資訊如下表所示:
名稱 | 連結 | 備註 |
---|---|---|
git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,三個SpringBoot工程分別在delayrabbitmqconsumer、messagettlproducer、queuettlproducer這三個資料夾下,如下圖的三個紅框所示:
docker-compose.yml檔案在rabbitmq_docker_files資料夾下面的delaymq資料夾下,如下圖:
環境資訊
作業系統:Ubuntu 16.04.3 LTS
Docker:1.12.6
RabbitMQ:3.7.5-rc.1
JDK:1.8.0_111
SpringBoot:1.4.1.RELEASE
Maven:3.5.0
開發步驟
本次開發實戰的步驟如下:
1. 開發messagettlproducer應用,製作映象;
2. 開發queuettlproducer應用,製作映象;
3. 開發delayrabbitmqconsumer應用,製作映象;
4. 開發docker-compose.yml指令碼;
messagettlproducer應用
messagettlproducer是個基於SpringBoot的web工程,有一個Controller可以響應web請求,收到請求後傳送一條帶有過期時間的訊息到RabbitMQ的message.ttl.queue.source佇列;
1. pom.xml內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId>
<artifactId>messagettlproducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>messagettlproducer</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.4.12</version>
<!--docker映象相關的配置資訊-->
<configuration>
<!--映象名,這裡用工程名-->
<imageName>bolingcavalry/${project.artifactId}</imageName>
<!--TAG,這裡用工程版本號-->
<imageTags>
<imageTag>${project.version}</imageTag>
</imageTags>
<!--映象的FROM,使用java官方映象-->
<baseImage>java:8u111-jdk</baseImage>
<!--該映象的容器啟動後,直接執行spring boot工程-->
<entryPoint>["java", "-jar", "/${project.build.finalName}.jar"]</entryPoint>
<!--構建映象的配置資訊-->
<resources>
<resource>
<targetPath>/</targetPath>
<directory>${project.build.directory}</directory>
<include>${project.build.finalName}.jar</include>
</resource>
</resources>
</configuration>
</plugin>
</plugins>
</build>
</project>
上面的內容中有以下兩點需要注意:
a. 新增對spring-boot-starter-amqp的依賴,這裡面是操作RabbitMQ所需的庫;
b. 新增docker-maven-plugin外掛,可以將當前工程直接製作成Docker映象;
2. src/main/resources資料夾下面建立application.properties檔案,內容如下,只配置了應用名稱和RabbitMQ的virtualHost路徑:
spring.application.name=messagettlproducer
mq.rabbit.virtualHost=/
3. RabbitTemplateConfig.java檔案中是應用連線RabbitMQ的配置資訊:
@Configuration
public class RabbitTemplateConfig {
@Value("${mq.rabbit.address}")
String address;
@Value("${mq.rabbit.username}")
String username;
@Value("${mq.rabbit.password}")
String password;
@Value("${mq.rabbit.virtualHost}")
String mqRabbitVirtualHost;
//建立mq連線
@Bean(name = "connectionFactory")
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(mqRabbitVirtualHost);
connectionFactory.setPublisherConfirms(true);
//該方法配置多個host,在當前連線host down掉的時候會自動去重連後面的host
connectionFactory.setAddresses(address);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必須是prototype型別
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
上面的程式碼有以下幾點要注意:
a. address、username、password這些變數的值,是從作業系統的環境變數中獲取的,我們在啟動Docker容器的時候將這些值配置到容器的環境變數中,程式執行的時候就能取到了;
b. connectionFactory()方法根據上述配置引數和RabbitMQ建立連線;
c. rabbitTemplate()建立RabbitTemplate物件,我們可以在其他Bean中通過Autowire使用;
4. MessageTtlRabbitConfig.java類中是和訊息佇列相關的配置:
/**
* 成為死信後,重新發送到的交換機的名稱
*/
@Value("${message.ttl.exchange}")
private String MESSAGE_TTL_EXCHANGE_NAME;
/**
* 不會被消費的佇列,投遞到此佇列的訊息會成為死信
*/
@Value("${message.ttl.queue.source}")
private String MESSAGE_TTL_QUEUE_SOURCE;
/**
* 該佇列被繫結到接收死信的交換機
*/
@Value("${message.ttl.queue.process}")
private String MESSAGE_TTL_QUEUE_PROCESS;
/**
* 配置一個佇列,該佇列的訊息如果沒有被消費,就會投遞到死信交換機中,並且帶上指定的routekey
* @return
*/
@Bean
Queue messageTtlQueueSource() {
return QueueBuilder.durable(MESSAGE_TTL_QUEUE_SOURCE)
.withArgument("x-dead-letter-exchange", MESSAGE_TTL_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", MESSAGE_TTL_QUEUE_PROCESS)
.build();
}
@Bean("messageTtlQueueProcess")
Queue messageTtlQueueProcess() {
return QueueBuilder.durable(MESSAGE_TTL_QUEUE_PROCESS) .build();
}
@Bean("messageTtlExchange")
DirectExchange messageTtlExchange() {
return new DirectExchange(MESSAGE_TTL_EXCHANGE_NAME);
}
/**
* 繫結指定的佇列到死信交換機上
* @param messageTtlQueueProcess
* @param messageTtlExchange
* @return
*/
@Bean
Binding bindingExchangeMessage(@Qualifier("messageTtlQueueProcess") Queue messageTtlQueueProcess, @Qualifier("messageTtlExchange") DirectExchange messageTtlExchange) {
System.out.println("11111111111111111111111111111111111111111111111111");
System.out.println("11111111111111111111111111111111111111111111111111");
System.out.println("11111111111111111111111111111111111111111111111111");
System.out.println("11111111111111111111111111111111111111111111111111");
return BindingBuilder.bind(messageTtlQueueProcess)
.to(messageTtlExchange)
.with(MESSAGE_TTL_QUEUE_PROCESS);
}
上面的程式碼有以下幾點要注意:
a. MESSAGE_TTL_EXCHANGE_NAME、MESSAGE_TTL_QUEUE_SOURCE、MESSAGE_TTL_QUEUE_PROCESS這些變數的值,是從作業系統的環境變數中獲取的,我們在啟動Docker容器的時候將這些值配置到容器的環境變數中,程式執行的時候就能取到了;
b. connectionFactory()方法根據上述配置引數和RabbitMQ建立連線;
c. rabbitTemplate()建立RabbitTemplate物件,我們可以在其他Bean中通過Autowire使用;
d. messageTtlQueueSource()方法建立了一個佇列用於投遞訊息,通過x-dead-letter-exchange和x-dead-letter-routing-key這兩個引數,設定了佇列訊息過期後轉發的交換機名稱,以及攜帶的routing key;
- 為了設定訊息過期,我們還要定製一個ExpirationMessagePostProcessor類,作用是將給訊息類設定過期時間,後面傳送訊息時會用到這個類:
package com.bolingcavalry.messagettlproducer;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
/**
* @Description :
* @Author : [email protected]
* @Date : 2018-06-02 23:33
*/
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
private final Long ttl; // 毫秒
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties() .setExpiration(ttl.toString()); // 設定per-message的失效時間
return message;
}
}
6. 用於處理web請求的SendMessageController 類,原始碼如下:
/**
* @Description : 用於生產訊息的web介面類
* @Author : [email protected]
* @Date : 2018-06-02 23:00
*/
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${message.ttl.queue.source}")
private String MESSAGE_TTL_QUEUE_SOURCE;
/**
* 生產一條訊息,訊息中帶有過期時間
* @param name
* @param message
* @param delaytime
* @return
*/
@RequestMapping(value = "/messagettl/{name}/{message}/{delaytime}", method = RequestMethod.GET)
public @ResponseBody
String messagettl(@PathVariable("name") final String name, @PathVariable("message") final String message, @PathVariable("delaytime") final int delaytime) {
SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String timeStr = simpleDateFormat.format(new Date());
String queueName = MESSAGE_TTL_QUEUE_SOURCE;
String sendMessage = String.format("hello, %s , %s, from queue [%s], delay %d's, %s", name, message, MESSAGE_TTL_QUEUE_SOURCE, delaytime, timeStr);
rabbitTemplate.convertAndSend(MESSAGE_TTL_QUEUE_SOURCE,
(Object)sendMessage,
new ExpirationMessagePostProcessor(delaytime*1000L));
return "send message to [" + name + "] success , queue is : " + queueName + " (" + timeStr + ")";
}
}
如上所示,傳送訊息的程式碼很簡單,呼叫rabbitTemplate的convertAndSend就能傳送訊息到message.ttl.queue.source佇列(指定路由鍵的Direct方式),再傳入ExpirationMessagePostProcessor作為處理訊息的工具;
以上就是messagettlproducer應用的主要程式碼介紹,編碼完畢後,在pom.xml檔案所在目錄執行mvn clean package -U -DskipTests docker:build,即可編譯、構建、製作Docker映象;
queuettlproducer應用
queuettlproducer和messagettlproducer極為相似,都是接受web請求後向RabbitMQ傳送訊息,不同之處有以下兩點:
1. queuettlproducer在繫結佇列的時候,會設定佇列上所有訊息的過期時間,messagettlproducer沒做這個設定;
2. queuettlproducer在傳送訊息的時候,沒有設定該訊息的過期時間,messagettlproducer會對每條訊息都設定過期時間;
因此,queuettlproducer和messagettlproducer這兩個應用的程式碼大部分是相同的,這裡只要關注不同的部分即可;
- 佇列和交換機的配置類,QueueTtlRabbitConfig:
@Configuration
public class QueueTtlRabbitConfig {
/**
* 成為死信後,重新發送到的交換機的名稱
*/
@Value("${queue.ttl.exchange}")
private String QUEUE_TTL_EXCHANGE_NAME;
/**
* 不會被消費的佇列,投遞到此佇列的訊息會成為死信
*/
@Value("${queue.ttl.queue.source}")
private String QUEUE_TTL_QUEUE_SOURCE;
/**
* 該佇列被繫結到接收死信的交換機
*/
@Value("${queue.ttl.queue.process}")
private String QUEUE_TTL_QUEUE_PROCESS;
@Value("${queue.ttl.value}")
private long QUEUE_TTL_VALUE;
/**
* 配置一個佇列,該佇列有訊息過期時間,訊息如果沒有被消費,就會投遞到死信交換機中,並且帶上指定的routekey
* @return
*/
@Bean
Queue queueTtlQueueSource() {
return QueueBuilder.durable(QUEUE_TTL_QUEUE_SOURCE)
.withArgument("x-dead-letter-exchange", QUEUE_TTL_EXCHANGE_NAME)
.withArgument("x-dead-letter-routing-key", QUEUE_TTL_QUEUE_PROCESS)
.withArgument("x-message-ttl", QUEUE_TTL_VALUE)
.build();
}
@Bean("queueTtlQueueProcess")
Queue queueTtlQueueProcess() {
return QueueBuilder.durable(QUEUE_TTL_QUEUE_PROCESS) .build();
}
@Bean("queueTtlExchange")
DirectExchange queueTtlExchange() {
return new DirectExchange(QUEUE_TTL_EXCHANGE_NAME);
}
/**
* 繫結
* @param queueTtlQueueProcess
* @param queueTtlExchange
* @return
*/
@Bean
Binding bindingExchangeMessage(@Qualifier("queueTtlQueueProcess") Queue queueTtlQueueProcess, @Qualifier("queueTtlExchange") DirectExchange queueTtlExchange) {
System.out.println("22222222222222222222222222222222222222222222222222");
System.out.println("22222222222222222222222222222222222222222222222222");
System.out.println("22222222222222222222222222222222222222222222222222");
System.out.println("22222222222222222222222222222222222222222222222222");
return BindingBuilder.bind(queueTtlQueueProcess)
.to(queueTtlExchange)
.with(QUEUE_TTL_QUEUE_PROCESS);
}
}
上述程式碼請注意以下兩點:
a. queueTtlQueueSource()方法用來設定佇列,除了x-dead-letter-exchange和x-dead-letter-routing-key這兩個引數,還多了x-message-ttl,此引數對應的值就是進入該佇列的每一條訊息的過期時間;
b. bindingExchangeMessage()方法將佇列queue.ttl.queue.source繫結到Direct模式的交換機;
2. 處理web請求的SendMessageController類:
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${queue.ttl.queue.source}")
private String QUEUE_TTL_QUEUE_SOURCE;
/**
* 生產一條訊息,訊息中不帶過期時間,但是對應的佇列中已經配置了過期時間
* @param name
* @param message
* @return
*/
@RequestMapping(value = "/queuettl/{name}/{message}", method = RequestMethod.GET)
public @ResponseBody
String queuettl(@PathVariable("name") final String name, @PathVariable("message") final String message) {
SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String timeStr = simpleDateFormat.format(new Date());
String queueName = QUEUE_TTL_QUEUE_SOURCE;
String sendMessage = String.format("hello, %s , %s, from queue [%s], %s", name, message, queueName, timeStr);
rabbitTemplate.convertAndSend(queueName, sendMessage);
return "send message to [" + name + "] success , queue is : " + queueName + " (" + timeStr + ")";
}
}
如上所示,傳送訊息時只有routing key和訊息物件這兩個引數;
以上就是傳送訊息到佇列的應用原始碼,編碼完畢後,在pom.xml檔案所在目錄執行mvn clean package -U -DskipTests docker:build,即可編譯、構建、製作Docker映象;
接下來我們看看訊息消費者工程delayrabbitmqconsumer的原始碼;
delayrabbitmqconsumer應用
delayrabbitmqconsumer應用連線到訊息佇列,消費收到的每條訊息;
- RabbitTemplateConfig.java是連線到RabbitMQ的配置資訊,和前面兩個應用一樣,不再贅述;
- 消費message.ttl.queue.process這個佇列發出的訊息,對應實現類是MessageTtlReceiver:
/**
* @Description : 訊息接受類,接收第一類延時訊息(在每條訊息中指定過期時間)的轉發結果
* @Author : [email protected]
* @Date : 2018-06-03 9:52
*/
@Component
@RabbitListener(queues = "${message.ttl.queue.process}")
public class MessageTtlReceiver {
private static final Logger logger = LoggerFactory.getLogger(MessageTtlReceiver.class);
@RabbitHandler
public void process(String message) {
logger.info("receive message : " + message);
}
}
如上所示,只要用註解RabbitListener配置好佇列的名稱即可,編碼完畢後,在pom.xml檔案所在目錄執行mvn clean package -U -DskipTests docker:build,即可編譯、構建、製作Docker映象;
docker-compose.yml配置
最後我們看一下所有容器的配置檔案docker-compose.yml:
version: '2'
services:
rabbit1:
image: bolingcavalry/rabbitmq-server:0.0.3
hostname: rabbit1
ports:
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=888888
rabbit2:
image: bolingcavalry/rabbitmq-server:0.0.3
hostname: rabbit2
depends_on:
- rabbit1
links:
- rabbit1
environment:
- CLUSTERED=true
- CLUSTER_WITH=rabbit1
- RAM_NODE=true
- HA_ENABLE=true
ports:
- "15673:15672"
rabbit3:
image: bolingcavalry/rabbitmq-server:0.0.3
hostname: rabbit3
depends_on:
- rabbit2
links:
- rabbit1
- rabbit2
environment:
- CLUSTERED=true
- CLUSTER_WITH=rabbit1
ports:
- "15675:15672"
messagettlproducer:
image: bolingcavalry/messagettlproducer:0.0.1-SNAPSHOT
hostname: messagettlproducer
depends_on:
- rabbit3
links:
- rabbit1:rabbitmqhost1
- rabbit2:rabbitmqhost2
- rabbit3:rabbitmqhost3
ports:
- "18080:8080"
environment:
- mq.rabbit.address=rabbitmqhost1:5672,rabbitmqhost2:5672,rabbitmqhost3:5672
- mq.rabbit.username=admin
- mq.rabbit.password=888888
- message.ttl.exchange=message.ttl.exchange
- message.ttl.queue.source=message.ttl.queue.source
- message.ttl.queue.process=message.ttl.queue.process
queuettlproducer:
image: bolingcavalry/queuettlproducer:0.0.1-SNAPSHOT
hostname: queuettlproducer
depends_on:
- messagettlproducer
links:
- rabbit1:rabbitmqhost1
- rabbit2:rabbitmqhost2
- rabbit3:rabbitmqhost3
ports:
- "18081:8080"
environment:
- mq.rabbit.address=rabbitmqhost1:5672,rabbitmqhost2:5672,rabbitmqhost3:5672
- mq.rabbit.username=admin
- mq.rabbit.password=888888
- queue.ttl.exchange=queue.ttl.exchange
- queue.ttl.queue.source=queue.ttl.queue.source
- queue.ttl.queue.process=queue.ttl.queue.process
- queue.ttl.value=5000
delayrabbitmqconsumer:
image: bolingcavalry/delayrabbitmqconsumer:0.0.1-SNAPSHOT
hostname: delayrabbitmqconsumer
depends_on:
- queuettlproducer
links:
- rabbit1:rabbitmqhost1
- rabbit2:rabbitmqhost2
- rabbit3:rabbitmqhost3
environment:
- mq.rabbit.address=rabbitmqhost1:5672,rabbitmqhost2:5672,rabbitmqhost3:5672
- mq.rabbit.username=admin
- mq.rabbit.password=888888
- message.ttl.queue.process=message.ttl.queue.process
- queue.ttl.queue.process=queue.ttl.queue.process
上述配置檔案有以下幾點需要注意:
1. rabbit1、rabbit2、rabbit3是RabbitMQ高可用叢集,如果您對RabbitMQ高可用叢集感興趣,推薦您請看《Docker下RabbitMQ四部曲》系列文章;
2. 三個SpringBoot應用都配置了mq.rabbit.address引數,值是三個RabbitMQ server的IP加埠,這樣如果RabbitMQ叢集中有一臺機器故障了也不會影響正常的訊息收發;
3. 使用了link引數後,容器內就能通過link的引數取代對應的IP;
至此,Docker下的RabbitMQ延時佇列實戰就完成了,實戰中Docker發揮的作用並不大,只是用來快速搭建環境,關鍵還是三個工程中對佇列的各種操作,希望本系列能幫助您快速構建延時佇列相關服務;
相關推薦
Docker下RabbitMQ延時佇列實戰兩部曲之二:細說開發
本章是《Docker下RabbitMQ延時佇列實戰兩部曲》的終篇,上一章《Docker下RabbitMQ延時佇列實戰兩部曲之一:極速體驗》我們快速體驗了延時佇列的生產和消費,今天來實戰整個開發過程; SpringBoot框架下進行RabbitMQ開發,相關
Docker下RabbitMQ延時佇列實戰兩部曲之一:極速體驗
有的應用場景中,向RabbitMQ發出訊息後,我們希望消費方不要立即消費,可以通過延時佇列來實現,思路是將訊息傳送到A佇列,此佇列沒有消費者,等訊息過期後會進入A佇列的Dead Letter Exchange中,B佇列綁定了這個Dead Letter Excha
Kubernetes持久卷實戰兩部曲之二:細說開發
在上一章《Kubernetes持久卷實戰兩部曲之一:極速體驗》我們體驗了K8S環境下基於NFS的持久卷讀寫,今天我們一起來了解整個體驗環境背後的細節; 全文概要 要完成上一章的體驗,需要做以下事情: 1. 建立PV; 2. 建立PVC; 3. 開發we
docker-compose下的java應用啟動順序兩部曲之二:實戰
上篇回顧 本文是《docker-compose下的java應用啟動順序兩部曲》的終篇,在上一篇《docker-compose下的java應用啟動順序兩部曲之一:問題分析》中,我們以SpringCloud環境下的註冊中心和業務服務為例,展示了docker-compose.yml中depends_on引數的不足
Springboot+rabbitmq實現延時佇列的兩種方式
什麼是延時佇列,延時佇列應用於什麼場景 延時佇列顧名思義,即放置在該佇列裡面的訊息是不需要立即消費的,而是等待一段時間之後取出消費。 那麼,為什麼需要延遲消費呢?我們來看以下的場景 網上商城下訂單後30分鐘後沒有完成支付,取消訂單(如:淘寶、去哪兒網) 系統
SpringBoot使用RabbitMQ延時佇列
延時佇列延時佇列的使用場景:1.訂單業務:在電商中,使用者下單後30分鐘後未付款則取消訂單。2.簡訊通知:使用者下單並付款後,1分鐘後發簡訊給使用者。延時佇列實現思路AMQP協議和RabbitMQ佇列本身沒有直接支援延遲佇列功能,但是我們可以通過RabbitMQ的兩個特性來曲
java實現rabbitMQ延時佇列詳解以及spring-rabbit整合教程
java實現rabbitMQ延時佇列詳解 這是我在公司開發中使用的倆套方案,感興趣的話可以看一下:點選下載 在實際的業務中我們會遇見生產者產生的訊息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支援延遲佇列功能,但是我們可以根據其特性Per-Queu
docker-compose下的java應用啟動順序兩部曲之一:問題分析
在docker-compose編排多個容器時,需要按實際情況控制各容器的啟動順序,本文是《docker-compose下的java應用啟動順序兩部曲》的第一篇,文中會分析啟動順序的重要性,以及啟動順序有問題時會有什麼樣的影響,再給出臨時解決的和官方推薦的兩種解決方案,為下一篇的實戰做好鋪墊。 環境資訊 本次實
Kubernetes持久卷實戰兩部曲之一:極速體驗
章節列表 整個《Kubernetes持久卷實戰》由以下兩篇文章組成: 1. 極速體驗靜態持久化儲存,也就是本章的內容; 2. 瞭解k8s的pod、service、pv、pvc的細節; 本章內容 本章目標是用最少的步驟和時間體驗PV,所以先不展開
kubernetes下的Nginx加Tomcat三部曲之二:細說開發
本文是《kubernetes下的Nginx加Tomcat三部曲》的第二章,在《kubernetes下的Nginx加Tomcat三部曲之一:極速體驗》一文我們快速部署了Nginx和Tomcat,達到以下效果: 本文我會詳細說明在kubernetes部署上述網
最小生成樹的兩個演算法之二:kruskal演算法
基本概念 樹(Tree):如果一個無向連通圖中不存在迴路,則這種圖稱為樹。生成樹 (Spanning Tree):無向連通圖G的一個子圖如果是一顆包含G的所有頂點的樹,則該子圖稱為G的生成樹。 生成樹是連通圖的極小連通子圖。這裡所謂極小是指:若在樹中任意增加一條邊,則將
RabbitMQ 的延時佇列和映象佇列原理與實戰
摘要:在阿里雲棲開發者沙龍PHP技術專場上,掌閱資深後端工程師、掘金小測《Redis深度歷險》作者錢文品為大家介紹了RabbitM
rabbitmq實現延時佇列(死信佇列)
基於佇列和基於訊息的TTL TTL是time to live 的簡稱,顧名思義指的是訊息的存活時間。rabbitMq可以從兩種維度設定訊息過期時間,分別是佇列和訊息本身。 佇列訊息過期時間-Per-Queue Message TTL: 通過設定佇列的x-message-ttl引數來設定指定佇列上訊息的存活時
Java 使用RabbitMQ外掛實現延時佇列
Springboot專案,windows環境 環境配置 在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列功能。同時外掛依賴Erlang/OPT 18.0及以上。 外掛下載地址: http
RabbitMQ進階使用-延時佇列的配置(Spring Boot)
依賴 MAVEN配置pom.xml <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spri
RabbitMQ高階之訊息限流與延時佇列
>人生終將是場單人旅途,孤獨之前是迷茫,孤獨過後是成長。 ## 楔子 本篇是訊息佇列`RabbitMQ`的第五彈。 上篇本來打算講述`RabbitMQ`的一些高階用法: * 如何保證訊息的可靠性? * 訊息佇列如何進行限流? * 如何設定延時佇列進行延時消費? 最終因為篇幅緣故,上篇只講了`
RabbitMQ延時任務
sicp exceptio exclusive 緩沖 type 很多 單獨 設置 utf8 概念: 消息的TTL(Time To Live)消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對
php訂單延時處理-延時佇列
延遲佇列,顧名思義它是一種帶有延遲功能的訊息佇列。 那麼,是在什麼場景下我才需要這樣的佇列呢? 一、背景 先看看一下業務場景: 1.會員過期前3天傳送召回通知 2.訂單支付成功後,5分鐘後檢測下游環節是否都正常,比如使用者購買會員後,各種會員狀態是否都設定成功 3.如何定期檢查處於退款狀態
go延時佇列
package main import ( "errors" "flag" "fmt" log "github.com/cihub/seelog" "github.com/garyburd/redigo/redis" "github.com/robfig/cron" "runtime" "strings"
Redis 非同步訊息佇列與延時佇列
訊息中介軟體,大家都會想到 Rabbitmq 和 Kafka 作為訊息佇列中介軟體,來給應用程式之間增加非同步訊息傳遞功能。這兩個中介軟體都是專業的訊息佇列中介軟體,特性之多超出了大多數人的理解能力。但是這種屬於重量級的應