資料庫分庫分表中介軟體 Sharding-JDBC 原始碼分析 —— SQL 路由(二)之分庫分表路由
本文主要基於 Sharding-JDBC 1.5.0 正式版
- 1. 概述
- 2. SQLRouteResult
- 3. 路由策略 x 演算法
- 4. SQL 路由
- 5. DatabaseHintSQLRouter
- 6. ParsingSQLRouter
- 6.1 SimpleRoutingEngine
- 6.2 ComplexRoutingEngine
- 6.3 CartesianRoutingEngine
- 6.3 ParsingSQLRouter 主#route()
1. 概述
本文分享分表分庫路由相關的實現。涉及內容如下:
- SQL 路由結果
- 路由策略 x 演算法
- SQL 路由器
內容順序如編號。
SQL 路由大體流程如下:
2. SQLRouteResult
經過 SQL解析、SQL路由後,產生SQL路由結果,即 SQLRouteResult。根據路由結果,生成SQL,執行SQL。
-
sqlStatement
:SQL語句物件,經過SQL解析的結果物件。 -
executionUnits
:SQL最小執行單元集合。SQL執行時,執行每個單元。 -
generatedKeys
:插入SQL語句生成的主鍵編號集合。目前不支援批量插入而使用集合的原因,猜測是為了未來支援批量插入做準備。
3. 路由策略 x 演算法
ShardingStrategy,分片策略。目前支援兩種分片:
分片資源:在分庫策略裡指的是庫,在分表策略裡指的是表。
【1】 計算靜態分片(常用)
// ShardingStrategy.java /** * 計算靜態分片. * @param sqlType SQL語句的型別 * @param availableTargetNames 所有的可用分片資源集合 * @param shardingValues 分片值集合 * @return 分庫後指向的資料來源名稱集合 */ public Collection<String> doStaticSharding(final SQLType sqlType, final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) { Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); if (shardingValues.isEmpty()) { Preconditions.checkState(!isInsertMultiple(sqlType, availableTargetNames), "INSERT statement should contain sharding value."); // 插入不能有多資源物件 result.addAll(availableTargetNames); } else { result.addAll(doSharding(shardingValues, availableTargetNames)); } return result; } /** * 插入SQL 是否插入多個分片 * @param sqlType SQL型別 * @param availableTargetNames 所有的可用分片資源集合 * @return 是否 */ private boolean isInsertMultiple(final SQLType sqlType, final Collection<String> availableTargetNames) { return SQLType.INSERT == sqlType && availableTargetNames.size() > 1; }
- 插入SQL 需要有片鍵值,否則無法判斷單個分片資源。(Sharding-JDBC 目前僅支援單條記錄插入)
【2】計算動態分片
// ShardingStrategy.java
/**
* 計算動態分片.
* @param shardingValues 分片值集合
* @return 分庫後指向的分片資源集合
*/
public Collection<String> doDynamicSharding(final Collection<ShardingValue<?>> shardingValues) {
Preconditions.checkState(!shardingValues.isEmpty(), "Dynamic table should contain sharding value."); // 動態分片必須有分片值
Collection<String> availableTargetNames = Collections.emptyList();
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
result.addAll(doSharding(shardingValues, availableTargetNames));
return result;
}
- 動態分片對應
TableRule.dynamic=true
- 動態分片必須有分片值
? 悶了,看起來兩者沒啥區別?答案在分片演算法上。我們先看 #doSharding()
方法的實現。
// ShardingStrategy.java
/**
* 計算分片
* @param shardingValues 分片值集合
* @param availableTargetNames 所有的可用分片資源集合
* @return 分庫後指向的分片資源集合
*/
private Collection<String> doSharding(final Collection<ShardingValue<?>> shardingValues, final Collection<String> availableTargetNames) {
// 無片鍵
if (shardingAlgorithm instanceof NoneKeyShardingAlgorithm) {
return Collections.singletonList(((NoneKeyShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues.iterator().next()));
}
// 單片鍵
if (shardingAlgorithm instanceof SingleKeyShardingAlgorithm) {
SingleKeyShardingAlgorithm<?> singleKeyShardingAlgorithm = (SingleKeyShardingAlgorithm<?>) shardingAlgorithm;
ShardingValue shardingValue = shardingValues.iterator().next();
switch (shardingValue.getType()) {
case SINGLE:
return Collections.singletonList(singleKeyShardingAlgorithm.doEqualSharding(availableTargetNames, shardingValue));
case LIST:
return singleKeyShardingAlgorithm.doInSharding(availableTargetNames, shardingValue);
case RANGE:
return singleKeyShardingAlgorithm.doBetweenSharding(availableTargetNames, shardingValue);
default:
throw new UnsupportedOperationException(shardingValue.getType().getClass().getName());
}
}
// 多片鍵
if (shardingAlgorithm instanceof MultipleKeysShardingAlgorithm) {
return ((MultipleKeysShardingAlgorithm) shardingAlgorithm).doSharding(availableTargetNames, shardingValues);
}
throw new UnsupportedOperationException(shardingAlgorithm.getClass().getName());
}
- 無分片鍵演算法:對應 NoneKeyShardingAlgorithm 分片演算法介面。
public interface NoneKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
String doSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
}
- 單片鍵演算法:對應 SingleKeyShardingAlgorithm 分片演算法介面。
public interface SingleKeyShardingAlgorithm<T extends Comparable<?>> extends ShardingAlgorithm {
String doEqualSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
Collection<String> doInSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
Collection<String> doBetweenSharding(Collection<String> availableTargetNames, ShardingValue<T> shardingValue);
}
ShardingValueType |
SQL 操作符 |
介面方法 |
---|---|---|
SINGLE |
= |
#doEqualSharding() |
LIST |
IN |
#doInSharding() |
RANGE |
BETWEEN |
#doBetweenSharding() |
多片鍵演算法:對應 MultipleKeysShardingAlgorithm 分片演算法介面。 |
||
public interface MultipleKeysShardingAlgorithm extends ShardingAlgorithm { Collection<String> doSharding(Collection<String> availableTargetNames, Collection<ShardingValue<?>> shardingValues);} |
- 多片鍵演算法:對應 MultipleKeysShardingAlgorithm 分片演算法介面。
-
public
interface
MultipleKeysShardingAlgorithm
extends
ShardingAlgorithm
{
-
Collection<String> doSharding(Collection<String> availableTargetNames,
Collection<ShardingValue<?>> shardingValues);
}
分片演算法類結構如下:
來看看 Sharding-JDBC 實現的無需分庫的分片演算法 NoneDatabaseShardingAlgorithm (NoneTableShardingAlgorithm 基本一模一樣):
public final class NoneDatabaseShardingAlgorithm implements SingleKeyDatabaseShardingAlgorithm<String>, MultipleKeysDatabaseShardingAlgorithm {
@Override
public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<ShardingValue<?>> shardingValues) {
return availableTargetNames;
}
@Override
public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {
return availableTargetNames.isEmpty() ? null : availableTargetNames.iterator().next();
}
@Override
public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {
return availableTargetNames;
}
@Override
public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<String> shardingValue) {
return availableTargetNames;
}
}
-
一定要注意,NoneXXXXShardingAlgorithm 只適用於無分庫/表的需求,否則會是錯誤的路由結果。例如,
#doEqualSharding()
返回的是第一個分片資源。
再來看測試目錄下實現的餘數基偶分表演算法 ModuloTableShardingAlgorithm 的實現:
// com.dangdang.ddframe.rdb.integrate.fixture.ModuloTableShardingAlgorithm.java
public final class ModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {
@Override
public String doEqualSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
for (String each : tableNames) {
if (each.endsWith(shardingValue.getValue() % 2 + "")) {
return each;
}
}
throw new UnsupportedOperationException();
}
@Override
public Collection<String> doInSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(tableNames.size());
for (Integer value : shardingValue.getValues()) {
for (String tableName : tableNames) {
if (tableName.endsWith(value % 2 + "")) {
result.add(tableName);
}
}
}
return result;
}
@Override
public Collection<String> doBetweenSharding(final Collection<String> tableNames, final ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(tableNames.size());
Range<Integer> range = shardingValue.getValueRange();
for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
for (String each : tableNames) {
if (each.endsWith(i % 2 + "")) {
result.add(each);
}
}
}
return result;
}
}
- 我們可以參考這個例子編寫自己的分片算喲 ?。
- 多片鍵分庫演算法介面實現例子:MultipleKeysModuloDatabaseShardingAlgorithm.java
? 來看看動態計算分片需要怎麼實現分片演算法。
// com.dangdang.ddframe.rdb.integrate.fixture.SingleKeyDynamicModuloTableShardingAlgorithm.java
public final class SingleKeyDynamicModuloTableShardingAlgorithm implements SingleKeyTableShardingAlgorithm<Integer> {
/**
* 表字首
*/
private final String tablePrefix;
@Override
public String doEqualSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
return tablePrefix + shardingValue.getValue() % 10;
}
@Override
public Collection<String> doInSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(shardingValue.getValues().size());
for (Integer value : shardingValue.getValues()) {
result.add(tablePrefix + value % 10);
}
return result;
}
@Override
public Collection<String> doBetweenSharding(final Collection<String> availableTargetNames, final ShardingValue<Integer> shardingValue) {
Collection<String> result = new LinkedHashSet<>(availableTargetNames.size());
Range<Integer> range = shardingValue.getValueRange();
for (Integer i = range.lowerEndpoint(); i <= range.upperEndpoint(); i++) {
result.add(tablePrefix + i % 10);
}
return result;
}
}
- 騷年,是不是明白了一些?動態表無需把真實表配置到 TableRule,而是通過分片演算法計算出真實表。
4. SQL 路由
SQLRouter,SQL 路由器介面,共有兩種實現:
- DatabaseHintSQLRouter:通過提示且僅路由至資料庫的SQL路由器
- ParsingSQLRouter:需要解析的SQL路由器
它們實現 #parse()
進行SQL解析, #route()
進行SQL路由。
RoutingEngine,路由引擎介面,共有四種實現:
- DatabaseHintRoutingEngine:基於資料庫提示的路由引擎
- SimpleRoutingEngine:簡單路由引擎
- CartesianRoutingEngine:笛卡爾積的庫表路由
- ComplexRoutingEngine:混合多庫表路由引擎
ComplexRoutingEngine 根據路由結果會轉化成 SimpleRoutingEngine 或 ComplexRoutingEngine。下文會看相應原始碼。
路由結果有兩種:
- RoutingResult:簡單路由結果
- CartesianRoutingResult:笛卡爾積路由結果
從圖中,我們已經能大概看到兩者有什麼區別,更具體的下文隨原始碼一起分享。
? SQLRouteResult 和 RoutingResult 有什麼區別?
- SQLRouteResult:整個SQL路由返回的路由結果
- RoutingResult:RoutingEngine返回路由結果
一下子看到這麼多"物件",可能有點緊張。不要緊張,我們一起在整理下。
路由器 |
路由引擎 |
路由結果 |
---|---|---|
DatabaseHintSQLRouter |
DatabaseHintRoutingEngine |
RoutingResult |
ParsingSQLRouter |
SimpleRoutingEngine |
RoutingResult |
ParsingSQLRouter |
CartesianRoutingEngine |
CartesianRoutingResult |
? 逗比博主給大家解決了"物件",是不是應該分享朋友圈。
5. DatabaseHintSQLRouter
DatabaseHintSQLRouter,基於資料庫提示的路由引擎。路由器工廠 SQLRouterFactory 建立路由器時,判斷到使用資料庫提示( Hint ) 時,建立 DatabaseHintSQLRouter。
// DatabaseHintRoutingEngine.java
public static SQLRouter createSQLRouter(final ShardingContext shardingContext) {
return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext);
}
先來看下 HintManagerHolder、HintManager 部分相關的程式碼:
// HintManagerHolder.java
public final class HintManagerHolder {
/**
* HintManager 執行緒變數
*/
private static final ThreadLocal<HintManager> HINT_MANAGER_HOLDER = new ThreadLocal<>();
/**
* 判斷是否當前只分庫.
*
* @return 是否當前只分庫.
*/
public static boolean isDatabaseShardingOnly() {
return null != HINT_MANAGER_HOLDER.get() && HINT_MANAGER_HOLDER.get().isDatabaseShardingOnly();
}
/**
* 清理線索分片管理器的本地執行緒持有者.
*/
public static void clear() {
HINT_MANAGER_HOLDER.remove();
}
}
// HintManager.java
public final class HintManager implements AutoCloseable {
/**
* 庫分片值集合
*/
private final Map<ShardingKey, ShardingValue<?>> databaseShardingValues = new HashMap<>();
/**
* 只做庫分片
* {@link DatabaseHintRoutingEngine}
*/
@Getter
private boolean databaseShardingOnly;
/**
* 獲取線索分片管理器例項.
*
* @return 線索分片管理器例項
*/
public static HintManager getInstance() {
HintManager result = new HintManager();
HintManagerHolder.setHintManager(result);
return result;
}
/**
* 設定分庫分片值.
*
* <p>分片操作符為等號.該方法適用於只分庫的場景</p>
*
* @param value 分片值
*/
public void setDatabaseShardingValue(final Comparable<?> value) {
databaseShardingOnly = true;
addDatabaseShardingValue(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME, value);
}
}
那麼如果要使用 DatabaseHintSQLRouter,我們只需要 HintManager.getInstance().setDatabaseShardingValue(庫分片值)
即可。這裡有兩點要注意下:
-
HintManager#getInstance()
,每次獲取到的都是新的 HintManager,多次賦值需要小心。 -
HintManager#close()
,使用完需要去清理,避免下個請求讀到遺漏的執行緒變數。
看看 DatabaseHintSQLRouter 的實現:
// DatabaseHintSQLRouter.java
@Override
public SQLStatement parse(final String logicSQL, final int parametersSize) {
return new SQLJudgeEngine(logicSQL).judge(); // 只解析 SQL 型別
}
@Override
// TODO insert的SQL仍然需要解析自增主鍵
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
Context context = MetricsContext.start("Route SQL");
SQLRouteResult result = new SQLRouteResult(sqlStatement);
// 路由
RoutingResult routingResult = new DatabaseHintRoutingEngine(shardingRule.getDataSourceRule(), shardingRule.getDatabaseShardingStrategy(), sqlStatement.getType())
.route();
// SQL最小執行單元
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getExecutionUnits().add(new SQLExecutionUnit(each.getDataSourceName(), logicSQL));
}
MetricsContext.stop(context);
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getExecutionUnits(), parameters);
}
return result;
}
-
#parse()
只解析了 SQL 型別,即 SELECT / UPDATE / DELETE / INSERT 。 -
使用的分庫策略來自 ShardingRule,不是 TableRule,這個一定要留心。❓因為 SQL 未解析表名。因此,即使在 TableRule 設定了
actualTables
屬性也是沒有效果的。 - 目前不支援 Sharding-JDBC 的主鍵自增。❓因為 SQL 未解析自增主鍵。從程式碼上的
TODO
應該會支援。 -
HintManager.getInstance().setDatabaseShardingValue(庫分片值)
設定的庫分片值使用的是 EQUALS,因而分庫策略計算出來的只有一個庫分片,即 TableUnit 只有一個,SQLExecutionUnit 只有一個。
看看 DatabaseHintSQLRouter 的實現:
// DatabaseHintRoutingEngine.java
@Override
public RoutingResult route() {
// 從 Hint 獲得 分片鍵值
Optional<ShardingValue<?>> shardingValue = HintManagerHolder.getDatabaseShardingValue(new ShardingKey(HintManagerHolder.DB_TABLE_NAME, HintManagerHolder.DB_COLUMN_NAME));
Preconditions.checkState(shardingValue.isPresent());
log.debug("Before database sharding only db:{} sharding values: {}", dataSourceRule.getDataSourceNames(), shardingValue.get());
// 路由。表分片規則使用的是 ShardingRule 裡的。因為沒 SQL 解析。
Collection<String> routingDataSources = databaseShardingStrategy.doStaticSharding(sqlType, dataSourceRule.getDataSourceNames(), Collections.<ShardingValue<?>>singleton(shardingValue.get()));
Preconditions.checkState(!routingDataSources.isEmpty(), "no database route info");
log.debug("After database sharding only result: {}", routingDataSources);
// 路由結果
RoutingResult result = new RoutingResult();
for (String each : routingDataSources) {
result.getTableUnits().getTableUnits().add(new TableUnit(each, "", ""));
}
return result;
}
-
只呼叫
databaseShardingStrategy.doStaticSharding()
方法計算庫分片。 -
newTableUnit(each,"","")
的logicTableName
,actualTableName
都是空串,相信原因你已經知道。
6. ParsingSQLRouter
ParsingSQLRouter,需要解析的SQL路由器。
ParsingSQLRouter 使用 SQLParsingEngine 解析SQL。對SQL解析有興趣的同學可以看看拙作《Sharding-JDBC 原始碼分析 —— SQL 解析》。
// ParsingSQLRouter.java
public SQLStatement parse(final String logicSQL, final int parametersSize) {
SQLParsingEngine parsingEngine = new SQLParsingEngine(databaseType, logicSQL, shardingRule);
Context context = MetricsContext.start("Parse SQL");
SQLStatement result = parsingEngine.parse();
if (result instanceof InsertStatement) {
((InsertStatement) result).appendGenerateKeyToken(shardingRule, parametersSize);
}
MetricsContext.stop(context);
return result;
}
-
#appendGenerateKeyToken()
會在《SQL 改寫》分享
ParsingSQLRouter 在路由時,會根據表情況使用 SimpleRoutingEngine 或 CartesianRoutingEngine 進行路由。
private RoutingResult route(final List<Object> parameters, final SQLStatement sqlStatement) {
Collection<String> tableNames = sqlStatement.getTables().getTableNames();
RoutingEngine routingEngine;
if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames)) {
routingEngine = new SimpleRoutingEngine(shardingRule, parameters, tableNames.iterator().next(), sqlStatement);
} else {
// TODO 可配置是否執行笛卡爾積
routingEngine = new ComplexRoutingEngine(shardingRule, parameters, tableNames, sqlStatement);
}
return routingEngine.route();
}
- 當只進行一張表或者多表互為BindingTable關係時,使用 SimpleRoutingEngine 簡單路由引擎。多表互為BindingTable關係時,每張表的路由結果是相同的,所以只要計算第一張表的分片即可。
-
tableNames.iterator().next()
注意下,tableNames
變數是newTreeMap<>(String.CASE_INSENSITIVE_ORDER)
。所以SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id
即使t_order_item
排在t_order
前面,tableNames.iterator().next()
返回的是t_order
。當t_order
和t_order_item
為 BindingTable關係 時,計算的是t_order
路由分片。 - BindingTable關係在 ShardingRule 的
tableRules
配置。配置該關係 TableRule 有如下需要遵守的規則:- 分片策略與演算法相同
- 資料來源配置物件相同
- 真實表數量相同
舉個例子:
- SQL :
SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id
- 分庫分表情況:
multi_db_multi_table_01
├── t_order_0 ├── t_order_item_01
└── t_order_1 ├── t_order_item_02
├── t_order_item_03
├── t_order_item_04
multi_db_multi_table_02
├── t_order_0 ├── t_order_item_01
└── t_order_1 ├── t_order_item_02
├── t_order_item_03
├── t_order_item_04
最終執行的SQL如下:
SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id
SELECT * FROM t_order_item_01 i JOIN t_order_01 o ON o.order_id = i.order_id
SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
SELECT * FROM t_order_item_02 i JOIN t_order_02 o ON o.order_id = i.order_id
-
t_order_item_03
、t_order_item_04
無法被查詢到。
下面我們看看 #isAllBindingTables()
如何實現多表互為BindingTable關係。
// ShardingRule.java
// 呼叫順序 #isAllBindingTables()=>#filterAllBindingTables()=>#findBindingTableRule()=>#findBindingTableRule()
/**
* 判斷邏輯表名稱集合是否全部屬於Binding表.
* @param logicTables 邏輯表名稱集合
*/
public boolean isAllBindingTables(final Collection<String> logicTables) {
Collection<String> bindingTables = filterAllBindingTables(logicTables);
return !bindingTables.isEmpty() && bindingTables.containsAll(logicTables);
}
/**
* 過濾出所有的Binding表名稱.
*/
public Collection<String> filterAllBindingTables(final Collection<String> logicTables) {
if (logicTables.isEmpty()) {
return Collections.emptyList();
}
Optional<BindingTableRule> bindingTableRule = findBindingTableRule(logicTables);
if (!bindingTableRule.isPresent()) {
return Collections.emptyList();
}
// 交集
Collection<String> result = new ArrayList<>(bindingTableRule.get().getAllLogicTables());
result.retainAll(logicTables);
return result;
}
/**
* 獲得包含<strong>任一</strong>在邏輯表名稱集合的binding表配置的邏輯表名稱集合
*/
private Optional<BindingTableRule> findBindingTableRule(final Collection<String> logicTables) {
for (String each : logicTables) {
Optional<BindingTableRule> result = findBindingTableRule(each);
if (result.isPresent()) {
return result;
}
}
return Optional.absent();
}
/**
* 根據邏輯表名稱獲取binding表配置的邏輯表名稱集合.
*/
public Optional<BindingTableRule> findBindingTableRule(final String logicTable) {
for (BindingTableRule each : bindingTableRules) {
if (each.hasLogicTable(logicTable)) {
return Optional.of(each);
}
}
return Optional.absent();
}
- 邏輯看起來比較長,目的是找到一條 BindingTableRule 包含所有邏輯表集合
- 不支援《傳遞關係》:配置 BindingTableRule 時,相同繫結關係一定要配置在一條,必須是
[a,b,c]
,而不能是[a,b],[b,c]
。
6.1 SimpleRoutingEngine
SimpleRoutingEngine,簡單路由引擎。
// SimpleRoutingEngine.java
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
- 可以使用 HintManager 設定庫分片值進行強制路由。
-
#getShardingValues()
我們看到了《SQL 解析(二)之SQL解析》分享的 Condition 物件。之前我們提到過Parser 半理解SQL的目的之一是:提煉分片上下文,此處即是該目的的體現。Condition 裡只放明確影響路由的條件,例如:order_id=1
,order_id IN(1,2)
,order_id BETWEEN(1,3)
,不放無法計算的條件,例如:o.order_id=i.order_id
。該方法裡,使用分片鍵從 Condition 查詢 分片值。? 是不是對 Condition 的認識更加清晰一丟丟落。
// SimpleRoutingEngine.java
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
- 可以使用 HintManager 設定表分片值進行強制路由。
- 根據
dynamic
屬性來判斷呼叫#doDynamicSharding()
還是#doStaticSharding()
計算分片。
// SimpleRoutingEngine.java
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
- 在 SimpleRoutingEngine 只生成了當前表的 TableUnits。如果存在與其互為BindingTable關係的表的 TableUnits 怎麼獲得?你可以想想噢,當然在後文《SQL 改寫》也會給出答案,看看和你想的是否一樣。
6.2 ComplexRoutingEngine
ComplexRoutingEngine,混合多庫表路由引擎。
// ComplexRoutingEngine.java
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
- ComplexRoutingEngine 計算每個邏輯表的簡單路由分片,路由結果交給 CartesianRoutingEngine 繼續路由形成笛卡爾積結果。
- 由於目前 ComplexRoutingEngine 路由前已經判斷全部表互為 BindingTable 關係,因而不會出現
result.size==1
,屬於防禦性程式設計。 - 部分表互為 BindingTable 關係時,ComplexRoutingEngine 不重複計算分片。
6.3 CartesianRoutingEngine
CartesianRoutingEngine,笛卡爾積的庫表路由。
實現邏輯上相對複雜,請保持耐心喲,? 其實目的就是實現連連看的效果:
- RoutingResult[0]
x
RoutingResult[1] ……x
RoutingResult[n- 1]x
RoutingResult[n] - 同庫 才可以進行笛卡爾積
// CartesianRoutingEngine.java
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
- 第一步,獲得同庫對應的邏輯表集合,即 Entry<資料來源(庫), Set<邏輯表>> entry。
- 第二步,遍歷資料來源(庫),獲得當前資料來源(庫)的路由表單元分組。
- 第三步,對路由表單元分組進行笛卡爾積,併合併到路由結果。
下面,我們一起逐步看看程式碼實現。
- SQL :
SELECT*FROM t_order o join t_order_item i ON o.order_id=i.order_id
- 分庫分表情況:
multi_db_multi_table_01
├── t_order_0 ├── t_order_item_01
└── t_order_1 ├── t_order_item_02
multi_db_multi_table_02
├── t_order_0 ├── t_order_item_01
└── t_order_1 ├── t_order_item_02
// 第一步
// CartesianRoutingEngine.java
/**
* 獲得同庫對應的邏輯表集合
*/
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
-
#getDataSourceLogicTablesMap()
返回如圖:
// 第二步
// CartesianRoutingEngine.java
private List<Set<String>> getActualTableGroups(final String dataSource, final Set<String> logicTables) {
List<Set<String>> result = new ArrayList<>(logicTables.size());
for (RoutingResult each : routingResults) {
result.addAll(each.getTableUnits().getActualTableNameGroups(dataSource, logicTables));
}
return result;
}
private List<Set<TableUnit>> toTableUnitGroups(final String dataSource, final List<Set<String>> actualTableGroups) {
List<Set<TableUnit>> result = new ArrayList<>(actualTableGroups.size());
for (Set<String> each : actualTableGroups) {
result.add(new HashSet<>(Lists.transform(new ArrayList<>(each), new Function<String, TableUnit>() {
@Override
public TableUnit apply(final String input) {
return findTableUnit(dataSource, input);
}
})));
}
return result;
}
-
#getActualTableGroups()
返回如圖:
-
#toTableUnitGroups()
返回如圖:
// CartesianRoutingEngine.java
private List<CartesianTableReference> getCartesianTableReferences(final Set<List<TableUnit>> cartesianTableUnitGroups) {
List<CartesianTableReference> result = new ArrayList<>(cartesianTableUnitGroups.size());
for (List<TableUnit> each : cartesianTableUnitGroups) {
result.add(new CartesianTableReference(each));
}
return result;
}
// CartesianRoutingResult.java
@Getter
private final List<CartesianDataSource> routingDataSources = new ArrayList<>();
void merge(final String dataSource, final Collection<CartesianTableReference> routingTableReferences) {
for (CartesianTableReference each : routingTableReferences) {
merge(dataSource, each);
}
}
private void merge(final String dataSource, final CartesianTableReference routingTableReference) {
for (CartesianDataSource each : routingDataSources) {
if (each.getDataSource().equalsIgnoreCase(dataSource)) {
each.getRoutingTableReferences().add(routingTableReference);
return;
}
}
routingDataSources.add(new CartesianDataSource(dataSource, routingTableReference));
}
-
Sets.cartesianProduct(tableUnitGroups)
返回如圖(Guava 工具庫真強大):
-
#getCartesianTableReferences()
返回如圖:
CartesianTableReference,笛卡爾積表路由組,包含多條 TableUnit,即 TableUnit[0] x
TableUnit[1] …… x
TableUnit[n]。例如圖中: t_order_01 x t_order_item_02
,最終轉換成 SQL 為 SELECT*FROM t_order_01 o join t_order_item_02 i ON o.order_id=i.order_id
。
-
#merge()
合併笛卡爾積路由結果。CartesianRoutingResult 包含多個 CartesianDataSource,因此需要將 CartesianTableReference 合併(新增)到對應的 CartesianDataSource。當然,目前在實現時已經是按照資料來源(庫)生成對應的 CartesianTableReference。
6.4 ParsingSQLRouter 主#route()
// ParsingSQLRouter.java
// ... 超過微信30000字限制,省略程式碼。請點選原文閱讀。
-
RoutingResultroutingResult=route(parameters,sqlStatement);
呼叫的就是上文分析的 SimpleRoutingEngine、ComplexRoutingEngine、CartesianRoutingEngine 的#route()
方法。 -
#processGeneratedKey()
、#processLimit()
、#rewrite()
、#generateSQL()
等會放在《SQL 改寫》 分享。