1. 程式人生 > >一個基於RabbitMQ的可複用的事務訊息方案

一個基於RabbitMQ的可複用的事務訊息方案

前提

分散式事務是微服務實踐中一個比較棘手的問題,在筆者所實施的微服務實踐方案中,都採用了折中或者規避強一致性的方案。參考Ebay多年前提出的本地訊息表方案,基於RabbitMQMySQLJDBC)做了輕量級的封裝,實現了低入侵性的事務訊息模組。本文的內容就是詳細分析整個方案的設計思路和實施。環境依賴如下:

  • JDK1.8+
  • spring-boot-start-web:2.x.xspring-boot-start-jdbc:2.x.xspring-boot-start-amqp:2.x.x
  • HikariCP:3.x.xspring-boot-start-jdbc自帶)、mysql-connector-java:5.1.48
  • redisson:3.12.1

方案設計思路

事務訊息原則上只適合弱一致性(或者說最終一致性)的場景,常見的弱一致性場景如:

  • 使用者服務完成了註冊動作,向簡訊服務推送一條營銷相關的訊息。
  • 信貸體系中,訂單服務儲存訂單完畢,向審批服務推送一條待審批的訂單記錄資訊。
  • ......

強一致性的場景一般不應該選用事務訊息。

一般情況下,要求強一致性說明要嚴格同步,也就是所有操作必須同時成功或者同時失敗,這樣就會引入同步帶來的額外消耗。如果一個事務訊息模組設計合理,補償、查詢、監控等等功能都完畢,由於系統互動是非同步的,整體吞吐要比嚴格同步高。在筆者負責的業務系統中基於事務訊息使用還定製了一條基本原則:訊息內容正確的前提下,消費方出現異常需要自理。

簡單來說就是:上游保證了自身的業務正確性,成功推送了正確的訊息到RabbitMQ就認為上游義務已經結束。

為了降低程式碼的入侵性,事務訊息需要藉助Spring的程式設計式事務或者宣告式事務。程式設計式事務一般依賴於TransactionTemplate,而宣告式事務依託於AOP模組,依賴於註解@Transactional

接著需要自定義一個事務訊息功能模組,新增一個事務訊息記錄表(其實就是本地訊息表),用於儲存每一條需要傳送的訊息記錄。事務訊息功能模組的主要功能是:

  • 儲存訊息記錄。
  • 推送訊息到RabbitMQ服務端。
  • 訊息記錄的查詢、補償推送等等。

事務執行的邏輯單元

在事務執行的邏輯單元裡面,需要進行待推送的事務訊息記錄的儲存,也就是:本地(業務)邏輯和事務訊息記錄儲存操作繫結在同一個事務。

傳送訊息到RabbitMQ服務端這一步需要延後到事務提交之後,這樣才能保證事務提交成功和訊息成功傳送到RabbitMQ服務端這兩個操作是一致的。為了把儲存待發送的事務訊息和傳送訊息到RabbitMQ兩個動作從使用者感知角度合併為一個動作,這裡需要用到Spring特有的事務同步器TransactionSynchronization,這裡分析一下事務同步器的主要方法的回撥位置,主要參考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:

上圖僅僅演示了事務正確提交的場景(不包含異常的場景)。這裡可以明確知道,事務同步器TransactionSynchronizationafterCommit()afterCompletion(int status)方法都在真正的事務提交點AbstractPlatformTransactionManager#doCommit()之後回撥,因此可以選用這兩個方法其中之一用於執行推送訊息到RabbitMQ服務端,整體的虛擬碼如下:

@Transactional
public Dto businessMethod(){
    business transaction code block ...
    // 儲存事務訊息
    [saveTransactionMessageRecord()]
    // 註冊事務同步器 - 在afterCommit()方法中推送訊息到RabbitMQ
    [register TransactionSynchronization,send message in method afterCommit()]
    business transaction code block ...
}

上面虛擬碼中,儲存事務訊息和註冊事務同步器兩個步驟可以安插在事務方法中的任意位置,也就是說與執行順序無關。

事務訊息的補償

雖然之前提到筆者建議下游服務自理自身服務消費異常的場景,但是有些時候迫於無奈還是需要上游把對應的訊息重新推送,這個算是特殊的場景。另外還有一個場景需要考慮:事務提交之後觸發事務同步器TransactionSynchronizationafterCommit()方法失敗。這是一個低概率的場景,但是在生產中一定會出現,一個比較典型的原因就是:事務提交完成後尚未來得及觸發TransactionSynchronization#afterCommit()方法進行推送服務例項就被重啟。如下圖所示:

為了統一處理補償推送的問題,使用了有限狀態判斷訊息是否已經推送成功:

  • 在事務方法內,儲存事務訊息的時候,標記訊息記錄推送狀態為處理中。
  • 事務同步器介面TransactionSynchronizationafterCommit()方法的實現中,推送對應的訊息到RabbitMQ,然後更變事務訊息記錄的狀態為推送成功。

還有一種極為特殊的情況是RabbitMQ服務端本身出現故障導致訊息推送異常,這種情況下需要進行重試(補償推送),經驗證明短時間內的反覆重試是沒有意義的,故障的服務一般不會瞬時恢復,所以可以考慮使用指數退避演算法進行重試,同時需要限制最大重試次數。

指數值、間隔值和最大重試次數上限需要根據實際情況設定,否則容易出現訊息延時過大或者重試過於頻繁等問題。

方案實施

引入核心依賴:

<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>

spring-boot-starter-jdbcmysql-connector-javaspring-boot-starter-aopMySQL事務相關,而spring-boot-starter-amqpRabbitMQ客戶端的封裝,redisson主要使用其分散式鎖,用於補償定時任務的加鎖執行(以防止服務多個節點併發執行補償推送)。

表設計

事務訊息模組主要涉及兩張表,以MySQL為例,建表DDL如下:

CREATE TABLE `t_transactional_message`
(
    id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time         DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time           DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator             VARCHAR(20)     NOT NULL DEFAULT 'admin',
    editor              VARCHAR(20)     NOT NULL DEFAULT 'admin',
    deleted             TINYINT         NOT NULL DEFAULT 0,
    current_retry_times TINYINT         NOT NULL DEFAULT 0 COMMENT '當前重試次數',
    max_retry_times     TINYINT         NOT NULL DEFAULT 5 COMMENT '最大重試次數',
    queue_name          VARCHAR(255)    NOT NULL COMMENT '佇列名',
    exchange_name       VARCHAR(255)    NOT NULL COMMENT '交換器名',
    exchange_type       VARCHAR(8)      NOT NULL COMMENT '交換型別',
    routing_key         VARCHAR(255) COMMENT '路由鍵',
    business_module     VARCHAR(32)     NOT NULL COMMENT '業務模組',
    business_key        VARCHAR(255)    NOT NULL COMMENT '業務鍵',
    next_schedule_time  DATETIME        NOT NULL COMMENT '下一次排程時間',
    message_status      TINYINT         NOT NULL DEFAULT 0 COMMENT '訊息狀態',
    init_backoff        BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT '退避初始化值,單位為秒',
    backoff_factor      TINYINT         NOT NULL DEFAULT 2 COMMENT '退避因子(也就是指數)',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT '事務訊息表';

CREATE TABLE `t_transactional_message_content`
(
    id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT '事務訊息記錄ID',
    content    TEXT COMMENT '訊息內容'
) COMMENT '事務訊息內容表';

因為此模組有可能擴展出一個後臺管理模組,所以要把訊息的管理和狀態相關欄位和大體積的訊息內容分別存放在兩個表,從而避免大批量查詢訊息記錄的時候MySQL服務IO使用率過高的問題(這是和上一個公司的DBA團隊商討後得到的一個比較合理的方案)。預留了兩個業務欄位business_modulebusiness_key用於標識業務模組和業務鍵(一般是唯一識別號,例如訂單號)。

一般情況下,如果服務通過配置自行提前宣告佇列和交換器的繫結關係,那麼傳送RabbitMQ訊息的時候其實只依賴於exchangeNameroutingKey兩個欄位(header型別的交換器是特殊的,也比較少用,這裡暫時不用考慮),考慮到服務可能會遺漏宣告操作,傳送訊息的時候會基於佇列進行首次繫結宣告並且快取相關的資訊(RabbitMQ中的佇列-交換器繫結宣告只要每次宣告繫結關係的引數一致,則不會丟擲異常)。

方案程式碼設計

下面的方案設計描述中,暫時忽略了訊息事務管理後臺的API設計,這些可以在後期補充。

定義貧血模型實體類TransactionalMessageTransactionalMessageContent

@Data
public class TransactionalMessage {

    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {

    private Long id;
    private Long messageId;
    private String content;
}

然後定義dao介面(這裡暫時不展開實現的細節程式碼,儲存使用MySQL,如果要替換為其他型別的資料庫,只需要使用不同的實現即可):

public interface TransactionalMessageDao {

    void insertSelective(TransactionalMessage record);

    void updateStatusSelective(TransactionalMessage record);

    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
                                                               LocalDateTime maxScheduleTime,
                                                               int limit);
}

public interface TransactionalMessageContentDao {

    void insert(TransactionalMessageContent record);

    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}

接著定義事務訊息服務介面TransactionalMessageService

// 對外提供的服務類介面
public interface TransactionalMessageService {

    void sendTransactionalMessage(Destination destination, TxMessage message);
}


@Getter
@RequiredArgsConstructor
public enum ExchangeType {

    FANOUT("fanout"),

    DIRECT("direct"),

    TOPIC("topic"),

    DEFAULT(""),

    ;

    private final String type;
}

// 傳送訊息的目的地
public interface Destination {

    ExchangeType exchangeType();

    String queueName();

    String exchangeName();

    String routingKey();
}

@Builder
public class DefaultDestination implements Destination {

    private ExchangeType exchangeType;
    private String queueName;
    private String exchangeName;
    private String routingKey;

    @Override
    public ExchangeType exchangeType() {
        return exchangeType;
    }

    @Override
    public String queueName() {
        return queueName;
    }

    @Override
    public String exchangeName() {
        return exchangeName;
    }

    @Override
    public String routingKey() {
        return routingKey;
    }
}

// 事務訊息
public interface TxMessage {

    String businessModule();

    String businessKey();

    String content();
}

@Builder
public class DefaultTxMessage implements TxMessage {

    private String businessModule;
    private String businessKey;
    private String content;

    @Override
    public String businessModule() {
        return businessModule;
    }

    @Override
    public String businessKey() {
        return businessKey;
    }

    @Override
    public String content() {
        return content;
    }
}

// 訊息狀態
@RequiredArgsConstructor
public enum TxMessageStatus {

    /**
     * 成功
     */
    SUCCESS(1),

    /**
     * 待處理
     */
    PENDING(0),

    /**
     * 處理失敗
     */
    FAIL(-1),

    ;

    private final Integer status;
}

TransactionalMessageService的實現類是事務訊息的核心功能實現,程式碼如下:

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {

    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;

    private static final ConcurrentMap<String, Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination, TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // 原子性的預宣告
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName, k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName, exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
        // 儲存事務訊息記錄
        managementService.saveTransactionalMessageRecord(record, content);
        // 註冊事務同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                managementService.sendMessageSync(record, content);
            }
        });
    }
}

訊息記錄狀態和內容持久化的管理統一放在TransactionalMessageManagementService中:

@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {

    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;

    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
                DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("傳送訊息成功,目標佇列:{},訊息內容:{}", record.getQueueName(), content);
            }
            // 標記成功
            markSuccess(record);
        } catch (Exception e) {
            // 標記失敗
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        // 標記下一次執行時間為最大值
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        log.error("傳送訊息失敗,目標佇列:{}", record.getQueueName(), e);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        // 計算下一次的執行時間
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(),
                record.getInitBackoff(),
                record.getBackoffFactor(),
                record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    /**
     * 計算下一次執行時間
     *
     * @param base          基礎時間
     * @param initBackoff   退避基準值
     * @param backoffFactor 退避指數
     * @param round         輪數
     * @return LocalDateTime
     */
    private LocalDateTime calculateNextScheduleTime(LocalDateTime base,
                                                    long initBackoff,
                                                    long backoffFactor,
                                                    long round) {
        double delta = initBackoff * Math.pow(backoffFactor, round);
        return base.plusSeconds((long) delta);
    }

    /**
     * 推送補償 - 裡面的引數應該根據實際場景定製
     */
    public void processPendingCompensationRecords() {
        // 時間的右值為當前時間減去退避初始值,這裡預防把剛儲存的訊息也推送了
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        // 時間的左值為右值減去1小時
        LocalDateTime min = max.plusHours(-1);
        Map<Long, TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min, max, LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId, x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "(", ")");
            collect.keySet().forEach(x -> joiner.add(x.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage message = collect.get(item.getMessageId());
                        sendMessageSync(message, item.getContent());
                    });
        }
    }
}

這裡有一點尚待優化:更新事務訊息記錄狀態的方法可以優化為批量更新,在limit比較大的時候,批量更新的效率會更高。

最後是定時任務的配置類:

@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {

    private final TransactionalMessageManagementService managementService;

    /**
     * 這裡用的是本地的Redis,實際上要做成配置
     */
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        // 等待時間5秒,預期300秒執行完畢,這兩個值需要按照實際場景定製
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("開始執行事務訊息推送補償定時任務...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                // 以防鎖過早釋放
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("執行事務訊息推送補償定時任務完畢,耗時:{} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}

基本程式碼編寫完,整個專案的結構如下:

最後新增兩個測試類:

@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {

    private final MockBusinessService mockBusinessService;

    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}

@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {

    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String, Object> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)", p -> {
            p.setString(1, orderId);
            p.setBigDecimal(2, amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),
                DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("儲存訂單:{}成功...", orderId);
    }
}

某次測試結果如下:

2020-02-05 21:10:13.287  INFO 49556 --- [           main] club.throwable.cm.MockBusinessService    : 儲存訂單:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...

模擬訂單資料成功儲存,而且RabbitMQ訊息在事務成功提交後正常傳送到RabbitMQ服務端中,如RabbitMQ控制檯資料所示。

小結

事務訊息模組的設計僅僅是使非同步訊息推送這個功能實現趨向於完備,其實一個合理的非同步訊息互動系統,一定會提供同步查詢介面,這一點是基於非同步訊息沒有回撥或者沒有響應的特性導致的。一般而言,一個系統的吞吐量和系統的非同步化處理佔比成正相關(這一點可以參考Amdahl's Law),所以在系統架構設計實際中應該儘可能使用非同步互動,提高系統吞吐量同時減少同步阻塞帶來的無謂等待。事務訊息模組可以擴展出一個後臺管理,甚至可以配合MicrometerPrometheusGrafana體系做實時資料監控。

本文demo專案倉庫:rabbit-transactional-message

demo必須本地安裝MySQLRedisRabbitMQ才能正常啟動,本地必須新建一個數據庫命名local

個人部落格

Throwable's Blog

(本文完 c-5-d e-a-20200202 疫情嚴重,馬上要開始在家辦公,少出門多看書