
Sharding-JDBC Source Code Analysis (4): SQL Routing

In the previous article we analyzed how sharding-jdbc parses SELECT statements (SQL Parsing: Select). Today we look at SQL routing.

Note: this article is based on version 1.5.M1.

[Figure: sequence diagram of the execution logic]

We will use the SELECT statement from the previous article as the example:

SELECT o.order_id FROM order o WHERE o.order_id = 4

Before the analysis, let's look at the database and table sharding configuration:

Map<String, DataSource> dataSourceMap = new HashMap<>();
dataSourceMap.put("ds_0", null);
dataSourceMap.put("ds_1", null);
DataSourceRule dataSourceRule = new DataSourceRule(dataSourceMap);
TableRule orderTableRule = TableRule.builder("order")
        .actualTables(Lists.newArrayList("order_0", "order_1"))
        .dataSourceRule(dataSourceRule).build();
TableRule orderItemTableRule = TableRule.builder("order_item")
        .actualTables(Lists.newArrayList("order_item_0", "order_item_1"))
        .dataSourceRule(dataSourceRule).build();
TableRule orderAttrTableRule = TableRule.builder("order_attr")
        .actualTables(Lists.newArrayList("ds_0.order_attr_a", "ds_1.order_attr_b"))
        .dataSourceRule(dataSourceRule)
        .tableShardingStrategy(new TableShardingStrategy("order_id", new OrderAttrShardingAlgorithm())).build();
shardingRule = ShardingRule.builder()
        .dataSourceRule(dataSourceRule)
        .tableRules(Lists.newArrayList(orderTableRule, orderItemTableRule, orderAttrTableRule))
        .bindingTableRules(Collections.singletonList(new BindingTableRule(Arrays.asList(orderTableRule, orderItemTableRule))))
        .databaseShardingStrategy(new DatabaseShardingStrategy("order_id", new OrderShardingAlgorithm()))
        .tableShardingStrategy(new TableShardingStrategy("order_id", new OrderShardingAlgorithm())).build();

The order table is sharded into 2 databases and 2 tables, with order_id as the sharding key.

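1. StatementRoutingEngine, the routing entry point:

  • Constructor: creates the SQLRouter from the ShardingContext (the sharding context)
  • SQLRouter parses the SQL (internally calling the SQLParsingEngine from the previous article) and then routes it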
public StatementRoutingEngine(final ShardingContext shardingContext) {
        sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext);
    }
    
    /**
     * SQL route.
     *
     * @param logicSQL logical SQL
     * @return route result
     */
    public SQLRouteResult route(final String logicSQL) {
        SQLStatement sqlStatement = sqlRouter.parse(logicSQL, 0);
        return sqlRouter.route(logicSQL, Collections.emptyList(), sqlStatement);
    }

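SQLRouterFactory.createSQLRouter checks whether the statement is database-sharding-only. If it is, it creates an UnparsingSQLRouter, which skips SQL parsing entirely (the SQL goes straight to the target database and is executed there); otherwise it creates a ParsingSQLRouter.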

    /**
     * Create the SQL router.
     *
     * @param shardingContext data source runtime context
     * @return SQL router
     */
    public static SQLRouter createSQLRouter(final ShardingContext shardingContext) {
        return HintManagerHolder.isDatabaseShardingOnly() ? new UnparsingSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext);
    }
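The factory's decision can be sketched in isolation. This is a minimal sketch, assuming the hint flag is kept per thread in a ThreadLocal (the article does not show HintManagerHolder, so that part is my assumption); RouterFactorySketch, Router, and the two string labels are hypothetical names used only for illustration.

```java
// Hypothetical stand-ins for illustration; these are not the library's classes.
final class RouterFactorySketch {

    /** Minimal router abstraction that only reports which kind it is. */
    interface Router {
        String kind();
    }

    // Per-thread flag playing the role of HintManagerHolder.isDatabaseShardingOnly()
    // (assumption: the real holder is ThreadLocal-backed).
    private static final ThreadLocal<Boolean> DATABASE_SHARDING_ONLY =
            ThreadLocal.withInitial(() -> Boolean.FALSE);

    static void setDatabaseShardingOnly(final boolean value) {
        DATABASE_SHARDING_ONLY.set(value);
    }

    static void clear() {
        DATABASE_SHARDING_ONLY.remove();
    }

    // Same shape as createSQLRouter: the flag alone decides which router is built.
    static Router createRouter() {
        if (DATABASE_SHARDING_ONLY.get()) {
            return () -> "unparsing";
        }
        return () -> "parsing";
    }
}
```

With the flag unset, createRouter() yields the parsing variant; after setDatabaseShardingOnly(true) on the same thread it yields the unparsing one.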

2. ParsingSQLRouter#route:

public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
        final Context context = MetricsContext.start("Route SQL");
        SQLRouteResult result = new SQLRouteResult(sqlStatement);
        if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
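            // handle the generated primary key for INSERT statements (to be analyzed when time permits)
            processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
        }
        // route
        RoutingResult routingResult = route(parameters, sqlStatement);
        // ... (the SQL rewrite logic that follows is omitted for now)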
        MetricsContext.stop(context);
        logSQLRouteResult(result, parameters);
        return result;
    }

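2-1. Get the logical table names from the tables parsed into sqlStatement

For a single table, or when all the tables are binding tables, routing goes through SimpleRoutingEngine#route; otherwise it goes through ComplexRoutingEngine#route.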

private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
        Collection<String> tableNames = sqlStatement.getTables().getTableNames();
        RoutingEngine routingEngine;
        if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) {
            routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
        } else {
            // TODO make it configurable whether to execute the Cartesian product
            routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
        }
        return routingEngine.route();
    }
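To make the branch concrete, here is a simplified sketch of the engine choice. It assumes a binding group can be modeled as a plain set of logical table names and that "all binding tables" means every queried table sits in the same group; EngineChoiceSketch and its methods are hypothetical, not the ShardingRule implementation.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the engine choice; not the library code.
final class EngineChoiceSketch {

    // True when one binding group contains every queried table.
    static boolean isAllBindingTables(final List<Set<String>> bindingGroups, final Collection<String> tableNames) {
        for (Set<String> group : bindingGroups) {
            if (group.containsAll(tableNames)) {
                return true;
            }
        }
        return false;
    }

    // Mirrors the condition in route(): single table or all binding tables -> simple engine.
    static String chooseEngine(final List<Set<String>> bindingGroups, final Collection<String> tableNames) {
        boolean simple = 1 == tableNames.size() || isAllBindingTables(bindingGroups, tableNames);
        return simple ? "SimpleRoutingEngine" : "ComplexRoutingEngine";
    }
}
```

With the configuration above, order and order_item are bound together, so a join between them would still take the simple path, while a join between order and order_attr would take the complex path.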

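2-2. SimpleRoutingEngine#route

Queries normally involve a single table, so we analyze the single-table case:

  1. Get the configured TableRule by logical table name
  2. Use the TableRule to determine the actual databases and tables
  3. Assemble the RoutingResult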
public RoutingResult route() {
        TableRule tableRule = shardingRule.getTableRule(logicTableName);
        Collection<String> routedDataSources = routeDataSources(tableRule);
        Collection<String> routedTables = routeTables(tableRule, routedDataSources);
        return generateRoutingResult(tableRule, routedDataSources, routedTables);
    }
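  • getTableRule: looks up the rule by logical table name first and returns it if found. If none is found, it checks whether a default data source exists and, if so, uses it to create a TableRule that needs no database or table sharding; otherwise it throws an exception.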
    /**
     * Find the sharding rule by logical table name.
     *
     * @param logicTableName logical table name
     * @return the sharding rule for that logical table
     */
    public TableRule getTableRule(final String logicTableName) {
        Optional<TableRule> tableRule = tryFindTableRule(logicTableName);
        if (tableRule.isPresent()) {
            return tableRule.get();
        }
        if (dataSourceRule.getDefaultDataSource().isPresent()) {
            return createTableRuleWithDefaultDataSource(logicTableName, dataSourceRule);
        }
        throw new ShardingJdbcException("Cannot find table rule and default data source with logic table: '%s'", logicTableName);
    }
private TableRule createTableRuleWithDefaultDataSource(final String logicTableName, final DataSourceRule defaultDataSourceRule) {
        Map<String, DataSource> defaultDataSourceMap = new HashMap<>(1);
        defaultDataSourceMap.put(defaultDataSourceRule.getDefaultDataSourceName(), defaultDataSourceRule.getDefaultDataSource().get());
        return TableRule.builder(logicTableName)
                .dataSourceRule(new DataSourceRule(defaultDataSourceMap))
                .databaseShardingStrategy(new DatabaseShardingStrategy("", new NoneDatabaseShardingAlgorithm()))
                .tableShardingStrategy(new TableShardingStrategy("", new NoneTableShardingAlgorithm())).build();
    }
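The three possible outcomes of this lookup can be sketched without the library types. In this rough sketch a rule is reduced to a descriptive string, and the "default:" prefix is only a hypothetical marker for an unsharded rule on the default data source; TableRuleLookupSketch is not part of sharding-jdbc.

```java
import java.util.Map;

// Hypothetical, string-based model of the lookup; not the library code.
final class TableRuleLookupSketch {

    // configuredRules: logical table name -> description of its configured rule.
    // defaultDataSourceName: null when no default data source is configured.
    static String getTableRule(final Map<String, String> configuredRules,
                               final String defaultDataSourceName, final String logicTableName) {
        String rule = configuredRules.get(logicTableName);
        if (null != rule) {
            return rule; // 1. a configured rule wins
        }
        if (null != defaultDataSourceName) {
            return "default:" + defaultDataSourceName; // 2. fall back to an unsharded rule
        }
        // 3. nothing to route to
        throw new IllegalStateException(
                "Cannot find table rule and default data source with logic table: " + logicTableName);
    }
}
```

The fallback in step 2 is what would let tables that are not sharded at all pass through the same routing code.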
  • routeDataSources (comments are inline in the code):
private Collection<String> routeDataSources(final TableRule tableRule) {
        // 1. Get the database sharding strategy from the TableRule
        DatabaseShardingStrategy strategy = shardingRule.getDatabaseShardingStrategy(tableRule);
        // 2. If forced routing (a hint) is in use, take the sharding values from the hint; otherwise take them from the sharding columns used in the query conditions
        List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getDatabaseShardingValuesFromHint(strategy.getShardingColumns())
                : getShardingValues(strategy.getShardingColumns());
        logBeforeRoute("database", logicTableName, tableRule.getActualDatasourceNames(), strategy.getShardingColumns(), shardingValues);
        // 3. Invoke the sharding strategy to compute the target data sources
        Collection<String> result = strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualDatasourceNames(), shardingValues);
        logAfterRoute("database", logicTableName, result);
        Preconditions.checkState(!result.isEmpty(), "no database route info");
        return result;
    }
    
  • routeTables (almost the same logic):
private Collection<String> routeTables(final TableRule tableRule, final Collection<String> routedDataSources) {
        TableShardingStrategy strategy = shardingRule.getTableShardingStrategy(tableRule);
        List<ShardingValue<?>> shardingValues = HintManagerHolder.isUseShardingHint() ? getTableShardingValuesFromHint(strategy.getShardingColumns())
                : getShardingValues(strategy.getShardingColumns());
        logBeforeRoute("table", logicTableName, tableRule.getActualTables(), strategy.getShardingColumns(), shardingValues);
        Collection<String> result = tableRule.isDynamic() ? strategy.doDynamicSharding(shardingValues)
                : strategy.doStaticSharding(sqlStatement.getType(), tableRule.getActualTableNames(routedDataSources), shardingValues);
        logAfterRoute("table", logicTableName, result);
        Preconditions.checkState(!result.isEmpty(), "no table route info");
        return result;
    }

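Forced routing (hints) deserves an article of its own, so I will skip it here and cover it later. Let's look at the path that does not use forced routing.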

  • getShardingValues
private List<ShardingValue<?>> getShardingValues(final Collection<String> shardingColumns) {
        List<ShardingValue<?>> result = new ArrayList<>(shardingColumns.size());
        for (String each : shardingColumns) {
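            // Conditions produced by SQL parsing (briefly covered in the previous article): check whether the sharding column appears in them and, if so, convert the condition to a ShardingValue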
            Optional<Condition> condition = sqlStatement.getConditions().find(new Column(each, logicTableName));
            if (condition.isPresent()) {
                result.add(condition.get().getShardingValue(parameters));
            }
        }
        return result;
    }
  • getShardingValue

operator: the operator of the condition object (=, IN, BETWEEN)

    /**
     * Convert the condition object into a sharding value.
     *
     * @param parameters parameter list
     * @return sharding value
     */
    public ShardingValue<?> getShardingValue(final List<Object> parameters) {
        List<Comparable<?>> conditionValues = getValues(parameters);
        switch (operator) {
            case EQUAL:
                return new ShardingValue<Comparable<?>>(column.getTableName(), column.getName(), conditionValues.get(0));
            case IN:
                return new ShardingValue<>(column.getTableName(), column.getName(), conditionValues);
            case BETWEEN:
                return new ShardingValue<>(column.getTableName(), column.getName(), Range.range(conditionValues.get(0), BoundType.CLOSED, conditionValues.get(1), BoundType.CLOSED));
            default:
                throw new UnsupportedOperationException(operator.getExpression());
        }
    }
  • getValues

positionValueMap: holds the condition values (the sharding values). positionIndexMap: I have not fully understood this part of the logic.

private List<Comparable<?>> getValues(final List<Object> parameters) {
        List<Comparable<?>> result = new LinkedList<>(positionValueMap.values());
        for (Entry<Integer, Integer> entry : positionIndexMap.entrySet()) {
            Object parameter = parameters.get(entry.getValue());
            if (!(parameter instanceof Comparable<?>)) {
                throw new ShardingJdbcException("Parameter `%s` should extends Comparable for sharding value.", parameter);
            }
            if (entry.getKey() < result.size()) {
                result.add(entry.getKey(), (Comparable<?>) parameter);
            } else {
                result.add((Comparable<?>) parameter);
            }
        }
        return result;
    }
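One way to read getValues, judging only from the code above: positionValueMap seems to hold the values written literally in the SQL, keyed by their position inside the condition, while positionIndexMap seems to map a position inside the condition to an index in the PreparedStatement parameter list, so that each `?` is filled in at its original position. The sketch below reproduces the merge under that assumption. The LinkedHashMap fields and the IllegalArgumentException are my own choices (the original throws ShardingJdbcException).

```java
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Sketch of the merge logic under the assumption stated above; not the library class.
final class ConditionValuesSketch {

    // position inside the condition -> literal value written in the SQL
    final Map<Integer, Comparable<?>> positionValueMap = new LinkedHashMap<>();

    // position inside the condition -> index into the parameter list (for "?")
    final Map<Integer, Integer> positionIndexMap = new LinkedHashMap<>();

    List<Comparable<?>> getValues(final List<Object> parameters) {
        // Start from the literals, then splice each parameter in at its position.
        List<Comparable<?>> result = new LinkedList<>(positionValueMap.values());
        for (Map.Entry<Integer, Integer> entry : positionIndexMap.entrySet()) {
            Object parameter = parameters.get(entry.getValue());
            if (!(parameter instanceof Comparable<?>)) {
                throw new IllegalArgumentException("Parameter should be Comparable: " + parameter);
            }
            if (entry.getKey() < result.size()) {
                result.add(entry.getKey(), (Comparable<?>) parameter);
            } else {
                result.add((Comparable<?>) parameter);
            }
        }
        return result;
    }
}
```

For `order_id BETWEEN ? AND 10` with the parameter 3, the literal 10 would sit at position 1 and the placeholder at position 0, so the merged values come out in SQL order as 3 then 10, which is the order the BETWEEN branch of getShardingValue relies on.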
  • ShardingStrategy#doStaticSharding:
    /**
     * Compute static sharding.
     *
     * @param sqlType type of the SQL statement
     * @param availableTargetNames all available sharding targets
     * @param shardingValues sharding values
     * @return names of the data sources targeted after sharding
     */
    public Collection<String> doStaticSharding(final SQLType sqlType, final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        if (shardingValues.isEmpty()) {
            Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames), "INSERT statement should contain sharding value.");
            result.addAll(availableTargetNames);
        } else {
            result.addAll(doSharding(shardingValues, availableTargetNames));
        }
        return result;
    }
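The branch worth noticing in doStaticSharding is the empty case: with no sharding value, every available target is returned, so the statement is sent to all of them. A reduced sketch of that behavior follows, assuming a single nullable sharding value and a pluggable function in place of the real sharding algorithm; StaticShardingSketch is hypothetical and omits the INSERT check.

```java
import java.util.Collection;
import java.util.TreeSet;
import java.util.function.Function;

// Reduced sketch of the "no sharding value means all targets" rule; not the library code.
final class StaticShardingSketch {

    // shardingValue: null models "the WHERE clause has no condition on the sharding column".
    static Collection<String> doStaticSharding(final Collection<String> availableTargetNames,
                                               final Long shardingValue,
                                               final Function<Long, String> algorithm) {
        // Same collection type as the original: sorted, case-insensitive, no duplicates.
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        if (null == shardingValue) {
            result.addAll(availableTargetNames); // full route
        } else {
            result.add(algorithm.apply(shardingValue)); // route to the computed target
        }
        return result;
    }
}
```

Under this model a query such as `SELECT o.order_id FROM order o` without a condition on order_id would touch both order_0 and order_1.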

Typically we implement SingleKeyShardingAlgorithm for our tables, define our own sharding logic there, and return the computed targets.

private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {
        if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
            return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
        }
        if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
            SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
            ShardingValue shardingValue = shardingValues.iterator().next();
            switch (shardingValue.getType()) {
                case SINGLE:
                    return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
                case LIST:
                    return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
                case RANGE:
                    return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
                default:
                    throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());
            }
        }
        if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {
            return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
        }
        throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());
    }
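The article does not show OrderShardingAlgorithm, so as an illustration only, here is what a modulo-based single-key algorithm could look like for the three cases in the switch above (SINGLE, LIST, RANGE). It is written against plain Java types instead of the library's ShardingValue, assumes non-negative keys and targets named with a numeric suffix such as ds_0 or order_1, and ModuloShardingSketch is a hypothetical name.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical modulo algorithm; the real OrderShardingAlgorithm is not shown in the article.
final class ModuloShardingSketch {

    // "=": pick the one target whose suffix equals value % targetCount.
    static String doEqualSharding(final Collection<String> availableTargetNames, final long value) {
        String suffix = "_" + (value % availableTargetNames.size());
        for (String each : availableTargetNames) {
            if (each.endsWith(suffix)) {
                return each;
            }
        }
        throw new UnsupportedOperationException("no target for value " + value);
    }

    // "IN": route each listed value and collect the distinct targets.
    static Collection<String> doInSharding(final Collection<String> availableTargetNames, final List<Long> values) {
        Collection<String> result = new LinkedHashSet<>();
        for (long each : values) {
            result.add(doEqualSharding(availableTargetNames, each));
        }
        return result;
    }

    // "BETWEEN": walk the closed range, stopping early once every target is already hit.
    static Collection<String> doBetweenSharding(final Collection<String> availableTargetNames,
                                                final long lower, final long upper) {
        Collection<String> result = new LinkedHashSet<>();
        for (long i = lower; i <= upper && result.size() < availableTargetNames.size(); i++) {
            result.add(doEqualSharding(availableTargetNames, i));
        }
        return result;
    }
}
```

If the order table used an algorithm like this, `o.order_id = 4` would resolve to ds_0 at the database level and order_0 at the table level, since 4 % 2 is 0.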
  • Finally

Filter down to the actual DataNodes and assemble the RoutingResult:

private RoutingResult generateRoutingResult(final TableRule tableRule, final Collection<String> routedDataSources, final Collection<String> routedTables) {
        RoutingResult result = new RoutingResult();
        for (DataNode each : tableRule.getActualDataNodes(routedDataSources, routedTables)) {
            result.getTableUnits().getTableUnits().add(new TableUnit(each.getDataSourceName(), logicTableName, each.getTableName()));
        }
        return result;
    }
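The filtering step can be pictured as intersecting the rule's data nodes with the routed names. The sketch below assumes that the order rule expands to the four nodes ds_0.order_0, ds_0.order_1, ds_1.order_0, and ds_1.order_1 (the article does not show how TableRule builds its data nodes), and it models a DataNode as a "dataSource.table" string; DataNodeFilterSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, string-based model of the data node filter; not the library code.
final class DataNodeFilterSketch {

    // Keep only the nodes whose data source and table were both selected by routing.
    static List<String> actualDataNodes(final List<String> allDataNodes,
                                        final Collection<String> routedDataSources,
                                        final Collection<String> routedTables) {
        List<String> result = new ArrayList<>();
        for (String each : allDataNodes) {
            String[] parts = each.split("\\."); // "ds_0.order_0" -> ["ds_0", "order_0"]
            if (routedDataSources.contains(parts[0]) && routedTables.contains(parts[1])) {
                result.add(each);
            }
        }
        return result;
    }
}
```

If the two routing steps returned ds_0 and order_0 for the example query, a single node ds_0.order_0 would remain, and it would become the one TableUnit in the RoutingResult.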
