1. 程式人生 > 其它 >資料庫分庫分表中介軟體 Sharding-JDBC 原始碼分析 —— SQL 改寫

資料庫分庫分表中介軟體 Sharding-JDBC 原始碼分析 —— SQL 改寫

本文主要基於 Sharding-JDBC 1.5.0 正式版

  • 1. 概述
  • 2. SQLToken
  • 3.SQL 改寫
    • 3.4.1 分頁補充
    • 3.1 TableToken
    • 3.2 ItemsToken
    • 3.3 OffsetToken
    • 3.4 RowCountToken
    • 3.5 OrderByToken
    • 3.6 GeneratedKeyToken
  • 4. SQL 生成

1. 概述

前置閱讀:《SQL 解析(三)之查詢SQL》

本文分享SQL 改寫的原始碼實現。主要涉及兩方面:

  1. SQL 改寫:改寫 SQL,解決分庫分表後,查詢結果需要聚合,需要對 SQL 進行調整,例如分頁
  2. SQL 生成:生成分表分庫的執行 SQL

SQLRewriteEngine,SQL重寫引擎,實現 SQL 改寫、生成功能。從 Sharding-JDBC 1.5.0 版本,SQL 改寫進行了調整和大量優化。

1.4.x及之前版本,SQL改寫是在SQL路由之前完成的,在1.5.x中調整為SQL路由之後,因為SQL改寫可以根據路由至單庫表還是多庫表而進行進一步優化。

? 很多同學看完《SQL 解析-系列》 可能是一臉懵逼,特別對“SQL 半理解”

希望本文能給你一些啟發。

2. SQLToken

? SQLToken 在本文中很重要,所以即使在《SQL 解析-系列》已經分享過,我們也換個姿勢,再來一次。

SQLToken,SQL標記物件介面

。SQLRewriteEngine 基於 SQLToken 實現 SQL改寫。SQL解析器在 SQL解析過程中,很重要的一個目的是標記需要SQL改寫的部分,也就是 SQLToken。

各 SQLToken 生成條件如下(悲傷,做成表格形式排版是亂的):

  1. GeneratedKeyToken 自增主鍵標記物件
    • 插入SQL自增列不存在: INSERT INTO t_order(nickname)VALUES... 中沒有自增列 order_id
  2. TableToken 表標記物件
    • 查詢列的表別名: SELECT o.order_ido
    • 查詢的表名: SELECT*FROM t_ordert_order
  3. ItemsToken 選擇項標記物件
    • AVG查詢列: SELECT AVG(price)FROM t_orderAVG(price)
    • ORDER BY 欄位不在查詢列: SELECT order_id FROM t_order ORDER BY create_timecreate_time
    • GROUP BY 欄位不在查詢列: SELECT COUNT(order_id)FROM t_order GROUP BY user_iduser_id
    • 自增主鍵未在插入列中: INSERT INTO t_order(nickname)VALUES... 中沒有自增列 order_id
  4. OffsetToken 分頁偏移量標記物件
    • 分頁有偏移量,但不是佔位符 ?
  5. RowCountToken 分頁長度標記物件
    • 分頁有長度,但不是佔位符 ?
  6. OrderByToken 排序標記物件
    • 有 GROUP BY 條件,無 ORDER BY 條件: SELECT COUNT(*)FROM t_order GROUP BY order_idorder_id

3.SQL 改寫

SQLRewriteEngine#rewrite() 實現了 SQL改寫 功能。

// SQLRewriteEngine.java
/**
* SQL改寫.
* @param isRewriteLimit 是否重寫Limit
* @return SQL構建器
*/
public SQLBuilder rewrite(final boolean isRewriteLimit) {
   SQLBuilder result = new SQLBuilder();
   if (sqlTokens.isEmpty()) {
       result.appendLiterals(originalSQL);
       return result;
   }
   int count = 0;
   // 排序SQLToken,按照 beginPosition 遞增
   sortByBeginPosition();
   for (SQLToken each : sqlTokens) {
       if (0 == count) { // 拼接第一個 SQLToken 前的字串
           result.appendLiterals(originalSQL.substring(0, each.getBeginPosition()));
       }
       // 拼接每個SQLToken
       if (each instanceof TableToken) {
           appendTableToken(result, (TableToken) each, count, sqlTokens);
       } else if (each instanceof ItemsToken) {
           appendItemsToken(result, (ItemsToken) each, count, sqlTokens);
       } else if (each instanceof RowCountToken) {
           appendLimitRowCount(result, (RowCountToken) each, count, sqlTokens, isRewriteLimit);
       } else if (each instanceof OffsetToken) {
           appendLimitOffsetToken(result, (OffsetToken) each, count, sqlTokens, isRewriteLimit);
       } else if (each instanceof OrderByToken) {
           appendOrderByToken(result);
       }
       count++;
   }
   return result;
}
  • SQL改寫以 SQLToken 為間隔順序改寫。
    • 順序:呼叫 #sortByBeginPosition() 將 SQLToken 按照 beginPosition 升序
    • 間隔:遍歷 SQLToken,逐個拼接。

例如:


SQLBuilder,SQL構建器。下文會大量用到,我們看下實現程式碼。

public final class SQLBuilder {

    /**
     * 段集合
     */
    private final List<Object> segments;
    /**
     * 當前段
     */
    private StringBuilder currentSegment;

    public SQLBuilder() {
        segments = new LinkedList<>();
        currentSegment = new StringBuilder();
        segments.add(currentSegment);
    }

    /**
     * 追加字面量.
     *
     * @param literals 字面量
     */
    public void appendLiterals(final String literals) {
        currentSegment.append(literals);
    }

    /**
     * 追加表佔位符.
     *
     * @param tableName 表名稱
     */
    public void appendTable(final String tableName) {
        // 新增 TableToken
        segments.add(new TableToken(tableName));
        // 新建當前段
        currentSegment = new StringBuilder();
        segments.add(currentSegment);
    }

    public String toSQL(final Map<String, String> tableTokens) {
        // ... 省略程式碼,【SQL生成】處分享
    }

    @RequiredArgsConstructor
    private class TableToken {
        /**
         * 表名
         */
        private final String tableName;
    }
}

現在我們來逐個分析每種 SQLToken 的拼接實現。

3.1 TableToken

呼叫 #appendTableToken() 方法拼接。

// SQLRewriteEngine.java
/**
* 拼接 TableToken
*
* @param sqlBuilder SQL構建器
* @param tableToken tableToken
* @param count tableToken 在 sqlTokens 的順序
* @param sqlTokens sqlTokens
*/
private void appendTableToken(final SQLBuilder sqlBuilder, final TableToken tableToken, final int count, final List<SQLToken> sqlTokens) {
   // 拼接 TableToken
   String tableName = sqlStatement.getTables().getTableNames().contains(tableToken.getTableName()) ? tableToken.getTableName() : tableToken.getOriginalLiterals();
   sqlBuilder.appendTable(tableName);
   // 拼接 SQLToken 後面的字串
   int beginPosition = tableToken.getBeginPosition() + tableToken.getOriginalLiterals().length();
   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();
   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
}
  • 呼叫 SQLBuilder#appendTable() 拼接 TableToken。
  • sqlStatement.getTables().getTableNames().contains(tableToken.getTableName()) 目的是處理掉表名前後有的特殊字元,例如 SELECT*FROM't_order't_order 前後有 ' 符號。
// TableToken.java
/**
* 獲取表名稱.
*/
public String getTableName() {
   return SQLUtil.getExactlyValue(originalLiterals);
}

// SQLUtil.java
public static String getExactlyValue(final String value) {
   return null == value ? null : CharMatcher.anyOf("[]`'"").removeFrom(value);
}
  • 當 SQL 為 SELECT o.*FROM t_order o
    • TableToken 為查詢列前的表別名 o 時返回結果:
    • TableToken 為表名 t_order 時返回結果:

3.2 ItemsToken

呼叫 #appendItemsToken() 方法拼接。

// SQLRewriteEngine.java
/**
* 拼接 TableToken
*
* @param sqlBuilder SQL構建器
* @param itemsToken itemsToken
* @param count itemsToken 在 sqlTokens 的順序
* @param sqlTokens sqlTokens
*/
private void appendItemsToken(final SQLBuilder sqlBuilder, final ItemsToken itemsToken, final int count, final List<SQLToken> sqlTokens) {
   // 拼接 ItemsToken
   for (String item : itemsToken.getItems()) {
       sqlBuilder.appendLiterals(", ");
       sqlBuilder.appendLiterals(item);
   }
   // SQLToken 後面的字串
   int beginPosition = itemsToken.getBeginPosition();
   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();
   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
}
  • 第一種情況,AVG查詢列,SQL 為 SELECT AVG(order_id)FROM t_order o 時返回結果:
  • 第二種情況,ORDER BY 欄位不在查詢列,SQL 為 SELECT userId FROM t_order o ORDER BY order_id 時返回結果:
  • 第三種情況,GROUP BY 欄位不在查詢列,類似第二種情況,就不舉例子列。

3.3 OffsetToken

呼叫 #appendLimitOffsetToken() 方法拼接。

// SQLRewriteEngine.java
/**
* 拼接 OffsetToken
*
* @param sqlBuilder SQL構建器
* @param offsetToken offsetToken
* @param count offsetToken 在 sqlTokens 的順序
* @param sqlTokens sqlTokens
* @param isRewrite 是否重寫。當路由結果為單分片時無需重寫
*/
private void appendLimitOffsetToken(final SQLBuilder sqlBuilder, final OffsetToken offsetToken, final int count, final List<SQLToken> sqlTokens, final boolean isRewrite) {
   // 拼接 OffsetToken
   sqlBuilder.appendLiterals(isRewrite ? "0" : String.valueOf(offsetToken.getOffset()));
   // SQLToken 後面的字串
   int beginPosition = offsetToken.getBeginPosition() + String.valueOf(offsetToken.getOffset()).length();
   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();
   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
}
  • 當分頁跨分片時,需要每個分片都查詢後在記憶體中進行聚合。此時 isRewrite=true。為什麼是 "0" 開始呢?每個分片在 [0, offset) 的記錄可能屬於實際分頁結果,因而查詢每個分片需要從 0 開始。
  • 當分頁單分片時,則無需重寫,該分片執行的結果即是最終結果。SQL改寫在SQL路由之後就有這個好處。如果先改寫,因為沒辦法知道最終是單分片還是跨分片,考慮正確性,只能統一使用跨分片。

3.4 RowCountToken

呼叫 #appendLimitRowCount() 方法拼接。

// SQLRewriteEngine.java
private void appendLimitRowCount(final SQLBuilder sqlBuilder, final RowCountToken rowCountToken, final int count, final List<SQLToken> sqlTokens, final boolean isRewrite) {
   SelectStatement selectStatement = (SelectStatement) sqlStatement;
   Limit limit = selectStatement.getLimit();
   if (!isRewrite) { // 路由結果為單分片
       sqlBuilder.appendLiterals(String.valueOf(rowCountToken.getRowCount()));
   } else if ((!selectStatement.getGroupByItems().isEmpty() || // [1.1] 跨分片分組需要在記憶體計算,可能需要全部載入
           !selectStatement.getAggregationSelectItems().isEmpty()) // [1.2] 跨分片聚合列需要在記憶體計算,可能需要全部載入
           && !selectStatement.isSameGroupByAndOrderByItems()) { // [2] 如果排序一致,即各分片已經排序好結果,就不需要全部載入
       sqlBuilder.appendLiterals(String.valueOf(Integer.MAX_VALUE));
   } else { // 路由結果為多分片
       sqlBuilder.appendLiterals(String.valueOf(limit.isRowCountRewriteFlag() ? rowCountToken.getRowCount() + limit.getOffsetValue() : rowCountToken.getRowCount()));
   }
   // SQLToken 後面的字串
   int beginPosition = rowCountToken.getBeginPosition() + String.valueOf(rowCountToken.getRowCount()).length();
   int endPosition = sqlTokens.size() - 1 == count ? originalSQL.length() : sqlTokens.get(count + 1).getBeginPosition();
   sqlBuilder.appendLiterals(originalSQL.substring(beginPosition, endPosition));
}
  • [1.1] !selectStatement.getGroupByItems().isEmpty() 跨分片分組需要在記憶體計算,可能需要全部載入。如果不全部載入,部分結果被分頁條件錯誤結果,會導致結果不正確。
  • [1.2] !selectStatement.getAggregationSelectItems().isEmpty()) 跨分片聚合列需要在記憶體計算,可能需要全部載入。如果不全部載入,部分結果被分頁條件錯誤結果,會導致結果不正確。
  • [1.1][1.2],可能變成必須的前提是 GROUP BY 和 ORDER BY 排序不一致。如果一致,各分片已經排序完成,無需記憶體中排序。

3.4.1 分頁補充

OffsetToken、RowCountToken 只有在分頁對應位置非佔位符 ? 才存在。當對應位置是佔位符時,會對分頁條件對應的預編譯 SQL 佔位符引數進行重寫,整體邏輯和 OffsetToken、RowCountToken 是一致的

// ? ParsingSQLRouter#route() 呼叫 #processLimit() 

// ParsingSQLRouter.java
/**
* 處理分頁條件
*
* @see SQLRewriteEngine#appendLimitRowCount(SQLBuilder, RowCountToken, int, List, boolean) 
* @param parameters 佔位符對應引數列表
* @param selectStatement Select SQL語句物件
* @param isSingleRouting 是否單表路由
*/
private void processLimit(final List<Object> parameters, final SelectStatement selectStatement, final boolean isSingleRouting) {
   boolean isNeedFetchAll = (!selectStatement.getGroupByItems().isEmpty() // // [1.1] 跨分片分組需要在記憶體計算,可能需要全部載入
                               || !selectStatement.getAggregationSelectItems().isEmpty()) // [1.2] 跨分片聚合列需要在記憶體計算,可能需要全部載入
                           && !selectStatement.isSameGroupByAndOrderByItems(); // [2] 如果排序一致,即各分片已經排序好結果,就不需要全部載入
   selectStatement.getLimit().processParameters(parameters, !isSingleRouting, isNeedFetchAll);
}

// Limit.java
/**
* 填充改寫分頁引數.
* @param parameters 引數
* @param isRewrite 是否重寫引數
* @param isFetchAll 是否獲取所有資料
*/
public void processParameters(final List<Object> parameters, final boolean isRewrite, final boolean isFetchAll) {
   fill(parameters);
   if (isRewrite) {
       rewrite(parameters, isFetchAll);
   }
}
/**
* 將佔位符引數裡是分頁的引數賦值給 offset 、rowCount
* 賦值的前提條件是 offset、rowCount 是 佔位符
* @param parameters 佔位符引數
*/
private void fill(final List<Object> parameters) {
   int offset = 0;
   if (null != this.offset) {
       offset = -1 == this.offset.getIndex() ? getOffsetValue() : NumberUtil.roundHalfUp(parameters.get(this.offset.getIndex()));
       this.offset.setValue(offset);
   }
   int rowCount = 0;
   if (null != this.rowCount) {
       rowCount = -1 == this.rowCount.getIndex() ? getRowCountValue() : NumberUtil.roundHalfUp(parameters.get(this.rowCount.getIndex()));
       this.rowCount.setValue(rowCount);
   }
   if (offset < 0 || rowCount < 0) {
       throw new SQLParsingException("LIMIT offset and row count can not be a negative value.");
   }
}
/**
* 重寫分頁條件對應的引數
* @param parameters 引數
* @param isFetchAll 是否拉取所有
*/
private void rewrite(final List<Object> parameters, final boolean isFetchAll) {
   int rewriteOffset = 0;
   int rewriteRowCount;
   // 重寫
   if (isFetchAll) {
       rewriteRowCount = Integer.MAX_VALUE;
   } else if (rowCountRewriteFlag) {
       rewriteRowCount = null == rowCount ? -1 : getOffsetValue() + rowCount.getValue();
   } else {
       rewriteRowCount = rowCount.getValue();
   }
   // 引數設定
   if (null != offset && offset.getIndex() > -1) {
       parameters.set(offset.getIndex(), rewriteOffset);
   }
   if (null != rowCount && rowCount.getIndex() > -1) {
       parameters.set(rowCount.getIndex(), rewriteRowCount);
   }
}

3.5 OrderByToken

呼叫 #appendOrderByToken() 方法拼接。資料庫裡,當無 ORDER BY條件 而有 GROUP BY 條件時候,會使用 GROUP BY條件將結果升序排序:

  • SELECT order_id FROM t_order GROUP BY order_id 等價於 SELECT order_id FROM t_order GROUP BY order_id ORDER BY order_id ASC
  • SELECT order_id FROM t_order GROUP BY order_id DESC 等價於 SELECT order_id FROM t_order GROUP BY order_id ORDER BY order_id DESC
// ParsingSQLRouter.java
/**
* 拼接 OrderByToken
*
* @param sqlBuilder SQL構建器
*/
private void appendOrderByToken(final SQLBuilder sqlBuilder) {
   SelectStatement selectStatement = (SelectStatement) sqlStatement;
   // 拼接 OrderByToken
   StringBuilder orderByLiterals = new StringBuilder(" ORDER BY ");
   int i = 0;
   for (OrderItem each : selectStatement.getOrderByItems()) {
       if (0 == i) {
           orderByLiterals.append(each.getColumnLabel()).append(" ").append(each.getType().name());
       } else {
           orderByLiterals.append(",").append(each.getColumnLabel()).append(" ").append(each.getType().name());
       }
       i++;
   }
   orderByLiterals.append(" ");
   sqlBuilder.appendLiterals(orderByLiterals.toString());
}
  • 當 SQL 為 SELECT order_id FROM t_order o GROUP BY order_id 返回結果:

3.6 GeneratedKeyToken

前置閱讀:《SQL 解析(四)之插入SQL》

GeneratedKeyToken,和其它 SQLToken 不同,在 SQL解析 完進行處理。

// ParsingSQLRouter.java
@Override
public SQLStatement parse(final String logicSQL, final int parametersSize) {
   SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);
   Context context = MetricsContext.start("Parse SQL");
   SQLStatement result = parsingEngine.parse();
   if (result instanceof InsertStatement) { // 處理 GenerateKeyToken
       ((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
   }
   MetricsContext.stop(context);
   return result;
}

// InsertStatement.java
/**
* 追加自增主鍵標記物件.
*
* @param shardingRule 分片規則
* @param parametersSize 引數個數
*/
public void appendGenerateKeyToken(final ShardingRule shardingRule, final int parametersSize) {
   // SQL 裡有主鍵列
   if (null != generatedKey) {
       return;
   }
   // TableRule 存在
   Optional<TableRule> tableRule = shardingRule.tryFindTableRule(getTables().getSingleTableName());
   if (!tableRule.isPresent()) {
       return;
   }
   // GeneratedKeyToken 存在
   Optional<GeneratedKeyToken> generatedKeysToken = findGeneratedKeyToken();
   if (!generatedKeysToken.isPresent()) {
       return;
   }
   // 處理 GenerateKeyToken
   ItemsToken valuesToken = new ItemsToken(generatedKeysToken.get().getBeginPosition());
   if (0 == parametersSize) {
       appendGenerateKeyToken(shardingRule, tableRule.get(), valuesToken);
   } else {
       appendGenerateKeyToken(shardingRule, tableRule.get(), valuesToken, parametersSize);
   }
   // 移除 generatedKeysToken
   getSqlTokens().remove(generatedKeysToken.get());
   // 新增 ItemsToken
   getSqlTokens().add(valuesToken);
}
  • 根據佔位符引數數量不同,呼叫的 #appendGenerateKeyToken()不同的:
  • 佔位符引數數量 = 0 時,直接生成分散式主鍵,保持無佔位符的做法。
// InsertStatement.java
private void appendGenerateKeyToken(final ShardingRule shardingRule, final TableRule tableRule, final ItemsToken valuesToken) {
   // 生成分散式主鍵
   Number generatedKey = shardingRule.generateKey(tableRule.getLogicTable());
   // 新增到 ItemsToken
   valuesToken.getItems().add(generatedKey.toString());
   // 增加 Condition,用於路由
   getConditions().add(new Condition(new Column(tableRule.getGenerateKeyColumn(), tableRule.getLogicTable()), new SQLNumberExpression(generatedKey)), shardingRule);
   // 生成 GeneratedKey
   this.generatedKey = new GeneratedKey(tableRule.getLogicTable(), -1, generatedKey);
}
  • 佔位符引數數量 > 0 時,生成自增列的佔位符,保持有佔位符的做法。
private void appendGenerateKeyToken(final ShardingRule shardingRule, final TableRule tableRule, final ItemsToken valuesToken, final int parametersSize) {
   // 生成佔位符
   valuesToken.getItems().add("?");
   // 增加 Condition,用於路由
   getConditions().add(new Condition(new Column(tableRule.getGenerateKeyColumn(), tableRule.getLogicTable()), new SQLPlaceholderExpression(parametersSize)), shardingRule);
   // 生成 GeneratedKey
   generatedKey = new GeneratedKey(tableRule.getGenerateKeyColumn(), parametersSize, null);
}
  • 因為 GenerateKeyToken 已經處理完,所以移除,避免 SQLRewriteEngine#rewrite() 二次改寫。另外,通過 ItemsToken 補充自增列。
  • 生成 GeneratedKey 會在 ParsingSQLRouter 進一步處理。
// ParsingSQLRouter.java
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
   final Context context = MetricsContext.start("Route SQL");
   SQLRouteResult result = new SQLRouteResult(sqlStatement);
   // 處理 插入SQL 主鍵欄位
   if (sqlStatement instanceof InsertStatement && null != ((InsertStatement) sqlStatement).getGeneratedKey()) {
       processGeneratedKey(parameters, (InsertStatement) sqlStatement, result);
   }
   // ... 省略部分程式碼
}  
/**
* 處理 插入SQL 主鍵欄位
* 當 主鍵編號 未生成時,{@link ShardingRule#generateKey(String)} 進行生成
* @param parameters 佔位符引數
* @param insertStatement Insert SQL語句物件
* @param sqlRouteResult SQL路由結果
*/
private void processGeneratedKey(final List<Object> parameters, final InsertStatement insertStatement, final SQLRouteResult sqlRouteResult) {
   GeneratedKey generatedKey = insertStatement.getGeneratedKey();
   if (parameters.isEmpty()) { // 已有主鍵,無佔位符,INSERT INTO t_order(order_id, user_id) VALUES (1, 100);
       sqlRouteResult.getGeneratedKeys().add(generatedKey.getValue());
   } else if (parameters.size() == generatedKey.getIndex()) { // 主鍵欄位不存在存在,INSERT INTO t_order(user_id) VALUES(?);
       Number key = shardingRule.generateKey(insertStatement.getTables().getSingleTableName()); // 生成主鍵編號
       parameters.add(key);
       setGeneratedKeys(sqlRouteResult, key);
   } else if (-1 != generatedKey.getIndex()) { // 主鍵欄位存在,INSERT INTO t_order(order_id, user_id) VALUES(?, ?);
       setGeneratedKeys(sqlRouteResult, (Number) parameters.get(generatedKey.getIndex()));
   }
}
/**
* 設定 主鍵編號 到 SQL路由結果
* @param sqlRouteResult SQL路由結果
* @param generatedKey 主鍵編號
*/
private void setGeneratedKeys(final SQLRouteResult sqlRouteResult, final Number generatedKey) {
   generatedKeys.add(generatedKey);
   sqlRouteResult.getGeneratedKeys().clear();
   sqlRouteResult.getGeneratedKeys().addAll(generatedKeys);
}
  • parameters.size()==generatedKey.getIndex() 處對應 #appendGenerateKeyToken()佔位符引數數量 > 0 情況,此時會生成分散式主鍵。? 該處是不是可以考慮把生成分散式主鍵挪到 #appendGenerateKeyToken(),這樣更加統一一些。

4. SQL 生成

SQL路由完後,會生成各資料分片的執行SQL

// ParsingSQLRouter.java
@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
   SQLRouteResult result = new SQLRouteResult(sqlStatement);
   // 省略部分程式碼... 處理 插入SQL 主鍵欄位

   // 路由
   RoutingResult routingResult = route(parameters, sqlStatement);

   // 省略部分程式碼... SQL重寫引擎
   SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, sqlStatement);
   boolean isSingleRouting = routingResult.isSingleRouting();
   // 省略部分程式碼... 處理分頁
   // SQL 重寫
   SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
   // 生成 ExecutionUnit
   if (routingResult instanceof CartesianRoutingResult) {
       for (CartesianDataSource cartesianDataSource : ((CartesianRoutingResult) routingResult).getRoutingDataSources()) {
           for (CartesianTableReference cartesianTableReference : cartesianDataSource.getRoutingTableReferences()) {
               // ? 生成 SQL
               result.getExecutionUnits().add(new SQLExecutionUnit(cartesianDataSource.getDataSource(), rewriteEngine.generateSQL(cartesianTableReference, sqlBuilder)));
           }
       }
   } else {
       for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
           // ? 生成 SQL
           result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder)));
       }
   }
   return result;
}
  • 呼叫 RewriteEngine#generateSQL() 生成執行SQL。對於笛卡爾積路由結果和簡單路由結果傳遞的引數略有不同:前者使用 CartesianDataSource ( CartesianTableReference ),後者使用路由表單元 ( TableUnit )。對路由結果不是很瞭解的同學,建議看下 《SQL 路由(二)之分庫分表路由》。

RewriteEngine#generateSQL() 對於笛卡爾積路由結果和簡單路由結果兩種情況,處理上大體是一致的:1. 獲得 SQL 相關邏輯表對應的真實表對映,2. 根據對映改寫 SQL 相關邏輯表真實表

// SQLRewriteEngine.java
/**
* 生成SQL語句.
* @param tableUnit 路由表單元
* @param sqlBuilder SQL構建器
* @return SQL語句
*/
public String generateSQL(final TableUnit tableUnit, final SQLBuilder sqlBuilder) {
   return sqlBuilder.toSQL(getTableTokens(tableUnit));
}  
/**
* 生成SQL語句.
* @param cartesianTableReference 笛卡爾積路由表單元
* @param sqlBuilder SQL構建器
* @return SQL語句
*/
public String generateSQL(final CartesianTableReference cartesianTableReference, final SQLBuilder sqlBuilder) {
   return sqlBuilder.toSQL(getTableTokens(cartesianTableReference));
}

// SQLRewriteEngine.java
// SQLBuilder.java
/**
* 生成SQL語句.
* @param tableTokens 佔位符集合(邏輯表與真實表對映)
* @return SQL語句
*/
public String toSQL(final Map<String, String> tableTokens) {
   StringBuilder result = new StringBuilder();
   for (Object each : segments) {
       if (each instanceof TableToken && tableTokens.containsKey(((TableToken) each).tableName)) {
           result.append(tableTokens.get(((TableToken) each).tableName));
       } else {
           result.append(each);
       }
   }
   return result.toString();
}
  • #toSQL() 結果如圖:

? 對 SQL改寫 是不是清晰很多了。


下面我們以笛卡爾積路由結果獲得 SQL 相關邏輯表對應的真實表對映為例子(簡單路由結果基本類似而且簡單)。

// SQLRewriteEngine.java
/**
* 獲得(笛卡爾積表路由組裡的路由表單元邏輯表 和 與其互為BindingTable關係的邏輯表)對應的真實表對映(邏輯表需要在 SQL 中存在)
* @param cartesianTableReference 笛卡爾積表路由組
* @return 集合
*/
private Map<String, String> getTableTokens(final CartesianTableReference cartesianTableReference) {
   Map<String, String> tableTokens = new HashMap<>();
   for (TableUnit each : cartesianTableReference.getTableUnits()) {
       tableTokens.put(each.getLogicTableName(), each.getActualTableName());
       // 查詢 BindingTableRule
       Optional<BindingTableRule> bindingTableRule = shardingRule.findBindingTableRule(each.getLogicTableName());
       if (bindingTableRule.isPresent()) {
           tableTokens.putAll(getBindingTableTokens(each, bindingTableRule.get()));
       }
   }
   return tableTokens;
}
/**
* 獲得 BindingTable 關係的邏輯表對應的真實表對映(邏輯表需要在 SQL 中存在)
* @param tableUnit 路由單元
* @param bindingTableRule Binding表規則配置物件
* @return 對映
*/
private Map<String, String> getBindingTableTokens(final TableUnit tableUnit, final BindingTableRule bindingTableRule) {
   Map<String, String> result = new HashMap<>();
   for (String eachTable : sqlStatement.getTables().getTableNames()) {
       if (!eachTable.equalsIgnoreCase(tableUnit.getLogicTableName()) && bindingTableRule.hasLogicTable(eachTable)) {
           result.put(eachTable, bindingTableRule.getBindingActualTable(tableUnit.getDataSourceName(), eachTable, tableUnit.getActualTableName()));
       }
   }
   return result;
}
  • 笛卡爾積表路由組( CartesianTableReference )包含多個路由表單元( TableUnit )。每個路由表單元需要遍歷。
  • 路由表單元本身包含邏輯表和真實表,直接新增到對映即可。
  • 互為 BindingTable 關係的表只計算一次路由分片,因此未計算的真實表需要以其對應的已計算的真實表去查詢,即 bindingTableRule.getBindingActualTable(tableUnit.getDataSourceName(),eachTable,tableUnit.getActualTableName()) 處邏輯。
// BindingTableRule.java
/**
* 根據其他Binding表真實表名稱獲取相應的真實Binding表名稱.
* 
* @param dataSource 資料來源名稱
* @param logicTable 邏輯表名稱
* @param otherActualTable 其他真實Binding表名稱
* @return 真實Binding表名稱
*/
public String getBindingActualTable(final String dataSource, final String logicTable, final String otherActualTable) {
   // 計算 otherActualTable 在其 TableRule 的 actualTable 是第幾個
   int index = -1;
   for (TableRule each : tableRules) {
       if (each.isDynamic()) {
           throw new UnsupportedOperationException("Dynamic table cannot support Binding table.");
       }
       index = each.findActualTableIndex(dataSource, otherActualTable);
       if (-1 != index) {
           break;
       }
   }
   Preconditions.checkState(-1 != index, String.format("Actual table [%s].[%s] is not in table config", dataSource, otherActualTable));
   // 計算 logicTable 在其 TableRule 的 第index 的 真實表
   for (TableRule each : tableRules) {
       if (each.getLogicTable().equalsIgnoreCase(logicTable)) {
           return each.getActualTables().get(index).getTableName();
       }
   }
   throw new IllegalStateException(String.format("Cannot find binding actual table, data source: %s, logic table: %s, other actual table: %s", dataSource, logicTable, otherActualTable));
}

可能看起來有些繞,我們看張圖:

友情提示:這裡不嫌囉嗦在提一句,互為 BindingTable 的表,配置 TableRule 時, actualTables 數量一定要一致,否則多出來的表,可能會無法被路由到。