1. 程式人生 > >RabbitMQ訊息可靠性投遞解決方案 - 基於SpringBoot實現

RabbitMQ訊息可靠性投遞解決方案 - 基於SpringBoot實現

https://www.imooc.com/article/49814

參考地址:

https://www.imooc.com/t/2726237

談到訊息的可靠性投遞,無法避免的,在實際的工作中會經常碰到,比如一些核心業務需要保障訊息不丟失,接下來我們看一個可靠性投遞的流程圖,說明可靠性投遞的概念:

https://img2.mukewang.com/5b65729e0001439310630624.jpg

  • Step 1: 首先把訊息資訊(業務資料)儲存到資料庫中,緊接著,我們再把這個訊息記錄也儲存到一張訊息記錄表裡(或者另外一個同源資料庫的訊息記錄表)

  • Step 2:傳送訊息到MQ Broker節點(採用confirm方式傳送,會有非同步的返回結果)

  • Step 3、4:生產者端接收MQ Broker節點返回的Confirm確認訊息結果,然後更新訊息記錄表裡的訊息狀態。比如預設 Status = 0,當收到訊息確認成功後,更新為 1 即可!

  • Step 5:但是在訊息確認這個過程中,可能由於網路閃斷、MQ Broker端異常等原因導致回送訊息失敗或者異常。這個時候就需要傳送方(生產者)對訊息進行可靠性投遞了,保障訊息不丟失,100%的投遞成功!(有一種極限情況是閃斷:Broker返回了成功確認訊息,但是生產端由於網路閃斷沒收到,這個時候重新投遞可能會造成訊息重複,需要消費端去做冪等處理。)所以我們需要有一個定時任務,比如每5分鐘拉取一下處於中間狀態的訊息。當然這個訊息可以設定一個超時時間,比如超過1分鐘仍是 Status = 0,也就說明了在1分鐘這個時間視窗內,我們的訊息沒有被確認,那麼它就會被定時任務拉取出來。

  • Step 6:接下來我們把中間狀態的訊息進行重新投遞 retry send,繼續傳送訊息到MQ ,當然也可能有多種原因導致傳送失敗

  • Step 7:我們可以設定最大努力嘗試次數,比如投遞了3次還是失敗,那麼我們可以將最終狀態設定為 Status = 2,最後交由人工解決處理此類問題(或者把訊息轉儲到失敗表中)。

接下來,我們使用SpringBoot2.x 實現這一可靠性投遞策略:

廢話不多說,直接上程式碼:

  • 資料庫庫表結構:訂單表和訊息記錄表

-- Table t_order: order records.
CREATE TABLE IF NOT EXISTS `t_order` (
  `id`         VARCHAR(128) NOT NULL,  -- order ID
  `name`       VARCHAR(128),           -- order name (other business attributes omitted)
  `message_id` VARCHAR(128) NOT NULL,  -- unique message ID
  PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;

-- Table broker_message_log: delivery log for messages sent to the broker.
-- The timestamp columns default to CURRENT_TIMESTAMP instead of the zero date
-- '0000-00-00 00:00:00': MySQL rejects a zero-date default as invalid when
-- strict mode with NO_ZERO_DATE is active (part of the default sql_mode in
-- MySQL 5.7 and later), which makes this CREATE TABLE fail.
-- Several CURRENT_TIMESTAMP defaults in one table require MySQL 5.6.5 or newer.
CREATE TABLE IF NOT EXISTS `broker_message_log` (
  `message_id` varchar(128) NOT NULL, -- unique message ID
  `message` varchar(4000) DEFAULT NULL, -- message content
  `try_count` int(4) DEFAULT '0', -- retry count
  `status` varchar(10) DEFAULT '', -- delivery status: 0 = sending, 1 = delivered, 2 = failed
  `next_retry` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- next retry time, or timeout deadline
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- creation time
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- last update time (not auto-updated; writers set it)
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 整合SpringBoot 實現生產端程式碼如下:pom.xml配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

 

    <groupId>com.bfxy</groupId>

    <artifactId>rabbitmq-springboot-producer</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <packaging>jar</packaging>

 

    <name>rabbitmq-springboot-producer</name>

    <description>rabbitmq-springboot-producer</description>

 

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.2.RELEASE</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>

 

    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <java.version>1.8</java.version>

    </properties>

 

    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency

 

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

        </dependency>

 

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-amqp</artifactId>

        </dependency>        

 

        <!-- 新增JDBC jar --> 

        <dependency>

          <groupId>org.mybatis.spring.boot</groupId>

          <artifactId>mybatis-spring-boot-starter</artifactId>

          <version>1.1.1</version>

        </dependency>

        <dependency>

          <groupId>tk.mybatis</groupId>

          <artifactId>mapper-spring-boot-starter</artifactId>

          <version>1.1.0</version>

        </dependency>    

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>druid</artifactId>

            <version>1.0.24</version>

        </dependency>

        <dependency>

            <groupId>mysql</groupId>

            <artifactId>mysql-connector-java</artifactId>

        </dependency>

        <!-- mybatis分頁外掛 -->

        <dependency>  

            <groupId>com.github.miemiedev</groupId>  

            <artifactId>mybatis-paginator</artifactId>  

            <version>1.2.17</version>  

            <exclusions>

                <exclusion>

                     <groupId>org.mybatis</groupId>

                    <artifactId>mybatis</artifactId>

                </exclusion>

            </exclusions>            

        </dependency>                    

        <dependency>

            <groupId>org.apache.commons</groupId>

            <artifactId>commons-lang3</artifactId>

        </dependency>

        <dependency>

            <groupId>commons-io</groupId>

            <artifactId>commons-io</artifactId>

            <version>2.4</version>

        </dependency>

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>fastjson</artifactId>

            <version>1.1.26</version>

        </dependency>    

        <dependency>

            <groupId>javax.servlet</groupId>

            <artifactId>javax.servlet-api</artifactId>

            <scope>provided</scope>    

        </dependency>  

                <dependency>

                    <groupId>log4j</groupId>

                    <artifactId>log4j</artifactId>

                    <version>1.2.17</version>

                </dependency>              

    </dependencies>

 

    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>

        </plugins>

    </build>

 

 

</project>

  • application.properties配置:

  • 1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    # RabbitMQ broker connection.
    spring.rabbitmq.addresses=192.168.11.76:5672

    spring.rabbitmq.username=guest

    spring.rabbitmq.password=guest

    spring.rabbitmq.virtual-host=/

    spring.rabbitmq.connection-timeout=15000

     
    # Publisher confirms, publisher returns and the template's mandatory flag are all switched on.

    spring.rabbitmq.publisher-confirms=true

    spring.rabbitmq.publisher-returns=true

    spring.rabbitmq.template.mandatory=true

     
    # Embedded web server.

    server.servlet.context-path=/

    server.port=8001

     
    # HTTP encoding and Jackson serialization.

    spring.http.encoding.charset=UTF-8

    spring.jackson.date-format=yyyy-MM-dd HH:mm:ss

    spring.jackson.time-zone=GMT+8

    spring.jackson.default-property-inclusion=NON_NULL

     
    # JDBC data source, using the Druid pool implementation.

    spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

    spring.datasource.url=jdbc:mysql://localhost:3306/test?characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true

    spring.datasource.driver-class-name=com.mysql.jdbc.Driver

    spring.datasource.username=root

    spring.datasource.password=root

     
    # MyBatis type-alias package and mapper XML locations.

    mybatis.type-aliases-package=com.bfxy.springboot

    mybatis.mapper-locations=classpath:com/bfxy/springboot/mapping/*.xml

     
    # Log level for the tk.mybatis logger.

    logging.level.tk.mybatis=TRACE

  • 資料來源druid.properties配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

## Supplementary connection-pool settings, applied to all data sources above

# Initial size, minimum idle, maximum active

druid.initialSize=5

druid.minIdle=10

druid.maxActive=300

# Maximum time to wait when acquiring a connection
# NOTE(review): the visible part of DruidDataSourceSettings declares no field for this key - confirm it is actually applied

druid.maxWait=60000

# Interval between checks for idle connections that should be closed, in milliseconds

druid.timeBetweenEvictionRunsMillis=60000

# Minimum time a connection stays in the pool, in milliseconds

druid.minEvictableIdleTimeMillis=300000

druid.validationQuery=SELECT 1 FROM DUAL

druid.testWhileIdle=true

druid.testOnBorrow=false

druid.testOnReturn=false

# Enable PSCache and set the PSCache size per connection

druid.poolPreparedStatements=true

druid.maxPoolPreparedStatementPerConnectionSize=20

# Filters for monitoring/statistics interception; without them the monitoring UI cannot collect SQL statistics. 'wall' is the SQL firewall

druid.filters=stat,wall,log4j

# Enable the mergeSql feature through connectProperties; slow-SQL logging

druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000

# Merge the monitoring data of multiple DruidDataSource instances
# NOTE(review): the visible part of DruidDataSourceSettings declares no field for this key - confirm it is actually applied

druid.useGlobalDataSourceStat=true

  • 實體物件:

  • 1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    package com.bfxy.springboot.entity;

     

    import java.io.Serializable;

     

    /**
     * Order entity whose fields correspond to the columns of the {@code t_order} table
     * (id, name, message_id).
     *
     * <p>Every setter trims surrounding whitespace from its argument and stores
     * {@code null} unchanged.
     */
    public class Order implements Serializable {

        private static final long serialVersionUID = 9111357402963030257L;

        /** Order ID. */
        private String id;

        /** Order name. */
        private String name;

        /** Unique ID of the message. */
        private String messageId;

        /** @return the order ID, possibly {@code null} */
        public String getId() {
            return id;
        }

        /** @param id the order ID; trimmed before being stored, {@code null} is kept as is */
        public void setId(String id) {
            this.id = id == null ? null : id.trim();
        }

        /** @return the order name, possibly {@code null} */
        public String getName() {
            return name;
        }

        /** @param name the order name; trimmed before being stored, {@code null} is kept as is */
        public void setName(String name) {
            this.name = name == null ? null : name.trim();
        }

        /** @return the message ID, possibly {@code null} */
        public String getMessageId() {
            return messageId;
        }

        /** @param messageId the message ID; trimmed before being stored, {@code null} is kept as is */
        public void setMessageId(String messageId) {
            this.messageId = messageId == null ? null : messageId.trim();
        }
    }

  • 1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    package com.bfxy.springboot.entity;

     

    import java.util.Date;

     

    public class BrokerMessageLog {

        private String messageId;

     

        private String message;

     

        private Integer tryCount;

     

        private String status;

     

        private Date nextRetry;

     

        private Date createTime;

     

        private Date updateTime;

     

        public String getMessageId() {

            return messageId;

        }

     

        public void setMessageId(String messageId) {

            this.messageId = messageId == null null : messageId.trim();

        }

     

        public String getMessage() {

            return message;

        }

     

        public void setMessage(String message) {

            this.message = message == null null : message.trim();

        }

     

        public Integer getTryCount() {

            return tryCount;

        }

     

        public void setTryCount(Integer tryCount) {

            this.tryCount = tryCount;

        }

     

        public String getStatus() {

            return status;

        }

     

        public void setStatus(String status) {

            this.status = status == null null : status.trim();

        }

     

        public Date getNextRetry() {

            return nextRetry;

        }

     

        public void setNextRetry(Date nextRetry) {

            this.nextRetry = nextRetry;

        }

     

        public Date getCreateTime() {

            return createTime;

        }

     

        public void setCreateTime(Date createTime) {

            this.createTime = createTime;

        }

     

        public Date getUpdateTime() {

            return updateTime;

        }

     

        public void setUpdateTime(Date updateTime) {

            this.updateTime = updateTime;

        }

    }

  • 資料庫連線池程式碼:

  • 1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    package com.bfxy.springboot.config.database;

     

     

    import java.sql.SQLException;

     

    import javax.sql.DataSource;

     

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

    import org.springframework.jdbc.datasource.DataSourceTransactionManager;

    import org.springframework.transaction.PlatformTransactionManager;

    import org.springframework.transaction.annotation.EnableTransactionManagement;

     

    import com.alibaba.druid.pool.DruidDataSource; 

     

    @Configuration

    @EnableTransactionManagement

    public class DruidDataSourceConfig {

         

        private static Logger logger = LoggerFactory.getLogger(DruidDataSourceConfig.class);

         

        @Autowired

        private DruidDataSourceSettings druidSettings;

         

        public static String DRIVER_CLASSNAME ;

         

        @Bean

        public static PropertySourcesPlaceholderConfigurer propertyConfigure(){

            return new PropertySourcesPlaceholderConfigurer();

        }    

         

        @Bean

        public DataSource dataSource() throws SQLException {

            DruidDataSource ds = new DruidDataSource();

            ds.setDriverClassName(druidSettings.getDriverClassName());

            DRIVER_CLASSNAME = druidSettings.getDriverClassName();

            ds.setUrl(druidSettings.getUrl());

            ds.setUsername(druidSettings.getUsername());

            ds.setPassword(druidSettings.getPassword());

            ds.setInitialSize(druidSettings.getInitialSize());

            ds.setMinIdle(druidSettings.getMinIdle());

            ds.setMaxActive(druidSettings.getMaxActive());

            ds.setTimeBetweenEvictionRunsMillis(druidSettings.getTimeBetweenEvictionRunsMillis());

            ds.setMinEvictableIdleTimeMillis(druidSettings.getMinEvictableIdleTimeMillis());

            ds.setValidationQuery(druidSettings.getValidationQuery());

            ds.setTestWhileIdle(druidSettings.isTestWhileIdle());

            ds.setTestOnBorrow(druidSettings.isTestOnBorrow());

            ds.setTestOnReturn(druidSettings.isTestOnReturn());

            ds.setPoolPreparedStatements(druidSettings.isPoolPreparedStatements());

            ds.setMaxPoolPreparedStatementPerConnectionSize(druidSettings.getMaxPoolPreparedStatementPerConnectionSize());

            ds.setFilters(druidSettings.getFilters());

            ds.setConnectionProperties(druidSettings.getConnectionProperties());

            logger.info(" druid datasource config : {} ", ds);

            return ds;

        }

     

        @Bean

        public PlatformTransactionManager transactionManager() throws Exception {

            DataSourceTransactionManager txManager = new DataSourceTransactionManager();

            txManager.setDataSource(dataSource());

            return txManager;

        }

         

    }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

package com.bfxy.springboot.config.database;

 

import org.springframework.beans.factory.annotation.Value;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.PropertySource;

import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;

import org.springframework.stereotype.Component;

 

@Component

@ConfigurationProperties(prefix="spring.datasource"

@PropertySource("classpath:druid.properties")

public class DruidDataSourceSettings {

 

    private String driverClassName;

    private String url;

    private String username;

    private String password;

     

    @Value("${druid.initialSize}")

    private int initialSize;

     

    @Value("${druid.minIdle}")

    private int minIdle;

     

    @Value("${druid.maxActive}")

    private int maxActive;

     

    @Value("${druid.timeBetweenEvictionRunsMillis}")

    private long timeBetweenEvictionRunsMillis;

     

    @Value("${druid.minEvictableIdleTimeMillis}")

    private long minEvictableIdleTimeMillis;

     

    @Value("${druid.validationQuery}")

    private String validationQuery;

     

    @Value("${druid.testWhileIdle}")

    private boolean testWhileIdle;

     

    @Value("${druid.testOnBorrow}")

    private boolean testOnBorrow;

     

    @Value("${druid.testOnReturn}")

    private boolean testOnReturn;

     

    @Value("${druid.poolPreparedStatements}")

    private boolean poolPreparedStatements;

     

    @Value("${druid.maxPoolPreparedStatementPerConnectionSize}")

    private int maxPoolPreparedStatementPerConnectionSize;

     

    @Value("${druid.filters}")

    private String filters;

     

    @Value("${druid.connectionProperties}")

    private String connectionProperties;

     

    @Bean

    public static PropertySourcesPlaceholderConfigurer properdtyConfigure(){

        return new PropertySourcesPlaceholderConfigurer();

    }

     

    public String getDriverClassName() {

        return driverClassName;

    }

    public void setDriverClassName(String driverClassName) {

        this.driverClassName = driverClassName;

    }

    public String getUrl() {

        return url;

    }

    public void setUrl(String url) {

        this.url = url;

    }

    public String getUsername() {

        return username;

    }

    public void setUsername(String username) {

        this.username = username;

    }

    public String getPassword() {

        return password;

    }

    public void setPassword(String password) {

        this.password = password;

    }

    public int getInitialSize() {

        return initialSize;

    }

    public void setInitialSize(int initialSize) {

        this.initialSize = initialSize;

    }

    public int getMinIdle() {

        return minIdle;

    }

    public void setMinIdle(int minIdle) {

        this.minIdle = minIdle;

    }

    public int getMaxActive() {

        return maxActive;

    }

    public void setMaxActive(int maxActive) {

        this.maxActive = maxActive;

    }

    public long getTimeBetweenEvictionRunsMillis() {

        return timeBetweenEvictionRunsMillis;

    }

    public void setTimeBetweenEvictionRunsMillis(long timeBetweenEvictionRunsMillis) {

        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;

    }

    public long getMinEvictableIdleTimeMillis() {

        return minEvictableIdleTimeMillis;

    }

    public void setMinEvictableIdleTimeMillis(long minEvictableIdleTimeMillis) {

        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;

    }

    public String getValidationQuery() {

        return validationQuery;

    }

    public void setValidationQuery(String validationQuery) {

        this.validationQuery = validationQuery;

    }

    public boolean isTestWhileIdle() {

        return testWhileIdle;

    }

    public void setTestWhileIdle(boolean testWhileIdle) {

        this.testWhileIdle = testWhileIdle;

    }

    public boolean isTestOnBorrow() {

        return testOnBorrow;

    }

    public void setTestOnBorrow(boolean testOnBorrow) {

        this.testOnBorrow = testOnBorrow;

    }

    public boolean isTestOnReturn() {

        return testOnReturn;

    }

    public void setTestOnReturn(boolean testOnReturn) {

        this.testOnReturn = testOnReturn;

    }

    public boolean isPoolPreparedStatements() {

        return poolPreparedStatements;

    }

    public void setPoolPreparedStatements(boolean poolPreparedStatements) {

        this.poolPreparedStatements = poolPreparedStatements;

    }

    public int getMaxPoolPreparedStatementPerConnectionSize() {

        return maxPoolPreparedStatementPerConnectionSize;

    }

    public void setMaxPoolPreparedStatementPerConnectionSize(

            int maxPoolPreparedStatementPerConnectionSize) {

        this.maxPoolPreparedStatementPerConnectionSize = maxPoolPreparedStatementPerConnectionSize;

    }

    public String getFilters() {

        return filters;

    }

    public void setFilters(String filters) {

       &nb