1. 程式人生 > 實用技巧 >Seata分散式事務簡單使用

Seata分散式事務簡單使用

  在分散式開發過程中,分散式事務是必須面臨的問題。因為分散式系統中,存在多個服務之間的呼叫。服務與服務之間存在事務問題,可能在某個服務呼叫鏈過程中某個服務發生異常導致資料不一致問題。

  每個服務內部的資料一致性由本地事務控制,通常用@Transactional 來控制。但是服務拆分之後,多個服務協同完成的操作如果需要保證資料一致性就需要引入分散式事務。

1.Seata簡介

Seata 是一款開源的分散式事務解決方案,致力於在微服務架構下提供高效能和簡單易用的分散式事務服務。

官網:http://seata.io/zh-cn/

1. 術語

一個ID加三個元件。

(1)XID:TransactionID 全域性事務唯一ID

(2)三元件:

TC (Transaction Coordinator) - 事務協調者:維護全域性和分支事務的狀態,驅動全域性事務提交或回滾。

TM (Transaction Manager) - 事務管理器:定義全域性事務的範圍:開始全域性事務、提交或回滾全域性事務。

RM (Resource Manager) - 資源管理器:管理分支事務處理的資源,與TC交談以註冊分支事務和報告分支事務的狀態,並驅動分支事務提交或回滾。

2. 處理過程

如下圖:

1.TM 向 TC 請求發起(Begin)、提交(Commit)、回滾(Rollback)全域性事務。
2.TM 把代表全域性事務的 XID 繫結到分支事務上。
3.RM 向 TC 註冊,把分支事務關聯到 XID 代表的全域性事務中。
4.RM 把分支事務的執行結果上報給 TC。(可選)
5.TC 傳送分支提交(Branch Commit)或分支回滾(Branch Rollback)命令給 RM。

2.Seta安裝

1.下載

http://seata.io/zh-cn/blog/download.html

例如我下載的版本是:seata-server-1.4.0.zip

2.解壓後目錄如下:

3. 檢視README檔案,內容如下:

# 指令碼說明

## [client](https://github.com/seata/seata/tree/develop/script/client) 

> 存放用於客戶端的配置和SQL

- at: AT模式下的 `undo_log` 建表語句
- conf: 客戶端的配置檔案
- saga: SAGA 模式下所需表的建表語句
- spring: SpringBoot 應用支援的配置檔案

## [server](https:
//github.com/seata/seata/tree/develop/script/server) > 存放server側所需SQL和部署指令碼 - db: server 側的儲存模式為 `db` 時所需表的建表語句 - docker-compose: server 側通過 docker-compose 部署的指令碼 - helm: server 側通過 Helm 部署的指令碼 - kubernetes: server 側通過 Kubernetes 部署的指令碼 ## [config-center](https://github.com/seata/seata/tree/develop/script/config-center) > 用於存放各種配置中心的初始化指令碼,執行時都會讀取 `config.txt`配置檔案,並寫入配置中心 - nacos: 用於向 Nacos 中新增配置 - zk: 用於向 Zookeeper 中新增配置,指令碼依賴 Zookeeper 的相關指令碼,需要手動下載;ZooKeeper相關的配置可以寫在 `zk-params.txt` 中,也可以在執行的時候輸入 - apollo: 向 Apollo 中新增配置,Apollo 的地址埠等可以寫在 `apollo-params.txt`,也可以在執行的時候輸入 - etcd3: 用於向 Etcd3 中新增配置 - consul: 用於向 consul 中新增配置

4.到https://github.com/seata/seata/tree/develop/script/server檢視建表語句。(這裡暫時記住是AT模式,區別之後分析)

到資料庫執行mysql的建表語句

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

5.將配置匯入到nacos (這一步也可以使用本地檔案作為配置中心)

(1)nacos新建分名稱空間

(2) 參考https://github.com/seata/seata/tree/develop/script/config-center 將配置檔案匯入nacos

1》下載config.txt, 並且修改裡面的store.mode=db以及賬戶密碼資訊,如下:

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
store.mode=db
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.host=127.0.0.1
store.redis.port=6379
store.redis.maxConn=10
store.redis.minConn=1
store.redis.database=0
store.redis.password=null
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

2》將config.txt拷貝到與nacos-config.sh同級目錄(這個檔案也是在config-center下載)

3》執行下面:

liqiang@root MINGW64 ~/Desktop/file/seata/configuration/nacos
$ sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 6a1ba3ab-1821-43a6-b7ba-77272ea94c7d -u nacos -w nacos

4》匯入成功可以看到:

=========================================================================
 Complete initialization parameters,  total-count:80 ,  failure-count:0
=========================================================================
 Init nacos config finished, please start seata-server.

6.修改conf/registry.conf檔案:修改指明註冊中心和配置中心

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  loadBalance = "RandomLoadBalance"
  loadBalanceVirtualNodes = 10

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "6a1ba3ab-1821-43a6-b7ba-77272ea94c7d"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
  }
}

7.啟動seata服務

seata\bin/seata-server.bat雙擊即可

8.啟動成功可以到nacos檢視服務列表與配置列表進行驗證

3. 簡單使用實現分散式事務

如下場景涉及分散式事務:

模擬使用者下單,會在訂單服務建立一個訂單,然後遠端呼叫庫存服務減去庫存,再通過賬戶服務來減去使用者賬戶的餘額,最後修改訂單狀態為已完成。

涉及到的服務:訂單、庫存、賬戶服務。

參考:https://gitee.com/itCjb/spring-cloud-alibaba-seata-demo

這裡的版本是基於seta1.4.0。 之前看0.9版本的教程都是將register.conf 拷到工程,目前的版本可以直接基於yml配置。

1. 庫存服務

1.資料庫 (注意每個業務資料庫都需要加undo_log表)

/*
Navicat MySQL Data Transfer

Source Server         : mysql
Source Server Version : 50721
Source Host           : localhost:3306
Source Database       : test_storage

Target Server Type    : MYSQL
Target Server Version : 50721
File Encoding         : 65001

Date: 2019-12-08 15:11:00
*/

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `price` double DEFAULT NULL,
  `stock` int(11) DEFAULT NULL,
  `last_update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES ('1', '5', '9', '2019-12-06 21:51:01');

-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of undo_log
-- ----------------------------

2. pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-cloud-alibaba-seata-demo</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>product-service</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- nacos 作為服務註冊中心 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <!-- nacos 作為配置中心 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.alibaba</groupId>
                    <artifactId>druid</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
    </dependencies>

</project>

3.yml配置

1》application.yml

spring:
   datasource:
      type: com.alibaba.druid.pool.DruidDataSource
      url: jdbc:mysql://127.0.0.1:3306/test_storage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
      driver-class-name: com.mysql.cj.jdbc.Driver
      username: root
      password: 123456
      max-wait: 60000
      max-active: 100
      min-idle: 10
      initial-size: 10
mybatis-plus:
   mapper-locations: classpath:/mapper/*Mapper.xml
   typeAliasesPackage: icu.funkye.entity
   global-config:
      db-config:
         field-strategy: not-empty
         id-type: auto
         db-type: mysql
   configuration:
      map-underscore-to-camel-case: true
      cache-enabled: true
      auto-mapping-unknown-column-behavior: none
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
seata:
   enabled: true
   application-id: product-service
   tx-service-group: my_test_tx_group
   config:
      type: nacos
      nacos:
         namespace: 6a1ba3ab-1821-43a6-b7ba-77272ea94c7d
         serverAddr: 127.0.0.1:8848
         group: SEATA_GROUP
         username: "nacos"
         password: "nacos"
   registry:
      type: nacos
      nacos:
         application: seata-server
         server-addr: 127.0.0.1:8848
         group: SEATA_GROUP
         namespace:
         username: "nacos"
         password: "nacos"

2》bootstrap.yml

spring:
   application:
      name: product-service
   main:
      allow-bean-definition-overriding: true
   cloud:
      nacos:
         discovery:
            server-addr: 127.0.0.1:8848
            username: "nacos"
            password: "nacos"
         config:
            server-addr: 127.0.0.1:8848
            username: "nacos"
            password: "nacos"
server:
   port: 8083

4.主啟動類

package icu.funkye;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author funkye
 */
@SpringBootApplication
public class ProductServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProductServiceApplication.class, args);
    }

}

5. 業務程式碼

業務程式碼就不貼了,就是一個簡單的CRUD

6.啟動: 注意先啟動nacos,再啟動seata-server,最後啟動專案

啟動專案可以看到RM相關注冊資訊:

2. 賬戶服務和訂單服務

。。。在最後的git地址貼出

3.最後的client服務

client服務相當於一個閘道器層,呼叫feignclient服務完成相關的操作。

測試Controller如下:

    /**
     * 測試異常回滾
     *
     * @return
     * @throws TransactionException
     */
    @GetMapping(value = "testRollback")
    @GlobalTransactional
    public Object testRollback() throws TransactionException {
        Product product = productService.getById(1);
        if (product.getStock() > 0) {
            LocalDateTime now = LocalDateTime.now();
            logger.info("seata分散式事務Id:{}", RootContext.getXID());
            Account account = accountService.getById(1);
            Orders orders = new Orders();
            orders.setCreateTime(now);
            orders.setProductId(product.getId());
            orders.setReplaceTime(now);
            orders.setSum(1);
            orders.setAmount(product.getPrice());
            orders.setAccountId(account.getId());
            product.setStock(product.getStock() - 1);
            account.setSum(account.getSum() != null ? account.getSum() + 1 : 1);
            account.setLastUpdateTime(now);
            // 庫存減去一
            productService.updateById(product);
            // 賬戶加1
            accountService.updateById(account);

            int i = 1 / 0;

            // 建立訂單
            orderService.save(orders);

            return true;
        }
        return false;
    }

4. debug之後測試

1.斷點打在client中 int i = 1 / 0 這一行。

2.程式碼執行到這裡,檢視相關服務日誌:

(1) 庫存服務日誌如下: 可以看到將庫存數量 stock修改為8

Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@73753ab8] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@7db2daa6] will not be managed by Spring
Original SQL: SELECT id,price,stock,last_update_time FROM product WHERE id=? 
parser sql: SELECT id, price, stock, last_update_time FROM product WHERE id = ?
==>  Preparing: SELECT id, price, stock, last_update_time FROM product WHERE id = ? 
==> Parameters: 1(Integer)
<==    Columns: id, price, stock, last_update_time
<==        Row: 1, 5.0, 9, 2019-12-06 21:51:01
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@73753ab8]
2020-12-13 21:19:35.759  INFO 14932 --- [nio-8083-exec-2] icu.funkye.controller.ProductController  : product:Product(id=1, price=5.0, stock=8, lastUpdateTime=2019-12-06T21:51:01)
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6351d47d] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@27e7719a] will not be managed by Spring
Original SQL: UPDATE product  SET price=?,
stock=?,
last_update_time=?  WHERE id=?
parser sql: UPDATE product SET price = ?, stock = ?, last_update_time = ? WHERE id = ?
==>  Preparing: UPDATE product SET price = ?, stock = ?, last_update_time = ? WHERE id = ? 
==> Parameters: 5.0(BigDecimal), 8(Integer), 2019-12-06T21:51:01(LocalDateTime), 1(Integer)
<==    Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6351d47d]

(2)賬戶服務日誌如下: 可以看到將sum修改為2,可以理解為消費2個

Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@1ff5e152] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@11321cdc] will not be managed by Spring
Original SQL: SELECT id,user_name,sum,last_update_time FROM account WHERE id=? 
parser sql: SELECT id, user_name, sum, last_update_time FROM account WHERE id = ?
==>  Preparing: SELECT id, user_name, sum, last_update_time FROM account WHERE id = ? 
==> Parameters: 1(Integer)
<==    Columns: id, user_name, sum, last_update_time
<==        Row: 1, 1, 1, 2019-12-08 15:05:05
<==      Total: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@1ff5e152]
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@24d434f9] was not registered for synchronization because synchronization is not active
JDBC Connection [io.seata.rm.datasource.ConnectionProxy@2bbfadf0] will not be managed by Spring
Original SQL: UPDATE account  SET user_name=?,
sum=?,
last_update_time=?  WHERE id=?
parser sql: UPDATE account SET user_name = ?, sum = ?, last_update_time = ? WHERE id = ?
==>  Preparing: UPDATE account SET user_name = ?, sum = ?, last_update_time = ? WHERE id = ? 
==> Parameters: 1(String), 2(Integer), 2020-12-13T21:19:35.710(LocalDateTime), 1(Integer)
<==    Updates: 1
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@24d434f9]
2020-12-13 21:20:37.238  INFO 10632 --- [ch_RMROLE_1_5_8] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=169.254.51.32:8091:81496901952864256,branchId=81496905018900480,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/test_pay,applicationData=null
2020-12-13 21:20:37.239  INFO 10632 --- [ch_RMROLE_1_5_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 169.254.51.32:8091:81496901952864256 81496905018900480 jdbc:mysql://127.0.0.1:3306/test_pay
Sun Dec 13 21:20:37 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
2020-12-13 21:20:37.404  INFO 10632 --- [ch_RMROLE_1_5_8] i.s.r.d.undo.AbstractUndoLogManager      : xid 169.254.51.32:8091:81496901952864256 branch 81496905018900480, undo_log deleted with GlobalFinished
2020-12-13 21:20:37.406  INFO 10632 --- [ch_RMROLE_1_5_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

(3)client服務日誌如下:

2020-12-13 21:19:35.677  INFO 19848 --- [nio-8081-exec-6] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [169.254.51.32:8091:81496901952864256]
2020-12-13 21:19:35.710  INFO 19848 --- [nio-8081-exec-6] icu.funkye.controller.TestController     : seata分散式事務Id:169.254.51.32:8091:81496901952864256

3.程式碼執行到這裡,檢視資料庫

(1) 檢視seataserver伺服器的三個表資料:

mysql> select * from lock_table\G
*************************** 1. row ***************************
       row_key: jdbc:mysql://127.0.0.1:3306/test_pay^^^account^^^1
           xid: 169.254.51.32:8091:81498624360579072
transaction_id: 81498624360579072
     branch_id: 81498628714266625
   resource_id: jdbc:mysql://127.0.0.1:3306/test_pay
    table_name: account
            pk: 1
    gmt_create: 2020-12-13 21:26:27
  gmt_modified: 2020-12-13 21:26:27
*************************** 2. row ***************************
       row_key: jdbc:mysql://127.0.0.1:3306/test_storage^^^product^^^1
           xid: 169.254.51.32:8091:81498624360579072
transaction_id: 81498624360579072
     branch_id: 81498626721972225
   resource_id: jdbc:mysql://127.0.0.1:3306/test_storage
    table_name: product
            pk: 1
    gmt_create: 2020-12-13 21:26:26
  gmt_modified: 2020-12-13 21:26:26
2 rows in set (0.00 sec)

mysql> select * from branch_table\G
*************************** 1. row ***************************
        branch_id: 81498626721972225
              xid: 169.254.51.32:8091:81498624360579072
   transaction_id: 81498624360579072
resource_group_id: NULL
      resource_id: jdbc:mysql://127.0.0.1:3306/test_storage
      branch_type: AT
           status: 0
        client_id: product-service:169.254.51.32:56219
 application_data: NULL
       gmt_create: 2020-12-13 21:26:26.893741
     gmt_modified: 2020-12-13 21:26:26.893741
*************************** 2. row ***************************
        branch_id: 81498628714266625
              xid: 169.254.51.32:8091:81498624360579072
   transaction_id: 81498624360579072
resource_group_id: NULL
      resource_id: jdbc:mysql://127.0.0.1:3306/test_pay
      branch_type: AT
           status: 0
        client_id: account-service:169.254.51.32:56051
 application_data: NULL
       gmt_create: 2020-12-13 21:26:27.387070
     gmt_modified: 2020-12-13 21:26:27.387070
2 rows in set (0.07 sec)

mysql> select * from global_table\G
*************************** 1. row ***************************
                      xid: 169.254.51.32:8091:81498624360579072
           transaction_id: 81498624360579072
                   status: 1
           application_id: client
transaction_service_group: my_test_tx_group
         transaction_name: testRollback()
                  timeout: 60000
               begin_time: 1607865986218
         application_data: NULL
               gmt_create: 2020-12-13 21:26:26
             gmt_modified: 2020-12-13 21:26:26
1 row in set (0.12 sec)

可以看到lock_table 記錄了行鎖的資訊,鎖住了資料庫某個表某條資料的ID資訊。

branch_table 記錄了當前Tx參與的會話分支。

global_table 記錄了Tx資訊。

(2)檢視庫存庫中的產品表:發現庫存數是8

(3) 檢視庫存庫中的undo_log中資料如下

可以看到第三條資料,記錄branchId、xid、以及一個rollback_info與log_status(1是已經處理,2是未處理)。rollback_info資訊如下:

{
    "@class": "io.seata.rm.datasource.undo.BranchUndoLog",
    "xid": "169.254.51.32:8091:81498624360579072",
    "branchId": 81498626721972225,
    "sqlUndoLogs": ["java.util.ArrayList", [{
        "@class": "io.seata.rm.datasource.undo.SQLUndoLog",
        "sqlType": "UPDATE",
        "tableName": "product",
        "beforeImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "product",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "price",
                    "keyType": "NULL",
                    "type": 8,
                    "value": 5.0
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "stock",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 9
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "last_update_time",
                    "keyType": "NULL",
                    "type": 93,
                    "value": ["java.sql.Timestamp", [1575669061000, 0]]
                }]]
            }]]
        },
        "afterImage": {
            "@class": "io.seata.rm.datasource.sql.struct.TableRecords",
            "tableName": "product",
            "rows": ["java.util.ArrayList", [{
                "@class": "io.seata.rm.datasource.sql.struct.Row",
                "fields": ["java.util.ArrayList", [{
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "id",
                    "keyType": "PRIMARY_KEY",
                    "type": 4,
                    "value": 1
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "price",
                    "keyType": "NULL",
                    "type": 8,
                    "value": 5.0
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "stock",
                    "keyType": "NULL",
                    "type": 4,
                    "value": 8
                }, {
                    "@class": "io.seata.rm.datasource.sql.struct.Field",
                    "name": "last_update_time",
                    "keyType": "NULL",
                    "type": 93,
                    "value": ["java.sql.Timestamp", [1575640261000, 0]]
                }]]
            }]]
        }
    }]]
}

4. 放開斷點讓程式報錯,檢視產品庫存:發現仍然是9,證明分散式事務確實回滾。

也可以檢視業務日誌:如下是庫存服務的日誌(xid、branchid與上面不一致是多次試驗截得)

2020-12-13 21:39:11.182  INFO 14932 --- [ch_RMROLE_1_4_8] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=169.254.51.32:8091:81501788107309056,branchId=81501789592092673,branchType=AT,resourceId=jdbc:mysql://127.0.0.1:3306/test_storage,applicationData=null
2020-12-13 21:39:11.182  INFO 14932 --- [ch_RMROLE_1_4_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 169.254.51.32:8091:81501788107309056 81501789592092673 jdbc:mysql://127.0.0.1:3306/test_storage
2020-12-13 21:39:11.408  INFO 14932 --- [ch_RMROLE_1_4_8] i.s.r.d.undo.AbstractUndoLogManager      : xid 169.254.51.32:8091:81501788107309056 branch 81501789592092673, undo_log deleted with GlobalFinished
2020-12-13 21:39:11.410  INFO 14932 --- [ch_RMROLE_1_4_8] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked

5. Seata過程簡單總結

1. 五步驟再次理解

1.TM 向 TC 請求發起(Begin)、提交(Commit)、回滾(Rollback)全域性事務。
2.TM 把代表全域性事務的 XID 繫結到分支事務上。
3.RM 向 TC 註冊,把分支事務關聯到 XID 代表的全域性事務中。
4.RM 把分支事務的執行結果上報給 TC。(可選)
5.TC 傳送分支提交(Branch Commit)或分支回滾(Branch Rollback)命令給 RM。

TM可以理解為加了GlobalTransactional 註解的方法,會向TC申請一個全域性事務ID(XID),TC就是seataserver。RM就是資源管理器,可以理解為一個數據庫連線就是一個RM,一個業務庫對應一個RM。

2. 上面其實是 Seata的AT模式,可以理解為無程式碼侵入,不需自己編寫回滾以及其他方法,seata自己控制。官方對AT模式解釋如下:提供無侵入自動補償的事務模式,目前已支援 MySQL、 Oracle 、PostgreSQL和 TiDB的AT模式。

3.AT模式整體機制如下:

兩階段提交協議的演變:

一階段:業務資料和回滾日誌記錄在同一個本地事務中提交,釋放本地鎖和連線資源。

二階段:

  提交非同步化,非常快速地完成。
  回滾通過一階段的回滾日誌進行反向補償。

解釋:

第一階段seata會攔截業務SQL(使用seata代理資料來源進行處理,也就是對資料來源做手腳):

(1)解析業務SQL將要更新的資料,記錄其更新前的值,"BeforeImage"

(2)執行業務SQL,進行更新

(3)記錄更新後的值,"AfterImage",最後生成行鎖。

  上面的操作在一個事務內操作,可以保證原子性。可以通過上面業務庫的undo_log日誌表的rollback_info 欄位進行檢視,說白了就是記錄一下更新前、更新後的值;如果報錯需要回滾將資料修改為更新前的值,如果正常提交刪掉undo_log中的記錄即可。

第二階段:如果報錯就回滾,否則提交。

提交》如果正常操作進行提交事務。Seata框架只需要將一階段生成的快照和行鎖刪掉即可,完成資料清理。

回滾》回滾時,使用"before image"進行還原資料,但是還原之前要校驗髒寫,對比"資料庫當前資料"和"after image",如果兩份資料一致證明沒有髒寫,可以還原;如果不一致,證明有髒寫,這時候就需要人工處理(可以根據undo_log中的記錄進行處理)。

參考:https://gitee.com/Qiao-Zhi/seata-test