RabbitMQ訊息可靠性投遞解決方案 - 基於SpringBoot實現
https://www.imooc.com/article/49814
參考地址:
https://www.imooc.com/t/2726237
談到訊息的可靠性投遞,無法避免的,在實際的工作中會經常碰到,比如一些核心業務需要保障訊息不丟失,接下來我們看一個可靠性投遞的流程圖,說明可靠性投遞的概念:
-
Step 1: 首先把訊息資訊(業務資料)儲存到資料庫中,緊接著,我們再把這個訊息記錄也儲存到一張訊息記錄表裡(或者另外一個同源資料庫的訊息記錄表)
-
Step 2:傳送訊息到MQ Broker節點(採用confirm方式傳送,會有非同步的返回結果)
-
Step 3、4:生產者端接受MQ Broker節點返回的Confirm確認訊息結果,然後進行更新訊息記錄表裡的訊息狀態。比如預設Status = 0 當收到訊息確認成功後,更新為1即可!
-
Step 5:但是在訊息確認這個過程中可能由於網路閃斷、MQ Broker端異常等原因導致 回送訊息失敗或者異常。這個時候就需要傳送方(生產者)對訊息進行可靠性投遞了,保障訊息不丟失,100%的投遞成功!(有一種極限情況是閃斷,Broker返回的成功確認訊息,但是生產端由於網路閃斷沒收到,這個時候重新投遞可能會造成訊息重複,需要消費端去做冪等處理)所以我們需要有一個定時任務,(比如每5分鐘拉取一下處於中間狀態的訊息,當然這個訊息可以設定一個超時時間,比如超過1分鐘 Status = 0 ,也就說明了1分鐘這個時間視窗內,我們的訊息沒有被確認,那麼會被定時任務拉取出來)
-
Step 6:接下來我們把中間狀態的訊息進行重新投遞 retry send,繼續傳送訊息到MQ ,當然也可能有多種原因導致傳送失敗
-
Step 7:我們可以採用設定最大努力嘗試次數,比如投遞了3次,還是失敗,那麼我們可以將最終狀態設定為Status = 2 ,最後 交由人工解決處理此類問題(或者把訊息轉儲到失敗表中)。
接下來,我們使用SpringBoot2.x 實現這一可靠性投遞策略:
廢話不多說,直接上程式碼:
-
資料庫庫表結構:訂單表和訊息記錄表
-- 表 order 訂單結構 CREATE TABLE IF NOT EXISTS `t_order` ( `id` varchar(128) NOT NULL, -- 訂單ID `name` varchar(128), -- 訂單名稱 其他業務熟悉忽略 `message_id` varchar(128) NOT NULL, -- 訊息唯一ID PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- 表 broker_message_log 訊息記錄結構 CREATE TABLE IF NOT EXISTS `broker_message_log` ( `message_id` varchar(128) NOT NULL, -- 訊息唯一ID `message` varchar(4000) DEFAULT NULL, -- 訊息內容 `try_count` int(4) DEFAULT '0', -- 重試次數 `status` varchar(10) DEFAULT '', -- 訊息投遞狀態 0 投遞中 1 投遞成功 2 投遞失敗 `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 下一次重試時間 或 超時時間 `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 建立時間 `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', -- 更新時間 PRIMARY KEY (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-
整合SpringBoot 實現生產端程式碼如下:pom.xml配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
|
-
application.properties配置:
-
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
spring.rabbitmq.addresses=
192.168
.
11.76
:
5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=
15000
spring.rabbitmq.publisher-confirms=
true
spring.rabbitmq.publisher-returns=
true
spring.rabbitmq.template.mandatory=
true
server.servlet.context-path=/
server.port=
8001
spring.http.encoding.charset=UTF-
8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+
8
spring.jackson.
default
-property-inclusion=NON_NULL
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.url=jdbc:mysql:
//localhost:3306/test?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true
spring.datasource.driver-
class
-name=com.mysql.jdbc.Driver
spring.datasource.username=root
spring.datasource.password=root
mybatis.type-aliases-
package
=com.bfxy.springboot
mybatis.mapper-locations=classpath:com/bfxy/springboot/mapping/*.xml
logging.level.tk.mybatis=TRACE
-
資料來源druid.properties配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
-
實體物件:
-
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package
com.bfxy.springboot.entity;
import
java.io.Serializable;
public
class
Order
implements
Serializable {
private
static
final
long
serialVersionUID = 9111357402963030257L;
private
String id;
private
String name;
private
String messageId;
public
String getId() {
return
id;
}
public
void
setId(String id) {
this
.id = id ==
null
?
null
: id.trim();
}
public
String getName() {
return
name;
}
public
void
setName(String name) {
this
.name = name ==
null
?
null
: name.trim();
}
public
String getMessageId() {
return
messageId;
}
public
void
setMessageId(String messageId) {
this
.messageId = messageId ==
null
?
null
: messageId.trim();
}
}
-
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package
com.bfxy.springboot.entity;
import
java.util.Date;
public
class
BrokerMessageLog {
private
String messageId;
private
String message;
private
Integer tryCount;
private
String status;
private
Date nextRetry;
private
Date createTime;
private
Date updateTime;
public
String getMessageId() {
return
messageId;
}
public
void
setMessageId(String messageId) {
this
.messageId = messageId ==
null
?
null
: messageId.trim();
}
public
String getMessage() {
return
message;
}
public
void
setMessage(String message) {
this
.message = message ==
null
?
null
: message.trim();
}
public
Integer getTryCount() {
return
tryCount;
}
public
void
setTryCount(Integer tryCount) {
this
.tryCount = tryCount;
}
public
String getStatus() {
return
status;
}
public
void
setStatus(String status) {
this
.status = status ==
null
?
null
: status.trim();
}
public
Date getNextRetry() {
return
nextRetry;
}
public
void
setNextRetry(Date nextRetry) {
this
.nextRetry = nextRetry;
}
public
Date getCreateTime() {
return
createTime;
}
public
void
setCreateTime(Date createTime) {
this
.createTime = createTime;
}
public
Date getUpdateTime() {
return
updateTime;
}
public
void
setUpdateTime(Date updateTime) {
this
.updateTime = updateTime;
}
}
-
資料庫連線池程式碼:
-
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package
com.bfxy.springboot.config.database;
import
java.sql.SQLException;
import
javax.sql.DataSource;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import
org.springframework.jdbc.datasource.DataSourceTransactionManager;
import
org.springframework.transaction.PlatformTransactionManager;
import
org.springframework.transaction.annotation.EnableTransactionManagement;
import
com.alibaba.druid.pool.DruidDataSource;
@Configuration
@EnableTransactionManagement
public
class
DruidDataSourceConfig {
private
static
Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.
class
);
@Autowired
private
DruidDataSourceSettings druidSettings;
public
static
String DRIVER_CLASSNAME ;
@Bean
public
static
PropertySourcesPlaceholderConfigurer propertyConfigure(){
return
new
PropertySourcesPlaceholderConfigurer();
}
@Bean
public
DataSource dataSource()
throws
SQLException {
DruidDataSource ds =
new
DruidDataSource();
ds.setDriverClassName(druidSettings.getDriverClassName());
DRIVER_CLASSNAME = druidSettings.getDriverClassName();
ds.setUrl(druidSettings.getUrl());
ds.setUsername(druidSettings.getUsername());
ds.setPassword(druidSettings.getPassword());
ds.setInitialSize(druidSettings.getInitialSize());
ds.setMinIdle(druidSettings.getMinIdle());
ds.setMaxActive(druidSettings.getMaxActive());
ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());
ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());
ds.setValidationQuery(druidSettings.getValidationQuery());
ds.setTestWhileIdle(druidSettings.isTestWhileIdle());
ds.setTestOnBorrow(druidSettings.isTestOnBorrow());
ds.setTestOnReturn(druidSettings.isTestOnReturn());
ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());
ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());
ds.setFilters(druidSettings.getFilters());
ds.setConnectionProperties(druidSettings.getConnectionProperties());
logger.info(
" druid datasource config : {} "
, ds);
return
ds;
}
@Bean
public
PlatformTransactionManager transactionManager()
throws
Exception {
DataSourceTransactionManager txManager =
new
DataSourceTransactionManager();
txManager.setDataSource(dataSource());
return
txManager;
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
|