1. 程式人生 > 實用技巧 ># SpringBoot+Mybatis+Druid+Jta+Atomikos 解決動態資料來源分散式事務問題

# SpringBoot+Mybatis+Druid+Jta+Atomikos 解決動態資料來源分散式事務問題

1.基本介紹

jta(java Transaction API)+Atomikos(事務管理器) 底層原理是分散式事務的兩階段提交

2.兩階段提交(two phase commit)

2.1 說明

當一個事務跨多個節點時,為了保持事務的原子性與一致性,需要引入一個協調者(Coordinator)來統一掌控所有參與者(Participant)的操作結果,並指示它們是否要把操作結果進行真正的提交(commit)或者回滾(rollback)。這裡資料庫充當的是參與者的角色

2.2 原理

提交請求(投票)階段

  • 協調者向所有參與者傳送prepare請求與事務內容,詢問是否可以準備事務提交,並等待參與者的響應。
  • 參與者執行事務中包含的操作,並記錄undo日誌(用於回滾)和redo日誌(用於重放),但不真正提交。
  • 參與者向協調者返回事務操作的執行結果,執行成功返回yes,否則返回no。

提交(執行)階段

分為成功與失敗兩種情況。

若所有參與者都返回yes,說明事務可以提交:

  • 協調者向所有參與者傳送commit請求。
  • 參與者收到commit請求後,將事務真正地提交上去,並釋放佔用的事務資源,並向協調者返回ack。
  • 協調者收到所有參與者的ack訊息,事務成功完成。

若有參與者返回no或者超時未返回,說明事務中斷,需要回滾:

  • 協調者向所有參與者傳送rollback請求。
  • 參與者收到rollback請求後,根據undo日誌回滾到事務執行前的狀態,釋放佔用的事務資源,並向協調者返回ack。
  • 協調者收到所有參與者的ack訊息,事務回滾完成

弊端

1.同步阻塞問題。執行過程中,所有參與節點都是事務阻塞型的。所以這樣很影響效率。

2.單點故障。由於協調者的重要性,一旦協調者發生故障。參與者會一直阻塞下去

3.仍然存在不一致風險。如果由於網路異常等意外導致只有部分參與者收到了commit請求,就會造成部分參與者提交了事務而其他參與者未提交的情況。

3.編寫程式碼

許多解釋在程式碼中均有體現.

3.1 引入相關jar包

<dependencies>
     <!--開啟AOP切面-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>

    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
	
     <!--druid連線池-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.14</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>

    <!--分散式事務-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>

3.2 application.yml

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.jdbc.Driver
    druid:
      # 主庫資料來源
      master:
        url: jdbc:mysql://localhost:3306/ry?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: 123456
      # 從庫資料來源
      slave:
        # 是否開啟從資料來源,預設關閉
        enabled: true
        url: jdbc:mysql://localhost:3306/data_catalog?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: 123456
      # 初始連線數
      initialSize: 5
      # 最小連線池數量
      minIdle: 10
      # 最大連線池數量
      maxActive: 20
      # 配置獲取連線等待超時的時間
      maxWait: 60000
      # 配置間隔多久才進行一次檢測,檢測需要關閉的空閒連線,單位是毫秒
      timeBetweenEvictionRunsMillis: 60000
      # 配置一個連線在池中最小生存的時間,單位是毫秒
      minEvictableIdleTimeMillis: 300000
      # 配置一個連線在池中最大生存的時間,單位是毫秒
      maxEvictableIdleTimeMillis: 900000
      # 配置檢測連線是否有效
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      webStatFilter:
        enabled: true
      statViewServlet:
        enabled: true
        # 設定白名單,不填則允許所有訪問
        allow:
        url-pattern: /druid/*
        # 控制檯管理使用者名稱和密碼
        login-username:
        login-password:
      filter:
        stat:
          enabled: true
          # 慢SQL記錄
          log-slow-sql: true
          slow-sql-millis: 1000
          merge-sql: true
        wall:
          config:
            multi-statement-allow: true
server:
  port: 8000

3.3 自定義註解

@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface MyDataSource {
    /**
     * 切換資料來源名稱
     */
    public DataSourceType value() default DataSourceType.MASTER;
}

3.4 自定義列舉類

public enum DataSourceType {
    /**
     * 主庫
     */
    MASTER,

    /**
     * 從庫
     */
    SLAVE
}

3.5 自定義aop切面

@Aspect
@Component
@Order(1)
public class MyDataSourceAsp {
    /**
     * 掃描所有與這個註解有關的
     * :@within:用於匹配所有持有指定註解型別內的方法和類;
     * 也就是說只要有一個類上的有這個,使用@within這個註解,就能拿到下面所有的方法
     *:@annotation:用於匹配當前執行方法持有指定註解的方法,而這個註解只針對方法
     *
     * 不新增掃描路徑,應該是根據啟動類的掃描範圍執行的
     */
    @Pointcut("@annotation(com.shw.dynamic.annotation.MyDataSource) " +
            "|| @within(com.shw.dynamic.annotation.MyDataSource)")
    public void doPointCut() {
    }

    @Around("doPointCut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MyDataSource dataSource = getDataSource(joinPoint);
        if (dataSource != null) {
            RoutingDataSourceContext.setDataSourceRoutingKey(dataSource.value().name());
        }
        try {
            // 繼續執行
            return joinPoint.proceed();
        } finally {
            //關閉執行緒資源 在執行方法之後
            RoutingDataSourceContext.close();
        }
    }

    /**
     * 獲取類或者方法上的註解
     * 先獲取方法上的註解,然後在獲取類上的註解,這就實現了方法上資料來源切換優先於類上的
     * @param joinPoint 正在執行的連線點
     * @return 註解
     */
    private MyDataSource getDataSource(ProceedingJoinPoint joinPoint) {
        MethodSignature method = (MethodSignature) joinPoint.getSignature();
        // 獲取方法上的註解
        MyDataSource annotation = method.getMethod().getAnnotation(MyDataSource.class);
        if (annotation != null) {
            return annotation;
        } else {
            // 獲取到這個註解上的類
            Class<?> aClass = joinPoint.getTarget().getClass();
            // 獲取到這個類上的註解
            MyDataSource dataSource = aClass.getAnnotation(MyDataSource.class);
            // 返回類上的註解
            return dataSource;
        }
    }
}

3.6 編寫上下文資料來源

public class RoutingDataSourceContext  {

    private static Logger logger = LoggerFactory.getLogger(RoutingDataSourceContext.class);
    /**
     * 使用ThreadLocal維護變數,ThreadLocal為每個使用該變數的執行緒提供獨立的變數副本,
     *  所以每一個執行緒都可以獨立地改變自己的副本,而不會影響其它執行緒所對應的副本。
     */
    private static final ThreadLocal<String> THREAD_LOCAL_DATA_SOURCE_KEY = new ThreadLocal<>();

    /**
     * 得到資料來源名稱
     * @return
     */
    static String getDataSourceRoutingKey() {
        String key = THREAD_LOCAL_DATA_SOURCE_KEY.get();
        return key == null ? DataSourceType.MASTER.name() : key;
    }

    /**
     * 設定資料來源
     * @param key
     */
    public static void setDataSourceRoutingKey(String key) {
        logger.info("切換到{}資料來源",key);
        THREAD_LOCAL_DATA_SOURCE_KEY.set(key);
    }

    /**
     * 清空資料來源設定
     */
    public static void close() {
        THREAD_LOCAL_DATA_SOURCE_KEY.remove();
    }

}

3.7 druid連線池配置引數

@Configuration
public class DruidProperties {
    @Value("${spring.datasource.druid.initialSize}")
    private int initialSize;

    @Value("${spring.datasource.druid.minIdle}")
    private int minIdle;

    @Value("${spring.datasource.druid.maxActive}")
    private int maxActive;

    @Value("${spring.datasource.druid.maxWait}")
    private int maxWait;

    @Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.datasource.druid.minEvictableIdleTimeMillis}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}")
    private int maxEvictableIdleTimeMillis;

    @Value("${spring.datasource.druid.validationQuery}")
    private String validationQuery;

    @Value("${spring.datasource.druid.testWhileIdle}")
    private boolean testWhileIdle;

    @Value("${spring.datasource.druid.testOnBorrow}")
    private boolean testOnBorrow;

    @Value("${spring.datasource.druid.testOnReturn}")
    private boolean testOnReturn;

    public DruidDataSource dataSource(DruidDataSource datasource) {
        /** 配置初始化大小、最小、最大 */
        datasource.setInitialSize(initialSize);
        datasource.setMaxActive(maxActive);
        datasource.setMinIdle(minIdle);

        /** 配置獲取連線等待超時的時間 */
        datasource.setMaxWait(maxWait);

        /** 配置間隔多久才進行一次檢測,檢測需要關閉的空閒連線,單位是毫秒 */
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);

        /** 配置一個連線在池中最小、最大生存的時間,單位是毫秒 */
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);

        /**
         * 用來檢測連線是否有效的sql,要求是一個查詢語句,常用select 'x'。如果validationQuery為null,testOnBorrow、testOnReturn、testWhileIdle都不會起作用。
         */
        datasource.setValidationQuery(validationQuery);
        /** 建議配置為true,不影響效能,並且保證安全性。申請連線的時候檢測,如果空閒時間大於timeBetweenEvictionRunsMillis,執行validationQuery檢測連線是否有效。 */
        datasource.setTestWhileIdle(testWhileIdle);
        /** 申請連線時執行validationQuery檢測連線是否有效,做了這個配置會降低效能。 */
        datasource.setTestOnBorrow(testOnBorrow);
        /** 歸還連線時執行validationQuery檢測連線是否有效,做了這個配置會降低效能。 */
        datasource.setTestOnReturn(testOnReturn);
        return datasource;
    }
}

3.8 資料來源配置(重點)

@Configuration
@MapperScan(basePackages = DataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "sqlSessionTemplate")
public class DataSourceConfig {

    static final String BASE_PACKAGES = "com.shw.dynamic.mapper";

    private static final String MAPPER_LOCATION = "classpath:mybatis/mapper/*.xml";

    /***
     * 建立 DruidXADataSource master 用@ConfigurationProperties 自動配置屬性
     */
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource druidDataSourceMaster(DruidProperties properties) {
        DruidXADataSource druidXADataSource = new DruidXADataSource();
        return properties.dataSource(druidXADataSource);
    }

    /***
     * 建立 DruidXADataSource slave
     */
    @Bean
    @ConfigurationProperties("spring.datasource.druid.slave")
    public DataSource druidDataSourceSlave(DruidProperties properties) {
        DruidXADataSource druidXADataSource = new DruidXADataSource();
        return properties.dataSource(druidXADataSource);
    }

    /**
     * 建立支援 XA 事務的 Atomikos 資料來源 master
     */
    @Bean
    public DataSource dataSourceMaster(DataSource druidDataSourceMaster) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceMaster);
        // 必須為資料來源指定唯一標識
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        sourceBean.setUniqueResourceName("master");
        return sourceBean;
    }

    /**
     * 建立支援 XA 事務的 Atomikos 資料來源 slave
     */
    @Bean
    public DataSource dataSourceSlave(DataSource druidDataSourceSlave) {
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceSlave);
        sourceBean.setPoolSize(5);
        sourceBean.setTestQuery("SELECT 1");
        sourceBean.setUniqueResourceName("slave");
        return sourceBean;
    }


    /**
     * @param dataSourceMaster 資料來源 master
     * @return 資料來源 master 的會話工廠
     */
    @Bean
    public SqlSessionFactory sqlSessionFactoryMaster(DataSource dataSourceMaster)
            throws Exception {
        return createSqlSessionFactory(dataSourceMaster);
    }


    /**
     * @param dataSourceSlave 資料來源 slave
     * @return 資料來源 slave 的會話工廠
     */
    @Bean
    public SqlSessionFactory sqlSessionFactorySlave(DataSource dataSourceSlave)
            throws Exception {
        return createSqlSessionFactory(dataSourceSlave);
    }


    /***
     * sqlSessionTemplate 與 Spring 事務管理一起使用,以確保使用的實際 SqlSession 是與當前 Spring 事務關聯的,
     * 此外它還管理會話生命週期,包括根據 Spring 事務配置根據需要關閉,提交或回滾會話
     * @param sqlSessionFactoryMaster 資料來源 master
     * @param sqlSessionFactorySlave 資料來源 slave
     */
    @Bean
    public MySqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactoryMaster, SqlSessionFactory sqlSessionFactorySlave) {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
        sqlSessionFactoryMap.put(DataSourceType.MASTER.name(), sqlSessionFactoryMaster);
        sqlSessionFactoryMap.put(DataSourceType.SLAVE.name(), sqlSessionFactorySlave);
        MySqlSessionTemplate customSqlSessionTemplate = new MySqlSessionTemplate(sqlSessionFactoryMaster);
        customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
        return customSqlSessionTemplate;
    }

    /***
     * 自定義會話工廠
     * @param dataSource 資料來源
     * @return :自定義的會話工廠
     */
    private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        //配置駝峰命名
        configuration.setMapUnderscoreToCamelCase(true);
        //配置sql日誌
        configuration.setLogImpl(StdOutImpl.class);
        factoryBean.setConfiguration(configuration);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        //配置讀取mapper.xml路徑
        factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));
        return factoryBean.getObject();
    }
}

3.9 (重點)重寫SqlSessionTemplate,也就是把SqlSessionTemplate這個類copy一份,修改getSqlSessionFactory這個方法返回值.

public class MySqlSessionTemplate extends SqlSessionTemplate {
    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;
    private Map<Object, SqlSessionFactory> targetSqlSessionFactories;
    private SqlSessionFactory defaultTargetSqlSessionFactory;
 
    /**
     * 通過Map傳入
     * @param targetSqlSessionFactories
     */
    public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {
        this.targetSqlSessionFactories = targetSqlSessionFactories;
    }
    public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
        this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
    }
    public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
    }
    public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
        this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
                .getEnvironment().getDataSource(), true));
    }
    public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
                                PersistenceExceptionTranslator exceptionTranslator) {
        super(sqlSessionFactory, executorType, exceptionTranslator);
        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;
        this.sqlSessionProxy = (SqlSession) newProxyInstance(
                SqlSessionFactory.class.getClassLoader(),
                new Class[] { SqlSession.class },
                new SqlSessionInterceptor());
        this.defaultTargetSqlSessionFactory = sqlSessionFactory;
    }
    //通過DataSourceContextHolder獲取當前的會話工廠
    @Override
    public SqlSessionFactory getSqlSessionFactory() {
        String dataSourceKey = RoutingDataSourceContext.getDataSourceRoutingKey();
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactories.get(dataSourceKey);
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        } else {
            Assert.notNull(targetSqlSessionFactories, "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");
            Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactories' are required");
        }
        return this.sqlSessionFactory;
    }
 
 
    @Override
    public Configuration getConfiguration() {
        return this.getSqlSessionFactory().getConfiguration();
    }
    @Override
    public ExecutorType getExecutorType() {
        return this.executorType;
    }
    @Override
    public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
        return this.exceptionTranslator;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T selectOne(String statement) {
        return this.sqlSessionProxy.<T> selectOne(statement);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T selectOne(String statement, Object parameter) {
        return this.sqlSessionProxy.<T> selectOne(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
        return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <E> List<E> selectList(String statement) {
        return this.sqlSessionProxy.<E> selectList(statement);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <E> List<E> selectList(String statement, Object parameter) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void select(String statement, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void select(String statement, Object parameter, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int insert(String statement) {
        return this.sqlSessionProxy.insert(statement);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int insert(String statement, Object parameter) {
        return this.sqlSessionProxy.insert(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int update(String statement) {
        return this.sqlSessionProxy.update(statement);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int update(String statement, Object parameter) {
        return this.sqlSessionProxy.update(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int delete(String statement) {
        return this.sqlSessionProxy.delete(statement);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public int delete(String statement, Object parameter) {
        return this.sqlSessionProxy.delete(statement, parameter);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T getMapper(Class<T> type) {
        return getConfiguration().getMapper(type, this);
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void commit() {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void commit(boolean force) {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void rollback() {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void rollback(boolean force) {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void clearCache() {
        this.sqlSessionProxy.clearCache();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public Connection getConnection() {
        return this.sqlSessionProxy.getConnection();
    }
    /**
     * {@inheritDoc}
     * @since 1.0.2
     */
    @Override
    public List<BatchResult> flushStatements() {
        return this.sqlSessionProxy.flushStatements();
    }
    /**
     * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
     * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
     * the {@code PersistenceExceptionTranslator}.
     */
    private class SqlSessionInterceptor implements InvocationHandler {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final SqlSession sqlSession = getSqlSession(
                    MySqlSessionTemplate.this.getSqlSessionFactory(),
                    MySqlSessionTemplate.this.executorType,
                    MySqlSessionTemplate.this.exceptionTranslator);
            try {
                Object result = method.invoke(sqlSession, args);
                if (!isSqlSessionTransactional(sqlSession, MySqlSessionTemplate.this.getSqlSessionFactory())) {
                    sqlSession.commit(true);
                }
                return result;
            } catch (Throwable t) {
                Throwable unwrapped = unwrapThrowable(t);
                if (MySqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                    Throwable translated = MySqlSessionTemplate.this.exceptionTranslator
                            .translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            } finally {
                closeSqlSession(sqlSession, MySqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }
}

3.10 事務管理器配置

@Configuration
@EnableTransactionManagement
public class XATransactionManagerConfig {
 
    @Bean
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }
 
    @Bean
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(true);
        return userTransactionManager;
    }
 
    @Bean
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
                                                         TransactionManager transactionManager) {
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}

4.測試

@Controller
@RestController
public class Hello {

    @Autowired
    private HelloMapper helloMapper;

    @GetMapping("/hello")
    @Transactional(rollbackFor = Exception.class)
    public List<Map> hello() {
        List<Map> school = helloMapper.getSchool();
        System.out.println(school);
        List<Map> user = helloMapper.getCatalog();
        System.out.println(user);
        return null;
    }

    @GetMapping("/hi")
    @Transactional(rollbackFor = Exception.class)
    public List<Map> hi() {
        helloMapper.insertCatalog();
        int i = 1/0;
        helloMapper.insertSchool();
        return null;
    }

}
public interface HelloMapper {

    @MyDataSource(DataSourceType.SLAVE)
    List<Map> getCatalog();

    List<Map> getSchool();

    @MyDataSource(DataSourceType.SLAVE)
    void insertCatalog();

    void insertSchool();

}

結論

在以上程式碼的情況下,在需要進行資料來源切換的時候,在介面上或方法上添加註解@MyDataSource(DataSourceType.SLAVE)切換到slave資料來源,如果在方法上添加了事務,資料來源依舊可以切換成功,且當新增事務的方法中發生了異常,整個方法都會回滾.至此,多資料來源切換分散式事務問題解決成功.

參考文章:

https://blog.csdn.net/qq_35387940/article/details/103474353

git倉庫地址:

https://github.com/sunhuawei0517/dynamicDataSource/tree/jta

mster分支為多資料來源切換,jta分支為多資料來源+分散式事務