1. 程式人生 > 實用技巧 >ShardingSphere的分散式事務

ShardingSphere的分散式事務

如何理解分散式事務?

在傳統的關係型資料庫中,事務是一個標準組件,幾乎所有成熟的關係型資料庫都提供了對本地事務的原生支援。本地事務提供了 ACID 事務特性。基於本地事務,為了保證資料的一致性,我們先開啟一個事務後,才可以執行資料操作,最後提交或回滾就可以了。更進一步,藉助於 Spring 等整合化框架,開發人員只需關注引起資料改變的業務即可。

但在分散式環境下,事情就會變得比較複雜。假設系統中存在多個獨立的資料庫,為了確保資料在這些獨立的資料庫中保持一致,我們需要把這些資料庫納入同一個事務中。這時本地事務就無能為力了,我們需要使用分散式事務。

業界關於如何實現分散式事務也有一些通用的實現機制,例如支援兩階段提交的 XA 協議以及以 Saga 為代表的柔性事務。針對不同的實現機制,也存在一些供應商和開發工具。因為這些開發工具在使用方式上和實現原理上都有較大的差異性,所以開發人員的一大訴求在於,希望能有一套統一的解決方案能夠遮蔽這些差異。同時,我們也希望這種解決方案能夠提供友好的系統整合性。

ShardingSphere 作為一款分散式資料庫中介軟體,勢必要考慮分散式事務的實現方案。而在設計上,ShardingSphere 從一開始就充分考慮到了開發人員的這些訴求,接下來讓我們一起來看一下。

ShardingSphere 中的分散式事務
在 ShardingSphere 中,除本地事務之外,還提供針對分散式事務的兩種實現方案,分別是 XA 事務和柔性事務。這點可以從事務型別列舉值 TransactionType 中得到驗證:

XA 事務
XA 事務提供基於兩階段提交協議的實現機制。所謂兩階段提交,顧名思義分成兩個階段,一個是準備階段,一個是執行階段。在準備階段中,協調者發起一個提議,分別詢問各參與者是否接受。在執行階段,協調者根據參與者的反饋,提交或終止事務。如果參與者全部同意則提交,只要有一個參與者不同意就終止。

目前,業界在實現 XA 事務時也存在一些主流工具庫,包括 Atomikos、Narayana 和 Bitronix。ShardingSphere 對這三種工具庫都進行了整合,並預設使用 Atomikos 來完成兩階段提交。

BASE 事務
XA 事務是典型的強一致性事務,也就是完全遵循事務的 ACID 設計原則。與 XA 事務這種“剛性”不同,柔性事務則遵循 BASE 設計理論,追求的是最終一致性。這裡的 BASE 來自基本可用(Basically Available)、軟狀態(Soft State)和最終一致性(Eventual Consistency)這三個概念。

關於如何實現基於 BASE 原則的柔性事務,業界也存在一些優秀的框架,例如阿里巴巴提供的 Seata。ShardingSphere 內部也集成了對 Seata 的支援。當然,我們也可以根據需要,整合其他分散式事務類開源框架,並基於微核心架構嵌入到 ShardingSphere 執行時環境中。

介紹完理論知識之後,接下來讓我們分別使用 XA 事務和 BASE 事務來實現分散式環境下的資料一致性。

使用 XA 事務
在 Spring 應用程式中新增對 XA 事務的支援相對簡單,無論是 Spring 框架,還是 ShardingSphere 自身,都為我們提供了低成本的開發機制。

開發環境準備
在今天的案例中,我們將演示如何在分庫環境下實現分散式事務,因此我們需要在 Spring Boot 中建立一個 .properties 檔案,幷包含分庫需要的所有配置項資訊:

spring.shardingsphere.datasource.names=ds0,ds1



spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource

spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver

spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/ds0

spring.shardingsphere.datasource.ds0.username=root

spring.shardingsphere.datasource.ds0.password=root

spring.shardingsphere.datasource.ds0.autoCommit: false



spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource

spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver

spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://localhost:3306/ds1

spring.shardingsphere.datasource.ds1.username=root

spring.shardingsphere.datasource.ds1.password=root

spring.shardingsphere.datasource.ds0.autoCommit: false



spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id

spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 2}

spring.shardingsphere.sharding.binding-tables=health_record,health_task

spring.shardingsphere.sharding.broadcast-tables=health_level



spring.shardingsphere.sharding.tables.health_record.actual-data-nodes=ds$->{0..1}.health_record

spring.shardingsphere.sharding.tables.health_record.key-generator.column=record_id

spring.shardingsphere.sharding.tables.health_record.key-generator.type=SNOWFLAKE

spring.shardingsphere.sharding.tables.health_record.key-generator.props.worker.id=33

spring.shardingsphere.sharding.tables.health_task.actual-data-nodes=ds$->{0..1}.health_task

spring.shardingsphere.sharding.tables.health_task.key-generator.column=task_id

spring.shardingsphere.sharding.tables.health_task.key-generator.type=SNOWFLAKE

spring.shardingsphere.sharding.tables.health_task.key-generator.props.worker.id=33



spring.shardingsphere.props.sql.show=true

實現 XA 事務

通過分庫配置,我們將獲取 SQL 執行的目標 DataSource。由於我們使用 Spring 框架而不是使用原生的 JDBC 進行事務管理,所以需要將 DataSource 與 Spring 中的事務管理器 PlatformTransactionManager 關聯起來。

另一方面,為了更好地整合 ShardingSphere 中的分散式事務支援,我們可以通過 Spring 框架提供的 JdbcTemplate 模板類來簡化 SQL 的執行過程。一種常見的做法是建立一個事務配置類來初始化所需的 PlatformTransactionManager 和 JdbcTemplate 物件:

@Configuration
@EnableTransactionManagement
public class TransactionConfiguration {
    
    @Bean
    public PlatformTransactionManager txManager(final DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    
    @Bean
    public JdbcTemplate jdbcTemplate(final DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }    
}

一旦初始化了 JdbcTemplate,就可以在業務程式碼中注入這個模板類來執行各種 SQL 操作,常見的做法是傳入一個 PreparedStatementCallback,並在這個回撥中執行各種具體的 SQL:

@Autowired

JdbcTemplate jdbcTemplate;



jdbcTemplate.execute(SQL, (PreparedStatementCallback<Object>) preparedStatement -> {

 …

 return preparedStatement;

});

在上面的程式碼中,我們通過 PreparedStatementCallback 回撥獲取一個 PreparedStatement 物件。或者,我們可以使用 JdbcTemplate 另一種執行 SQL 的程式碼風格,通過使用更基礎的 ConnectionCallback 回撥介面:

jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {

 …

 return connection;

});

為了在業務程式碼中以最少的開發成本嵌入分散式事務機制,ShardingSphere 也專門提供了一個 @ShardingTransactionType 註解來配置所需要執行的事務型別:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ShardingTransactionType {
    TransactionType value() default TransactionType.LOCAL;
}

我們知道,ShardingSphere 提供的事務型別有三種,分別是 LOCAL、XA 和 BASE,預設使用的是 LOCAL。所以如果需要用到分散式事務,需要在業務方法上顯式的新增這個註解:

@Transactional
@ShardingTransactionType(TransactionType.XA)
public void insert(){
 …
}

另一種設定 TransactionType 的方式是使用 TransactionTypeHolder 工具類。TransactionTypeHolder 類中通過 ThreadLocal 來儲存 TransactionType:

public final class TransactionTypeHolder {
    
    private static final ThreadLocal<TransactionType> CONTEXT = new ThreadLocal<TransactionType>() {
        
        @Override
        protected TransactionType initialValue() {
            return TransactionType.LOCAL;
        }
    };
    
    /**
     * Get transaction type for current thread.
     *
     * @return transaction type
     */
    public static TransactionType get() {
        return CONTEXT.get();
    }
    
    /**
     * Set transaction type for current thread.
     *
     * @param transactionType transaction type
     */
    public static void set(final TransactionType transactionType) {
        CONTEXT.set(transactionType);
    }
    
    /**
     * Clear transaction type for current thread.
     */
    public static void clear() {
        CONTEXT.remove();
    }
}

可以看到,TransactionTypeHolder 中預設採用的是本地事務,我們可以通過 set 方法來改變初始設定:

TransactionTypeHolder.set(TransactionType.XA);

現在,使用 XA 開發分散式事務的整體結構的方法已經梳理清楚了,我們可以通過建立一個 insertHealthRecords 方法,在其中新增對 HealthRecord 和 HealthTask 的資料插入程式碼:

private List<Long> insertHealthRecords() throws SQLException {
		List<Long> result = new ArrayList<>(10);       
		jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {
			connection.setAutoCommit(false);
			
        	try {        		
        		for (Long i = 1L; i <= 10; i++) {
        			HealthRecord healthRecord = createHealthRecord(i);   		   		
    	    		insertHealthRecord(healthRecord, connection);
    	    		
    	    		HealthTask healthTask = createHealthTask(i, healthRecord);
    	        	insertHealthTask(healthTask, connection);
    	        	
    	        	result.add(healthRecord.getRecordId());    	   
    	        	
    	        	//手工丟擲異常
    	        	//throw new SQLException("exception occur!");
                }        		
        		connection.commit();
        	} catch (final SQLException ex) {
        		connection.rollback();
                throw ex;
            }
	    	
			return connection;
	    });
		
		return result;
	}

可以看到,在執行插入操作之前,我們關閉了 Connection 的自動提交功能。在 SQL 執行完畢之後,手動通過 Connection commit 方法執行事務提交。一旦在 SQL 的執行過程中出現任何異常時,就呼叫 Connection 的 rollback 方法回滾事務。

這裡有必要介紹執行資料插入的具體實現過程,我們以 insertHealthRecord 方法為例進行展開

    private void insertHealthRecord(HealthRecord healthRecord, Connection connection) throws SQLException {
    	try (PreparedStatement preparedStatement = connection.prepareStatement(sql_health_record_insert, Statement.RETURN_GENERATED_KEYS)) {
    		preparedStatement.setLong(1, healthRecord.getUserId());
            preparedStatement.setLong(2, healthRecord.getLevelId() % 5 );
            preparedStatement.setString(3, "Remark" + healthRecord.getUserId());
            preparedStatement.executeUpdate(); 
            
            try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {
                if (resultSet.next()) {
                	healthRecord.setRecordId(resultSet.getLong(1));
                }
            }
    	}
    	
    }

首先通過 Connection 物件構建一個 PreparedStatement。請注意,由於我們需要通過 ShardingSphere 的主鍵自動生成機制,所以在建立 PreparedStatement 時需要進行特殊地設定:

connection.prepareStatement(sql_health_record_insert, Statement.RETURN_GENERATED_KEYS)

通過這種方式,在 PreparedStatement 完成 SQL 執行之後,我們就可以獲取自動生成的主鍵值:

try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) {

 if (resultSet.next()) {

 healthRecord.setRecordId(resultSet.getLong(1));

 }

}

當獲取這個主鍵值之後,就將這個主鍵值設定回 HealthRecord,這是使用自動生成主鍵的常見做法。

最後,我們在事務方法的入口處,需要設定 TransactionType:

	@Override
	public void processWithXA() throws SQLException {
		TransactionTypeHolder.set(TransactionType.XA);
		 
		insertHealthRecords();
	}

現在讓我們執行這個 processWithXA 方法,看看資料是否已經按照分庫的配置寫入到目標資料庫表中。下面是 ds0 中的 health_record 表和 health_task 表:

認為異常的時候,再次執行 processWithXA 方法,基於 connection 提供的 rollback 方法,我們發現已經執行的部分 SQL 並沒有提交到任何一個數據庫中。

使用 BASE 事務

相較於 XA 事務,在業務程式碼中整合 BASE 事務的過程就顯得相對複雜一點,因為我們需要藉助外部框架來做到這一點。這裡,我們將基於阿里巴巴提供的 Seata 框架來演示如何使用 BASE 事務。

開發環境準備

同樣,要想使用基於 Seata 的 BASE 事務,我們首先需要在 pom 檔案中新增對 sharding-jdbc-core 和 sharding-transaction-base-seata-at 這兩個依賴:

<dependency>

 <groupId>org.apache.shardingsphere</groupId>

 <artifactId>sharding-jdbc-core</artifactId>

</dependency>



<dependency>

 <groupId>org.apache.shardingsphere</groupId>

 <artifactId>sharding-transaction-base-seata-at</artifactId>

</dependency>

因為用到了 Seata 框架,所以也需要引入 Seate 框架的相關元件:

 <dependency>
 <groupId>io.seata</groupId>
 <artifactId>seata-rm-datasource</artifactId>
 </dependency>
 <dependency>
 <groupId>io.seata</groupId>
 <artifactId>seata-tm</artifactId>
 </dependency>
 <dependency>
 <groupId>io.seata</groupId>
 <artifactId>seata-codec-all</artifactId>
 </dependency>

然後,我們下載並啟動 Seata 伺服器,這個過程需要設定 Seata 伺服器 config 目錄下的 registry.conf,以便指定註冊中心,這裡使用 ZooKeeper 來充當註冊中心。關於如何啟動 Seata 伺服器的過程可以參考 Seata 的官方文件。請注意,按照 Seata 的執行要求,我們需要在每一個分片資料庫例項中建立一張 undo_log 表。然後,我們還需要在程式碼工程中 classpath 中增加一個 seata.conf 配置檔案:

client {
 application.id = health-base
 transaction.service.group = health-base-group
}

當然,這裡我們還是繼續沿用前面介紹的分庫配置。

實現 BASE 事務

基於 ShardingSphere 提供的分散式事務的抽象,我們從 XA 事務轉到 BASE 事務唯一要做的事情就是重新設定 TransactionType,也就是修改一行程式碼:

	@Override
	public void processWithBASE() throws SQLException {
		TransactionTypeHolder.set(TransactionType.BASE);
		 
		insertHealthRecords();
	}

下面這樣也是可以的

@Transactional
	@ShardingTransactionType(TransactionType.XA)  
	private List<Long> insertHealthRecords2() throws SQLException {
        List<Long> result = new ArrayList<>(10);       
        
        jdbcTemplate.execute((ConnectionCallback<Object>) connection-> {        	
        	for (Long i = 1L; i <= 10; i++) {
        		HealthRecord healthRecord = createHealthRecord(i);   		   		
	    		insertHealthRecord(healthRecord, connection);
	    		
	    		HealthTask healthTask = createHealthTask(i, healthRecord);
	        	insertHealthTask(healthTask, connection);
	        	
	        	result.add(healthRecord.getRecordId());    	        	
            }
	    	
			return connection;
	    });

        return result;
    }