1. 程式人生 > 實用技巧 >Seata(simple extensiable autonomous transaction architecture)

Seata(simple extensiable autonomous transaction architecture)

Seata 是一款開源的分散式事務解決方案,致力於提供高效能和簡單易用的分散式事務服務。Seata 將為使用者提供了 AT、TCC、SAGA 和 XA 事務模式,為使用者打造一站式的分散式解決方案。

AT 模式(比較完善)

前提

  • 基於支援本地 ACID 事務的關係型資料庫。
  • Java 應用,通過 JDBC 訪問資料庫。

整體機制

兩階段提交協議的演變:

  • 一階段:業務資料和回滾日誌記錄在同一個本地事務中提交,釋放本地鎖和連線資源。

  • 二階段:

    • 提交非同步化,非常快速地完成。
    • 回滾通過一階段的回滾日誌進行反向補償。

寫隔離

  • 一階段本地事務提交前,需要確保先拿到 全域性鎖
  • 拿不到 全域性鎖
    ,不能提交本地事務。
  • 全域性鎖 的嘗試被限制在一定範圍內,超出範圍將放棄,並回滾本地事務,釋放本地鎖。

以一個示例來說明:

兩個全域性事務 tx1 和 tx2,分別對 a 表的 m 欄位進行更新操作,m 的初始值 1000。

tx1 先開始,開啟本地事務,拿到本地鎖,更新操作 m = 1000 - 100 = 900。本地事務提交前,先拿到該記錄的 全域性鎖 ,本地提交釋放本地鎖。 tx2 後開始,開啟本地事務,拿到本地鎖,更新操作 m = 900 - 100 = 800。本地事務提交前,嘗試拿該記錄的 全域性鎖 ,tx1 全域性提交前,該記錄的全域性鎖被 tx1 持有,tx2 需要重試等待 全域性鎖

如果 tx1 的二階段全域性回滾,則 tx1 需要重新獲取該資料的本地鎖,進行反向補償的更新操作,實現分支的回滾。

此時,如果 tx2 仍在等待該資料的 全域性鎖,同時持有本地鎖,則 tx1 的分支回滾會失敗。分支的回滾會一直重試,直到 tx2 的 全域性鎖 等鎖超時,放棄 全域性鎖 並回滾本地事務釋放本地鎖,tx1 的分支回滾最終成功。

因為整個過程 全域性鎖 在 tx1 結束前一直是被 tx1 持有的,所以不會發生 髒寫 的問題。

讀隔離

在資料庫本地事務隔離級別 讀已提交(Read Committed) 或以上的基礎上,Seata(AT 模式)的預設全域性隔離級別是 讀未提交(Read Uncommitted)

如果應用在特定場景下,必需要求全域性的 讀已提交 ,目前 Seata 的方式是通過 SELECT FOR UPDATE 語句的代理。

SELECT FOR UPDATE 語句的執行會申請 全域性鎖 ,如果 全域性鎖 被其他事務持有,則釋放本地鎖(回滾 SELECT FOR UPDATE 語句的本地執行)並重試。這個過程中,查詢是被 block 住的,直到 全域性鎖 拿到,即讀取的相關資料是 已提交 的,才返回。

出於總體效能上的考慮,Seata 目前的方案並沒有對所有 SELECT 語句都進行代理,僅針對 FOR UPDATE 的 SELECT 語句。

工作機制

以一個示例來說明整個 AT 分支的工作過程。

業務表:product

AT 分支事務的業務邏輯:

update product set name = 'GTS' where name = 'TXC';

一階段

過程:

  1. 解析 SQL:得到 SQL 的型別(UPDATE),表(product),條件(where name = 'TXC')等相關的資訊。
  2. 查詢前映象:根據解析得到的條件資訊,生成查詢語句,定位資料
 select id, name, since from product where name = 'TXC';

得到前映象:

  1. 執行業務 SQL:更新這條記錄的 name 為 'GTS'。
  2. 查詢後鏡像:根據前映象的結果,通過 主鍵 定位資料。
                select id, name, since from product where id = 1;
得到後鏡像:

5.插入回滾日誌:把前後映象資料以及業務 SQL 相關的資訊組成一條回滾日誌記錄,插入到 UNDO_LOG 表中。

{
    "branchId": 641789253,
    "undoItems": [{
        "afterImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "GTS"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "beforeImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "TXC"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "sqlType": "UPDATE"
    }],
    "xid": "xid:xxx"
}
  1. 提交前,向 TC 註冊分支:申請 product 表中,主鍵值等於 1 的記錄的 全域性鎖
  2. 本地事務提交:業務資料的更新和前面步驟中生成的 UNDO LOG 一併提交。
  3. 將本地事務提交的結果上報給 TC。

二階段-回滾

  1. 收到 TC 的分支回滾請求,開啟一個本地事務,執行如下操作。
  2. 通過 XID 和 Branch ID 查詢到相應的 UNDO LOG 記錄。
  3. 資料校驗:拿 UNDO LOG 中的後鏡與當前資料進行比較,如果有不同,說明資料被當前全域性事務之外的動作做了修改。這種情況,需要根據配置策略來做處理,詳細的說明在另外的文件中介紹。
  4. 根據 UNDO LOG 中的前映象和業務 SQL 的相關資訊生成並執行回滾的語句:
update product set name = 'TXC' where id = 1;
  1. 提交本地事務。並把本地事務的執行結果(即分支事務回滾的結果)上報給 TC。

二階段-提交

  1. 收到 TC 的分支提交請求,把請求放入一個非同步任務的佇列中,馬上返回提交成功的結果給 TC。
  2. 非同步任務階段的分支提交請求將非同步和批量地刪除相應 UNDO LOG 記錄。

-- 注意此處0.7.0+ 增加欄位 context
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

------------------------------------------------------------------------實現------------------------------------------------------

1.下載seata-server-0.9.0並解壓

2.修改registry.conf檔案

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"   

  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    app.id = "seata-server"
    apollo.meta = "http://192.168.1.204:8801"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    session.timeout = 6000
    connect.timeout = 2000
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

3.有必要的時候修改file.conf 目前沒有發揮作用

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "default"
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

client {
  async.commit.buffer.limit = 10000
  lock {
    retry.internal = 10
    retry.times = 30
  }
  report.retry.count = 5
  tm.commit.retry.count = 1
  tm.rollback.retry.count = 1
}

## transaction log store
store {
  ## store mode: file、db
  mode = "file"

  ## file store
  file {
    dir = "sessionStore"

    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    max-branch-session-size = 16384
    # globe session size , if exceeded throws exceptions
    max-global-session-size = 512
    # file buffer size , if exceeded allocate new buffer
    file-write-buffer-cache-size = 16384
    # when recover batch read size
    session.reload.read_size = 100
    # async, sync
    flush-disk-mode = async
  }

  ## database store
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "mysql"
    password = "mysql"
    min-conn = 1
    max-conn = 3
##這幾個表需要在資料庫新建,檔案在conf資料夾下
global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } lock { ## the lock store mode: local、remote mode = "remote" local { ## store locks in user's database } remote { ## store locks in the seata's server } } recovery { #schedule committing retry period in milliseconds committing-retry-period = 1000 #schedule asyn committing retry period in milliseconds asyn-committing-retry-period = 1000 #schedule rollbacking retry period in milliseconds rollbacking-retry-period = 1000 #schedule timeout retry period in milliseconds timeout-retry-period = 1000 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" ##必須存在,那個微服務需要用到seata就在那個微服務所在庫新建,檔案也在conf資料夾下 } ## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } }

4.可以去bin目錄下啟動seata

5.在微服務中新建 那個微服務需要用到seata就在那個微服務下建立 最好抽取出來

package com.atguigu.gmall.pms.config;

import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    /**
     * 需要將 DataSourceProxy 設定為主資料來源,否則事務無法回滾
     */
    @Primary
    @Bean("dataSource")
    public DataSource dataSource(@Value("${spring.datasource.url}")String url,@Value("${spring.datasource.driver-class-name}")String driverClassName,
                                 @Value("${spring.datasource.username}")String username,@Value("${spring.datasource.password}")String password) {
        HikariDataSource hikariDataSource = new HikariDataSource();
        hikariDataSource.setJdbcUrl(url);
        hikariDataSource.setDriverClassName(driverClassName);
        hikariDataSource.setUsername(username);
        hikariDataSource.setPassword(password);
        return new DataSourceProxy(hikariDataSource);
    }
}

在resource下新建file.conf registry.conf 檔案,,,也在解壓後seata檔案中有 conf目錄下

registry.conf檔案

相應選擇:

registry { # file 、nacos 、eureka、redis、zk type
= "nacos" nacos { serverAddr = "localhost:8848" namespace = "public" cluster = "default" } } config { # file、nacos 、apollo、zk type = "file" file { name = "file.conf" } }

file.conf檔案

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  #thread factory for netty
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-prefix = "NettyServerBizHandler"
    share-boss-worker = false
    client-selector-thread-prefix = "NettyClientSelector"
    client-selector-thread-size = 1
    client-worker-thread-prefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size = 1
    #auto default pin or 8
    worker-thread-size = 8
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #vgroup->rgroup
#預設是vgroup_mapping.${spring.application.name}-fescar-service-group ,, vgroup_mapping.這部分可以在配置檔案中配置 vgroup_mapping.pms
-server-fescar-service-group="default" #only support single node default.grouplist = "127.0.0.1:8091" #degrade current not support enableDegrade = false #disable disable = false #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent max.commit.retry.timeout = "-1" max.rollback.retry.timeout = "-1" } client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } report.retry.count = 5 tm.commit.retry.count = 1 tm.rollback.retry.count = 1 } ## transaction log store store { ## store mode: file、db mode = "file" #推薦選擇file,,db好像還不完善,,,下面這些連結隨便寫也不會報錯,目前 ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. db-type = "mysql" driver-class-name = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://192.168.146.140:3306/seata" user = "root" password = "123456" min-conn = 1 max-conn = 3 global.table = "global_table" branch.table = "branch_table" lock-table = "lock_table" query-limit = 100 } } lock { ## the lock store mode: local、remote mode = "remote" local { ## store locks in user's database } remote { ## store locks in the seata's server } } recovery { #schedule committing retry period in milliseconds committing-retry-period = 1000 #schedule asyn committing retry period in milliseconds asyn-committing-retry-period = 1000 #schedule rollbacking retry period in milliseconds rollbacking-retry-period = 1000 #schedule timeout retry period in milliseconds timeout-retry-period = 1000 } transaction { undo.data.validation = true undo.log.serialization = "jackson" undo.log.save.days = 7 #schedule delete expired undo_log in milliseconds undo.log.delete.period = 86400000 undo.log.table = "undo_log" } ## metrics settings metrics { enabled = false registry-type = "compact" # multi exporters use comma divided exporter-list = "prometheus" exporter-prometheus-port = 9898 } support { ## spring spring { # auto proxy the DataSource bean datasource.autoproxy = false } }

6.在主入口中新增

    @Override
    @GlobalTransactional
    public void master(...) {}


選項:
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package io.seata.spring.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Inherited
public @interface GlobalTransactional {
int timeoutMills() default 60000;

String name() default "";

Class<? extends Throwable>[] rollbackFor() default {};

String[] rollbackForClassName() default {};

Class<? extends Throwable>[] noRollbackFor() default {};

String[] noRollbackForClassName() default {};
}

其他進口:

    @Override
    @Transactional
    public void saveSkuSaleInfo(SkuSlaseVo skuSlaseVo) {}


選項:
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.springframework.transaction.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
@AliasFor("transactionManager")
String value() default "";

@AliasFor("value")
String transactionManager() default "";

Propagation propagation() default Propagation.REQUIRED;

Isolation isolation() default Isolation.DEFAULT;

int timeout() default -1;

boolean readOnly() default false;

Class<? extends Throwable>[] rollbackFor() default {};

String[] rollbackForClassName() default {};

Class<? extends Throwable>[] noRollbackFor() default {};

String[] noRollbackForClassName() default {};
}

官網:http://seata.io