
Understanding How Sharding jdbc Works: This Article Is All You Need

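Compared with the database and table sharding that Spring implements on top of AbstractRoutingDataSource, Sharding jdbc is more compatible with existing code when an application grows from a single database and table to multiple databases and tables. For example, with the Spring approach the sharded SQL is written like this: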

select id, name, price, publish, intro 
from book${tableIndex}
where id = #{id,jdbcType=INTEGER}

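The table name book has to carry a shard suffix, tableIndex, which means the parameters passed into the SQL must specify which physical table to use. Sharding jdbc encapsulates this better: the SQL does not mention tableIndex at all, and each query is routed automatically according to the sharding strategy.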

select id, name, price, publish, intro
from book 
where id = #{id,jdbcType=INTEGER}

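This property of Sharding jdbc is clearly more attractive when scaling out horizontally. Imagine a project that has been in development for a while when the data in its single database and table grows sharply, so the database and tables have to be split to relieve the load. All the existing SQL was written for a single database and table. With Spring's AbstractRoutingDataSource, every SQL statement touching the affected tables has to be modified; the changes are spread over many places and are error-prone. With Sharding jdbc, the SQL stays unchanged and only the Sharding jdbc configuration has to be added to Spring, which keeps the change small and makes sharding much easier to introduce.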
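
To make the difference concrete, here is a minimal sketch (hypothetical code, not taken from Spring or Sharding jdbc) of what the Spring-style approach pushes onto the caller. It assumes non-negative ids and the rule "table suffix = id % 3", matching the configuration at the end of this article; the class and method names are made up for illustration.

```java
// Hypothetical illustration; not code from Spring or Sharding jdbc.
// Assumes non-negative ids and the rule "table suffix = id % 3".
final class CallerSideSharding {

    // Spring/AbstractRoutingDataSource style: the caller computes the suffix,
    // i.e. the value that would be passed as ${tableIndex} in the mapper.
    static String tableIndex(int id) {
        return String.format("_%02d", id % 3);
    }

    // The SQL text itself changes with every sharding value.
    static String springStyleSql(int id) {
        return "select id, name, price, publish, intro from book" + tableIndex(id) + " where id = ?";
    }

    // Sharding jdbc style: the SQL only names the logic table; the driver
    // layer decides which book_0N to use.
    static String shardingJdbcSql() {
        return "select id, name, price, publish, intro from book where id = ?";
    }
}
```

With the first style the sharding rule leaks into every mapper and call site; with the second it lives in one place, the sharding configuration.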

So how does Sharding jdbc implement this sharding logic? Below we describe how it works with some simple, easy-to-follow code.

Normally, code that reads data from a database looks like this:

    // Obtain the data source from Spring and run a query through plain JDBC
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("application.xml");
    DataSource dataSource = ctx.getBean("shardingDataSource", DataSource.class);

    String sql = "select id, name, price, publish, intro from book where id = 111";
    try (Connection connection = dataSource.getConnection();
         PreparedStatement ps = connection.prepareStatement(sql);
         ResultSet rs = ps.executeQuery()) {
        // handle ResultSet...
    }

Sharding jdbc is built on the JDBC interfaces. The dataSource obtained here is a data source type defined by Sharding jdbc itself, SpringShardingDataSource. Its getConnection() returns a ShardingConnection instance, and that connection's prepareStatement() returns a ShardingPreparedStatement instance. When executeQuery() is called, ShardingPreparedStatement does the following:

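  1. It runs the logic SQL through the sharding strategy and obtains the routing result, SQLRouteResult;
  2. SQLRouteResult contains the real data sources and the rewritten, actual SQL; each real data source executes its SQL and returns a ResultSet;
  3. The list of ResultSets is wrapped into a single, sequentially readable ResultSet object, IteratorReducerResultSet.
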
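class ShardingPreparedStatement implements PreparedStatement {

    // Simplified sketch: logicSql is the statement's original SQL and routeSql(...)
    // stands for the routing step described below; parameter binding is omitted.
    @Override
    public ResultSet executeQuery() throws SQLException {
        // 1. Route the logic SQL through the sharding strategy
        List<SQLRouteResult> routeResults = routeSql(logicSql);

        // 2. Run the rewritten SQL on each real data source
        List<ResultSet> resultSets = new ArrayList<>(routeResults.size());
        for (SQLRouteResult routeResult : routeResults) {
            PreparedStatement ps = routeResult.getDataSource().getConnection().prepareStatement(routeResult.getParsedSql());
            ResultSet rs = ps.executeQuery();
            resultSets.add(rs);
        }

        // 3. Expose all per-shard results as one sequentially readable ResultSet
        return new IteratorReducerResultSet(resultSets);
    }

    // ... other PreparedStatement methods omitted

}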
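
The core idea of IteratorReducerResultSet can be shown without JDBC: keep a cursor over a list of per-shard results and move on to the next one once the current one is exhausted. The sketch below uses plain `Iterator`s as stand-ins for `ResultSet`s; `ChainedRows` is a hypothetical name, and the real class implements the full `ResultSet` interface.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for IteratorReducerResultSet: reads several
// per-shard results one after another, in the order they were added.
final class ChainedRows<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> shards;
    private Iterator<T> current;

    ChainedRows(List<Iterator<T>> shardResults) {
        this.shards = shardResults.iterator();
        this.current = shards.hasNext() ? shards.next() : null;
    }

    @Override
    public boolean hasNext() {
        // Skip exhausted or empty shard results, moving to the next shard
        while (current != null && !current.hasNext()) {
            current = shards.hasNext() ? shards.next() : null;
        }
        return current != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    // Convenience helper: drain every row of every shard into one list.
    static <T> List<T> readAll(List<Iterator<T>> shardResults) {
        List<T> rows = new ArrayList<>();
        ChainedRows<T> it = new ChainedRows<>(shardResults);
        while (it.hasNext()) {
            rows.add(it.next());
        }
        return rows;
    }
}
```

Plain concatenation like this is only correct when the query has no ORDER BY, GROUP BY or aggregation; in those cases the rows from the shards have to be merged rather than simply chained.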

For the sharding-strategy routing step, we extracted all the relevant code from Sharding jdbc and put it in one place so the whole process can be followed:

    // Environment setup
    @SuppressWarnings("resource")
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("application.xml");
    SpringShardingDataSource dataSource = ctx.getBean(SpringShardingDataSource.class);
    Field field = SpringShardingDataSource.class.getSuperclass().getDeclaredField("shardingContext");
    field.setAccessible(true);
    ShardingContext sctx = (ShardingContext)field.get(dataSource);
    ShardingRule shardingRule = sctx.getShardingRule();
		
    String logicSql = "select id, name, price, publish, intro from book where id = ?";
    List<Object> parameters = new ArrayList<>();
    parameters.add(2000);
        
    // SQL parsing
    MySqlStatementParser parser = new MySqlStatementParser(logicSql);
    MySQLSelectVisitor visitor = new MySQLSelectVisitor();
    SQLStatement statement = parser.parseStatement();
    visitor.getParseContext().setShardingRule(shardingRule);
    statement.accept(visitor);
		
    SQLParsedResult parsedResult = visitor.getParseContext().getParsedResult();
    if (visitor.getParseContext().isHasOrCondition()) {
        new OrParser(statement, visitor).fillConditionContext(parsedResult);
    } 
    visitor.getParseContext().mergeCurrentConditionContext();
    System.out.println("Parsed SQL result: " + parsedResult);
    System.out.println("Parsed SQL: " + visitor.getSQLBuilder());
    parsedResult.getRouteContext().setSqlBuilder(visitor.getSQLBuilder());
    parsedResult.getRouteContext().setSqlStatementType(SQLStatementType.SELECT);
        
    // Database and table routing
    SQLRouteResult result = new SQLRouteResult(parsedResult.getRouteContext().getSqlStatementType(), parsedResult.getMergeContext(), parsedResult.getGeneratedKeyContext());
    for (ConditionContext each : parsedResult.getConditionContexts()) {
        Collection<Table> tables = parsedResult.getRouteContext().getTables();
        final Set<String> logicTables = new HashSet<>();
        tables.forEach(a -> logicTables.add(a.getName()));
        	
        SingleTableRouter router = new SingleTableRouter(shardingRule, 
            logicTables.iterator().next(), 
            each, 
            parsedResult.getRouteContext().getSqlStatementType());
        	
        RoutingResult routingResult = router.route();
            
        // SQL rewriting --> routingResult.getSQLExecutionUnits()
        //         ---> SingleRoutingTableFactor.replaceSQL(sqlBuilder).buildSQL()
        // Merge the execution units into the overall result
        result.getExecutionUnits().addAll(routingResult.getSQLExecutionUnits(parsedResult.getRouteContext().getSqlBuilder()));
    }
//        amendSQLAccordingToRouteResult(parsedResult, parameters, result);
    for (SQLExecutionUnit each : result.getExecutionUnits()) {
        System.out.println(each.getDataSource() + " " + each.getSql() + " " + parameters);
    }
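
  1. Prepare the environment. ShardingRule is used throughout the whole routing process, so we write the Sharding jdbc configuration in Spring and obtain this object via reflection. (The Sharding jdbc versions and configuration are listed at the end of the article, so the process can be debugged.)
  2. Parse the SQL. Sharding jdbc uses Alibaba's Druid library to parse SQL. During parsing, Sharding jdbc fills its own container for the parsed SQL, SQLBuilder: whenever the parser encounters a table name, a token for that logic table name is stored in the SQLBuilder. Sharding jdbc also splits the SQL into several segments by meaning. For example, "select id, name, price, publish, intro from book where id = ?" is split into "select id, name, price, publish, intro | from | book | where | id = ?".
  3. Route to databases and tables. Using the values of the sharding columns specified in ShardingRule together with the sharding strategy, Sharding jdbc determines the target databases and tables and produces a RoutingResult, which contains the real data source together with the logic table name and the actual table name.
  4. Rewrite the SQL. The segments stored in SQLBuilder are scanned, and every segment that matches the logic table name is replaced with the actual table name. (A segment can be marked as a table name.)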
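
Steps 2 and 4 can be sketched with a tiny token model: the parsed SQL is kept as a list of segments, some flagged as table names, and rewriting only touches the flagged segments whose text equals the logic table. `SegmentRewrite`, `Segment` and `rewrite` are hypothetical names; the real SQLBuilder in Sharding jdbc is more elaborate, and the parsing step itself is not shown.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a parsed SQL: literal text plus table-name tokens.
final class SegmentRewrite {

    // A piece of SQL text; tableToken marks segments that hold a table name.
    static final class Segment {
        final String text;
        final boolean tableToken;

        Segment(String text, boolean tableToken) {
            this.text = text;
            this.tableToken = tableToken;
        }
    }

    // Rebuilds the SQL, swapping only table tokens that match the logic table.
    static String rewrite(List<Segment> segments, String logicTable, String actualTable) {
        StringBuilder sql = new StringBuilder();
        for (Segment s : segments) {
            boolean replace = s.tableToken && s.text.equalsIgnoreCase(logicTable);
            sql.append(replace ? actualTable : s.text);
        }
        return sql.toString();
    }

    // Segments for: SELECT id, name, price, publish, intro FROM book WHERE id = ?
    static List<Segment> bookQuery() {
        return Arrays.asList(
                new Segment("SELECT id, name, price, publish, intro FROM ", false),
                new Segment("book", true),
                new Segment(" WHERE id = ?", false));
    }
}
```

Because only flagged segments are candidates, a column or literal that happens to contain the text `book` is never touched.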

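The output of the code above is shown below. Note that the ConditionContext in the parsed result is empty (conditions={}): no condition on the sharding column id was recorded, so the query is routed to every actual table (book_00 to book_02) in every data source, giving 3 × 3 = 9 execution units. With a recognized condition id = 2000, the algorithm listed at the end of the article (2000 % 3 = 2) would narrow the tables down to book_02.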

Parsed SQL result: SQLParsedResult(routeContext=RouteContext(tables=[Table(name=book, alias=Optional.absent())], sqlStatementType=null, sqlBuilder=null), generatedKeyContext=GeneratedKeyContext(columns=[], columnNameToIndexMap={}, valueTable={}, rowIndex=0, columnIndex=0, autoGeneratedKeys=0, columnIndexes=null, columnNames=null), conditionContexts=[ConditionContext(conditions={})], mergeContext=MergeContext(orderByColumns=[], groupByColumns=[], aggregationColumns=[], limit=null))
Parsed SQL: SELECT id, name, price, publish, intro FROM [Token(book)] WHERE id = ?
dataSource1 SELECT id, name, price, publish, intro FROM book_00 WHERE id = ? [2000]
dataSource2 SELECT id, name, price, publish, intro FROM book_02 WHERE id = ? [2000]
dataSource1 SELECT id, name, price, publish, intro FROM book_02 WHERE id = ? [2000]
dataSource2 SELECT id, name, price, publish, intro FROM book_01 WHERE id = ? [2000]
dataSource0 SELECT id, name, price, publish, intro FROM book_00 WHERE id = ? [2000]
dataSource0 SELECT id, name, price, publish, intro FROM book_01 WHERE id = ? [2000]
dataSource2 SELECT id, name, price, publish, intro FROM book_00 WHERE id = ? [2000]
dataSource1 SELECT id, name, price, publish, intro FROM book_01 WHERE id = ? [2000]
dataSource0 SELECT id, name, price, publish, intro FROM book_02 WHERE id = ? [2000]
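
The nine execution units printed above are all combinations of the three data sources and the three actual tables. A minimal sketch of how such units could be produced, assuming (as in the configuration at the end of this article) that only a table strategy based on `id % 3` is configured, so every data source stays a candidate. `RouteSketch` and `route` are hypothetical names, and the ordering differs from Sharding jdbc's output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical router: only a table strategy (id % 3) is modeled; with no
// database strategy, every data source is assumed to be a candidate.
final class RouteSketch {

    static final List<String> DATA_SOURCES = Arrays.asList("dataSource0", "dataSource1", "dataSource2");
    static final List<String> ACTUAL_TABLES = Arrays.asList("book_00", "book_01", "book_02");

    // Returns "dataSource table" pairs; id == null means no sharding condition was found.
    static List<String> route(Integer id) {
        List<String> tables = new ArrayList<>();
        if (id == null) {
            tables.addAll(ACTUAL_TABLES);            // full route: every table
        } else {
            String suffix = "0" + (id % 3);          // same rule as the sharding algorithm
            for (String t : ACTUAL_TABLES) {
                if (t.endsWith(suffix)) {
                    tables.add(t);
                }
            }
        }
        List<String> units = new ArrayList<>();
        for (String ds : DATA_SOURCES) {             // no database strategy: all data sources
            for (String t : tables) {
                units.add(ds + " " + t);
            }
        }
        return units;
    }
}
```

`route(null)` yields the nine units seen above, while `route(2000)` keeps only book_02, still in all three data sources because no database strategy is assumed.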

In fact, the SQL rewriting step can be expressed with even simpler code:

    // Parse the logic SQL with Druid and print it back through a custom visitor
    String logicSql = "select id, name, price, publish, intro from book where id = 111";
    MySqlStatementParser parser = new MySqlStatementParser(logicSql);
    SQLStatement statement = parser.parseStatement();
    MySQLSimpleVisitor visitor = new MySQLSimpleVisitor();
    statement.accept(visitor);

    // The visitor writes the table name as ${book}; replace that marker with the real table
    String logicTable = "book";
    String realTable = "book_00";
    String token = "\\$\\{" + logicTable + "\\}";   // regex matching the literal text ${book}

    String sqlBuilder = visitor.getAppender().toString();
    String sql = sqlBuilder.replaceAll(token, realTable);

    System.out.println(sqlBuilder);
    System.out.println(sql);

The MySQLSimpleVisitor class looks like this:

public class MySQLSimpleVisitor extends MySqlOutputVisitor {

    public MySQLSimpleVisitor() {
        super(new StringBuilder());
    }

    @Override
    public boolean visit(SQLExprTableSource x) {
        // Print the table as a ${tableName} marker instead of the bare name
        StringBuilder sb = new StringBuilder();
        sb.append("${");
        sb.append(x.getExpr().toString()).append('}');
        print(sb.toString());

        // Keep the alias and any index hints, as the default output visitor does
        if (x.getAlias() != null) {
            print(' ');
            print(x.getAlias());
        }

        for (int i = 0; i < x.getHintsSize(); ++i) {
            print(' ');
            x.getHints().get(i).accept(this);
        }

        return false;
    }
	
}

The output is:

SELECT id, name, price, publish, intro
FROM ${book}
WHERE id = 111
SELECT id, name, price, publish, intro
FROM book_00
WHERE id = 111
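
The reason for going through the parser instead of simply replacing the string `book` in the SQL is that a table name can also appear inside other identifiers or string literals. A small sketch contrasting a naive `String.replace` with replacing only the `${book}` marker that a visitor such as MySQLSimpleVisitor produces; the example SQL with a `book_title` column is made up for illustration, and `RewriteComparison` is a hypothetical name.

```java
// Hypothetical comparison of two rewriting strategies on the same query.
final class RewriteComparison {

    // Naive approach: replace every occurrence of the logic table name.
    static String naiveRewrite(String sql, String logicTable, String actualTable) {
        return sql.replace(logicTable, actualTable);
    }

    // Placeholder approach: the visitor already marked the table as ${logicTable},
    // so only that exact marker is replaced. String.replace is literal, so no
    // regex escaping of '$', '{' and '}' is needed.
    static String placeholderRewrite(String markedSql, String logicTable, String actualTable) {
        return markedSql.replace("${" + logicTable + "}", actualTable);
    }
}
```

The naive version turns the column `book_title` into `book_00_title` and rewrites the literal `'a book'`; the placeholder version only changes the table reference. It still assumes the text `${book}` never occurs literally inside the SQL.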

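That roughly covers how Sharding jdbc works and how the process is implemented. To understand the real implementation and its details, you still need to read through the source code carefully.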

Environment used in this article:

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>sharding-jdbc-config-spring</artifactId>
            <version>1.4.0</version>
        </dependency>

application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:rdb="http://www.dangdang.com/schema/ddframe/rdb"
	xsi:schemaLocation="
     http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/context 
     http://www.springframework.org/schema/context/spring-context-4.0.xsd 
     http://www.springframework.org/schema/tx 
     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
     http://www.dangdang.com/schema/ddframe/rdb 
     http://www.dangdang.com/schema/ddframe/rdb/rdb.xsd">

	<context:property-placeholder location="classpath:jdbc.properties" ignore-unresolvable="true" />

	<bean id="dataSource0" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="com.mysql.jdbc.Driver" />
		<property name="url" value="${jdbc.mysql.url0}" />
		<property name="username" value="${jdbc.mysql.username0}" />
		<property name="password" value="${jdbc.mysql.password0}" />
	</bean>

	<bean id="dataSource1" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="${driver}" />
		<property name="url" value="${jdbc.mysql.url1}" />
		<property name="username" value="${jdbc.mysql.username1}" />
		<property name="password" value="${jdbc.mysql.password1}" />
	</bean>

	<bean id="dataSource2" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="${driver}" />
		<property name="url" value="${jdbc.mysql.url2}" />
		<property name="username" value="${jdbc.mysql.username2}" />
		<property name="password" value="${jdbc.mysql.password2}" />
	</bean>
	
	<!-- sharding jdbc -->
	<rdb:strategy id="tableShardingStrategy" sharding-columns="id" 
		algorithm-class="com.wy.sharding.MemberSingleKeyTableShardingAlgorithm" />
	
	<rdb:data-source id="shardingDataSource">
        <rdb:sharding-rule data-sources="dataSource0,dataSource1,dataSource2">
            <rdb:table-rules>
                <rdb:table-rule logic-table="book" 
                	actual-tables="book_0${0..2}"  
                	table-strategy="tableShardingStrategy"/>
            </rdb:table-rules>
        </rdb:sharding-rule>
    </rdb:data-source>
</beans>
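
The `actual-tables="book_0${0..2}"` attribute is an inline expression that expands to book_00, book_01 and book_02; `${0..2}` is Groovy-style range syntax. The sketch below handles only the single `prefix${a..b}suffix` integer-range form used here, through a hypothetical helper `InlineRange.expandRange`; it is not Sharding jdbc's own parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: expands "prefix${a..b}suffix" into one name per integer in [a, b].
final class InlineRange {

    private static final Pattern RANGE = Pattern.compile("^(.*)\\$\\{(\\d+)\\.\\.(\\d+)\\}(.*)$");

    static List<String> expandRange(String expression) {
        Matcher m = RANGE.matcher(expression);
        List<String> names = new ArrayList<>();
        if (!m.matches()) {
            names.add(expression);   // no range: the expression is a single name
            return names;
        }
        int from = Integer.parseInt(m.group(2));
        int to = Integer.parseInt(m.group(3));
        for (int i = from; i <= to; i++) {
            names.add(m.group(1) + i + m.group(4));
        }
        return names;
    }
}
```

For example, `expandRange("book_0${0..2}")` gives the three actual tables used by the routing output earlier in the article.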

MemberSingleKeyTableShardingAlgorithm.java

package com.wy.sharding;

import java.util.Collection;
import java.util.LinkedHashSet;

import com.dangdang.ddframe.rdb.sharding.api.ShardingValue;
import com.dangdang.ddframe.rdb.sharding.api.strategy.table.SingleKeyTableShardingAlgorithm;
import com.google.common.collect.Range;

// Routes a row to book_0N where N = id % 3 (table sharding only)
public class MemberSingleKeyTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {

    // WHERE id = ?
    @Override
    public String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) {
        String routeDBSuffix = getRouteDBSuffix(shardingValue.getValue());
        for (String each : availableTargetNames) {
            if (each.endsWith(routeDBSuffix)) {
                return each;
            }
        }
        throw new IllegalArgumentException("No table matches sharding value " + shardingValue.getValue());
    }

    // WHERE id IN (...)
    @Override
    public Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        for (int value : shardingValue.getValues()) {
            String routeDBSuffix = getRouteDBSuffix(value);
            for (String tableName : availableTargetNames) {
                if (tableName.endsWith(routeDBSuffix)) {
                    result.add(tableName);
                }
            }
        }
        return result;
    }

    // WHERE id BETWEEN ? AND ?
    @Override
    public Collection<String> doBetweenSharding(Collection<String> availableTargetNames,
            ShardingValue<Integer> shardingValue) {
        Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
        Range<Integer> range = (Range<Integer>) shardingValue.getValueRange();
        for (int i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
            String routeDBSuffix = getRouteDBSuffix(i);
            for (String each : availableTargetNames) {
                if (each.endsWith(routeDBSuffix)) {
                    result.add(each);
                }
            }
        }
        return result;
    }

    // Table suffix for a sharding value, e.g. 2000 % 3 = 2 -> "02"
    public String getRouteDBSuffix(Integer shardingCode) {
        int modValue = shardingCode % 3;
        return "0" + modValue;
    }

}
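
To check what the algorithm above returns without pulling in the sharding-jdbc dependency, here is a standalone sketch that reproduces its three methods over plain collections. The `ShardingValue` wrapper is replaced by plain `int` arguments, and the between case stops once every table has been matched, which the original loop does not do; `ModThreeTables` is a hypothetical name.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical standalone version of the mod-3 table algorithm, using plain ints
// instead of ShardingValue so it runs without sharding-jdbc on the classpath.
final class ModThreeTables {

    static String suffix(int id) {
        return "0" + (id % 3);
    }

    // id = ?
    static String equal(Collection<String> tables, int id) {
        for (String t : tables) {
            if (t.endsWith(suffix(id))) {
                return t;
            }
        }
        throw new IllegalArgumentException("no table for id " + id);
    }

    // id IN (...): every table whose suffix matches one of the values
    static Collection<String> in(Collection<String> tables, List<Integer> ids) {
        Collection<String> result = new LinkedHashSet<>();
        for (int id : ids) {
            for (String t : tables) {
                if (t.endsWith(suffix(id))) {
                    result.add(t);
                }
            }
        }
        return result;
    }

    // id BETWEEN lower AND upper (inclusive); stops once every table is matched
    static Collection<String> between(Collection<String> tables, int lower, int upper) {
        Collection<String> result = new LinkedHashSet<>();
        for (int i = lower; i <= upper && result.size() < tables.size(); i++) {
            for (String t : tables) {
                if (t.endsWith(suffix(i))) {
                    result.add(t);
                }
            }
        }
        return result;
    }
}
```

The early exit matters for wide ranges: the original doBetweenSharding visits every integer in the range even after all three tables have already been selected, while the resulting set is the same either way.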