Sharding-Sphere初始化(三)
9. 執行StandardRoutingEngine#route獲取路由結果:先獲取分表規則以及分庫、分表對應的欄位列,再獲取這些欄位對應的具體值。
/**
 * Routes the logic table to its data nodes.
 *
 * <p>The table rule for {@code logicTableName} and the sharding columns of its database and table
 * sharding strategies are resolved first. If a sharding hint is in use, the sharding values come from
 * the hint and the routed nodes are attached to every {@link InsertShardingCondition}. Otherwise each
 * sharding condition is routed on its own; when there are no conditions a single route with empty
 * sharding values is performed.</p>
 *
 * @return routing result generated from the collected data nodes (duplicates removed, insertion order kept)
 */
public RoutingResult route() {
    TableRule tableRule = shardingRule.getTableRule(logicTableName);
    Collection<String> databaseShardingColumns = shardingRule.getDatabaseShardingStrategy(tableRule).getShardingColumns();
    Collection<String> tableShardingColumns = shardingRule.getTableShardingStrategy(tableRule).getShardingColumns();
    Collection<DataNode> routedDataNodes = new LinkedHashSet<>();
    if (HintManagerHolder.isUseShardingHint()) {
        // Hint routing: one route call whose nodes are shared by all insert conditions.
        Collection<DataNode> hintedNodes = route(
                tableRule, getDatabaseShardingValuesFromHint(databaseShardingColumns), getTableShardingValuesFromHint(tableShardingColumns));
        for (ShardingCondition condition : shardingConditions.getShardingConditions()) {
            if (condition instanceof InsertShardingCondition) {
                ((InsertShardingCondition) condition).getDataNodes().addAll(hintedNodes);
            }
        }
        routedDataNodes.addAll(hintedNodes);
    } else if (shardingConditions.getShardingConditions().isEmpty()) {
        // No sharding condition at all: route with empty sharding values.
        routedDataNodes.addAll(route(tableRule, Collections.<ShardingValue>emptyList(), Collections.<ShardingValue>emptyList()));
    } else {
        // Route every sharding condition separately and remember the nodes on insert conditions.
        for (ShardingCondition condition : shardingConditions.getShardingConditions()) {
            Collection<DataNode> conditionNodes = route(
                    tableRule, getShardingValues(databaseShardingColumns, condition), getShardingValues(tableShardingColumns, condition));
            routedDataNodes.addAll(conditionNodes);
            if (condition instanceof InsertShardingCondition) {
                ((InsertShardingCondition) condition).getDataNodes().addAll(conditionNodes);
            }
        }
    }
    return generateRoutingResult(routedDataNodes);
}
路由到對應的資料節點,獲取到實際存在的資料庫源和表名,再根據xml檔案中配置的策略進行選擇StandardShardingStrategy#doSharding,PreciseShardingAlgorithm#doSharding選擇符合條件的資料節點。
/**
 * Routes one set of sharding values to concrete data nodes.
 *
 * @param tableRule table rule of the logic table
 * @param databaseShardingValues sharding values used to select data sources
 * @param tableShardingValues sharding values used to select tables inside each data source
 * @return data nodes of every routed data source, in routing order
 */
private Collection<DataNode> route(final TableRule tableRule, final List<ShardingValue> databaseShardingValues, final List<ShardingValue> tableShardingValues) {
    Collection<DataNode> result = new LinkedList<>();
    // First narrow down the data sources, then resolve the tables within each of them.
    for (String dataSourceName : routeDataSources(tableRule, databaseShardingValues)) {
        result.addAll(routeTables(tableRule, dataSourceName, tableShardingValues));
    }
    return result;
}
生成最後的路由結果,儲存對應的資料庫源,以及邏輯表名和實際表名的對應關係。
10. 建立sql語句重寫引擎SQLRewriteEngine,重寫表名appendTablePlaceholder(result, (TableToken) each, count, sqlTokens);重寫()欄位列值appendInsertValuesToken(result, (InsertValuesToken) each, count, sqlTokens);
/**
 * Rewrites the original SQL into a {@link SQLBuilder} by walking the SQL tokens.
 *
 * <p>When there are no tokens the original SQL is appended unchanged. Otherwise the tokens are sorted by
 * begin position, the literal text preceding the first token is appended, and each token is dispatched
 * to the append method matching its type. Tokens of a type not listed here append nothing.</p>
 *
 * @param isRewriteLimit forwarded to the row-count and offset token handlers
 * @return SQL builder holding the rewritten segments
 */
public SQLBuilder rewrite(final boolean isRewriteLimit) {
    SQLBuilder result = new SQLBuilder(parameters);
    if (sqlTokens.isEmpty()) {
        result.appendLiterals(originalSQL);
        return result;
    }
    sortByBeginPosition();
    int tokenIndex = 0;
    for (SQLToken token : sqlTokens) {
        if (0 == tokenIndex) {
            // Text in front of the very first token is copied verbatim.
            result.appendLiterals(originalSQL.substring(0, token.getBeginPosition()));
        }
        if (token instanceof TableToken) {
            appendTablePlaceholder(result, (TableToken) token, tokenIndex, sqlTokens);
        } else if (token instanceof SchemaToken) {
            appendSchemaPlaceholder(result, (SchemaToken) token, tokenIndex, sqlTokens);
        } else if (token instanceof IndexToken) {
            appendIndexPlaceholder(result, (IndexToken) token, tokenIndex, sqlTokens);
        } else if (token instanceof ItemsToken) {
            appendItemsToken(result, (ItemsToken) token, tokenIndex, sqlTokens);
        } else if (token instanceof InsertValuesToken) {
            appendInsertValuesToken(result, (InsertValuesToken) token, tokenIndex, sqlTokens);
        } else if (token instanceof RowCountToken) {
            appendLimitRowCount(result, (RowCountToken) token, tokenIndex, sqlTokens, isRewriteLimit);
        } else if (token instanceof OffsetToken) {
            appendLimitOffsetToken(result, (OffsetToken) token, tokenIndex, sqlTokens, isRewriteLimit);
        } else if (token instanceof OrderByToken) {
            appendOrderByToken(result, tokenIndex, sqlTokens);
        } else if (token instanceof InsertColumnToken) {
            appendSymbolToken(result, (InsertColumnToken) token, tokenIndex, sqlTokens);
        }
        tokenIndex++;
    }
    return result;
}
用SQLRewriteEngine#generateSQL組合成真正的sql語句,獲取邏輯實際表的對應關係,以及繫結表的關係。用SQLBuilder拼接sql,不屬於佔位符的直接拼接,根據邏輯表名獲取實際表名,拼接表名以及欄位名,解析()引數值以及具體引數值,組合成SQLUnit返回,最後把資料來源以及sql語句和具體的引數值包裝成SQLExecutionUnit放進SQLRouteResult返回。
/**
 * Assembles the final SQL text and its parameters from the recorded segments.
 *
 * <p>Plain segments are appended as they are. Sharding placeholders are resolved against
 * {@code logicAndActualTableMap}: table placeholders become the actual table name (or the logic name when
 * no mapping exists), schema placeholders become the first actual data source name of the matching table
 * rule, index placeholders get the actual table name appended as a suffix, and insert-values placeholders
 * are expanded through {@code processInsertShardingCondition}.</p>
 *
 * @param tableUnit table unit passed on to the insert-values processing
 * @param logicAndActualTableMap mapping from logic table name to actual table name
 * @param shardingRule sharding rule used to resolve schema placeholders
 * @return SQL unit carrying the SQL text and a single parameter set: the collected insert parameters
 *         when there are any, otherwise this builder's {@code parameters}
 */
public SQLUnit toSQL(final TableUnit tableUnit, final Map<String, String> logicAndActualTableMap, final ShardingRule shardingRule) {
    StringBuilder sql = new StringBuilder();
    List<Object> insertParameters = new LinkedList<>();
    for (Object segment : segments) {
        if (!(segment instanceof ShardingPlaceholder)) {
            sql.append(segment);
            continue;
        }
        String logicTableName = ((ShardingPlaceholder) segment).getLogicTableName();
        String actualTableName = logicAndActualTableMap.get(logicTableName);
        if (segment instanceof TablePlaceholder) {
            // Fall back to the logic name when the table has no actual counterpart.
            if (null == actualTableName) {
                sql.append(logicTableName);
            } else {
                sql.append(actualTableName);
            }
        } else if (segment instanceof SchemaPlaceholder) {
            Optional<TableRule> tableRule = shardingRule.tryFindTableRuleByActualTable(actualTableName);
            if (!tableRule.isPresent() && Strings.isNullOrEmpty(shardingRule.getShardingDataSourceNames().getDefaultDataSourceName())) {
                throw new ShardingException("Cannot found schema name '%s' in sharding rule.", ((SchemaPlaceholder) segment).getLogicSchemaName());
            }
            // TODO Currently only the actual data source name can be found. In the future a connection should be created
            // while the sharding rule is initialized, validated, and used to obtain the real schema name, which should then
            // replace actualDataSourceName with actualSchemaName here.
            // TODO For now actualDataSourceName must equal actualSchemaName, otherwise schema replacement breaks, e.g. show columns xxx
            Preconditions.checkState(tableRule.isPresent());
            sql.append(tableRule.get().getActualDatasourceNames().iterator().next());
        } else if (segment instanceof IndexPlaceholder) {
            sql.append(((IndexPlaceholder) segment).getLogicIndexName());
            if (!Strings.isNullOrEmpty(actualTableName)) {
                sql.append("_");
                sql.append(actualTableName);
            }
        } else if (segment instanceof InsertValuesPlaceholder) {
            List<String> expressions = new LinkedList<>();
            for (ShardingCondition condition : ((InsertValuesPlaceholder) segment).getShardingConditions().getShardingConditions()) {
                processInsertShardingCondition(tableUnit, (InsertShardingCondition) condition, expressions, insertParameters);
            }
            // Join the generated value expressions with ", ".
            String separator = "";
            for (String expression : expressions) {
                sql.append(separator);
                sql.append(expression);
                separator = ", ";
            }
        } else {
            sql.append(segment);
        }
    }
    if (!insertParameters.isEmpty()) {
        return new SQLUnit(sql.toString(), new ArrayList<>(Collections.singleton(insertParameters)));
    }
    return new SQLUnit(sql.toString(), new ArrayList<>(Collections.singleton(parameters)));
}
11. 繼續回到ShardingPreparedStatement#route,根據資料來源獲取資料庫連線AbstractConnectionAdapter#getConnection,放進連線快取中cachedConnections.put(dataSourceName, result),執行鉤子方法等WrapperAdapter#replayMethodsInvocation,獲取PreparedStatement並儲存。
// Build one PreparedStatement per routed execution unit and pair it with that unit.
for (SQLExecutionUnit executionUnit : routeResult.getExecutionUnits()) {
    PreparedStatement statement = generatePreparedStatement(executionUnit);
    // Remember the statement, then replay the first parameter set of the unit onto it.
    routedStatements.add(statement);
    replaySetParameter(statement, executionUnit.getSqlUnit().getParameterSets().get(0));
    result.add(new PreparedStatementUnit(executionUnit, statement));
}
/**
 * Creates the prepared statement for one SQL execution unit.
 *
 * @param sqlExecutionUnit execution unit providing the data source name and the SQL text
 * @return statement prepared on the connection of the unit's data source, either requesting generated
 *         keys or using the configured result set type, concurrency and holdability
 * @throws SQLException if the connection cannot be obtained or the statement cannot be prepared
 */
private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException {
    Connection connection = getConnection().getConnection(sqlExecutionUnit.getDataSource());
    String sql = sqlExecutionUnit.getSqlUnit().getSql();
    if (returnGeneratedKeys) {
        return connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
    }
    return connection.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
}
使用反射方式構造設定引數的方法SetParameterMethodInvocation,執行對應的設定引數的方法invoke,把具體的引數值設定到preparedStatement中,最後返回PreparedStatementUnit。
/**
 * Records a parameter-setting call so it can be replayed on a real prepared statement later.
 *
 * @param methodName name of the {@link PreparedStatement} method to look up reflectively
 * @param argumentTypes parameter types of that method
 * @param arguments arguments for the call; the element at index 1 is additionally passed to the
 *                  invocation as its value
 * @throws ShardingException if {@link PreparedStatement} has no such method
 */
private void recordSetParameter(final String methodName, final Class[] argumentTypes, final Object... arguments) {
    try {
        SetParameterMethodInvocation invocation =
                new SetParameterMethodInvocation(PreparedStatement.class.getMethod(methodName, argumentTypes), arguments, arguments[1]);
        setParameterMethodInvocations.add(invocation);
    } catch (final NoSuchMethodException ex) {
        throw new ShardingException(ex);
    }
}
/**
 * Applies the given parameters to a prepared statement.
 *
 * <p>Previously recorded invocations are discarded, one invocation per parameter is recorded again,
 * and every recorded invocation is then invoked on the statement.</p>
 *
 * @param preparedStatement statement that receives the parameters
 * @param parameters parameter values in positional order
 */
protected void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
    // Start from a clean slate so invocations from an earlier replay are not applied twice.
    setParameterMethodInvocations.clear();
    addParameters(parameters);
    for (SetParameterMethodInvocation invocation : setParameterMethodInvocations) {
        invocation.invoke(preparedStatement);
    }
}
/**
 * Records one {@code setObject(int, Object)} invocation per parameter, using 1-based positions.
 *
 * <p>The list is walked with its iterator instead of {@code get(i)}, so the cost stays linear even
 * when the caller passes a linked list.</p>
 *
 * @param parameters parameter values in positional order
 */
private void addParameters(final List<Object> parameters) {
    int parameterIndex = 1;
    for (Object each : parameters) {
        recordSetParameter("setObject", new Class[]{int.class, Object.class}, parameterIndex++, each);
    }
}
12. 把preparedStatement放入執行緒池中開始執行,執行結果後清理一些本次快取的資料,批量資訊,引數資訊等等以及另外一些收尾工作。
/**
 * Routes the SQL and executes the resulting prepared statements.
 *
 * <p>Whether or not execution succeeds, the sharding refresh handler built from the route result is
 * executed afterwards and the batch state is cleared.</p>
 *
 * @return value returned by the {@link PreparedStatementExecutor}
 * @throws SQLException if routing or execution fails
 */
public boolean execute() throws SQLException {
    try {
        Collection<PreparedStatementUnit> preparedStatementUnits = route();
        PreparedStatementExecutor executor = new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits);
        return executor.execute();
    } finally {
        // Post-execution housekeeping runs on both the success and the failure path.
        JDBCShardingRefreshHandler.build(routeResult, connection).execute();
        clearBatch();
    }
}
/**
 * Executes every prepared statement unit through the executor engine.
 *
 * @return the first execution outcome, or {@code false} when the engine returned no list, an empty
 *         list, or a {@code null} first element
 * @throws SQLException if the executor engine reports a failure
 */
public boolean execute() throws SQLException {
    List<Boolean> outcomes = executorEngine.execute(sqlType, preparedStatementUnits, new ExecuteCallback<Boolean>() {

        @Override
        public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
            PreparedStatement statement = (PreparedStatement) baseStatementUnit.getStatement();
            return statement.execute();
        }
    });
    // Only the first outcome is reported; anything missing counts as false.
    boolean hasFirstOutcome = null != outcomes && !outcomes.isEmpty() && null != outcomes.get(0);
    return hasFirstOutcome && outcomes.get(0);
}
ExecutorEngine#execute:取出第一個執行單元,先把其餘的執行單元交給asyncExecute非同步執行(返回ListenableFuture),再同步執行第一個執行單元,最後等待非同步結果,與第一個結果合併成List返回。至此,insert語句就解析完畢了。
/**
 * Executes all statement units and collects their outputs.
 *
 * <p>An overall execution event is posted before execution and posted again afterwards with the
 * success or failure type. Every unit but the first is handed to {@code asyncExecute}; the first unit
 * is then run through {@code syncExecute} and the asynchronous outputs are awaited.</p>
 *
 * @param sqlType SQL type
 * @param baseStatementUnits statement units to execute
 * @param executeCallback callback performing the actual execution of a unit
 * @param <T> output type of the callback
 * @return outputs with the first unit's output at index 0, an empty list when there are no units, or
 *         {@code null} when execution failed and the exception handler did not rethrow
 * @throws SQLException declared for failures propagated by the exception handler
 */
public <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
    if (baseStatementUnits.isEmpty()) {
        return Collections.emptyList();
    }
    OverallExecutionEvent overallEvent = new OverallExecutionEvent(sqlType, baseStatementUnits.size());
    EventBusInstance.getInstance().post(overallEvent);
    Iterator<? extends BaseStatementUnit> remainingUnits = baseStatementUnits.iterator();
    BaseStatementUnit headUnit = remainingUnits.next();
    // Submit the tail first so it can proceed while the head unit runs synchronously.
    ListenableFuture<List<T>> tailFuture = asyncExecute(sqlType, Lists.newArrayList(remainingUnits), executeCallback);
    T headOutput;
    List<T> tailOutputs;
    try {
        headOutput = syncExecute(sqlType, headUnit, executeCallback);
        tailOutputs = tailFuture.get();
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        overallEvent.setException(ex);
        overallEvent.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
        EventBusInstance.getInstance().post(overallEvent);
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    overallEvent.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
    EventBusInstance.getInstance().post(overallEvent);
    List<T> result = Lists.newLinkedList(tailOutputs);
    result.add(0, headOutput);
    return result;
}
/**
 * Executes a single statement unit while holding the monitor of its connection.
 *
 * <p>One execution event per parameter set is created and posted before the callback runs. After the
 * callback, every event is posted again marked as success or, when a {@link SQLException} was thrown,
 * as failure carrying that exception.</p>
 *
 * <p>On failure the exception handler is invoked exactly once, after all events have been notified.
 * Calling it inside the notification loop would cut the loop short as soon as the handler rethrows,
 * leaving the remaining events without a failure notification, and would skip the handler entirely
 * when there are no events.</p>
 *
 * @param sqlType SQL type used to choose the event class
 * @param baseStatementUnit statement unit to execute
 * @param executeCallback callback performing the actual execution
 * @param isExceptionThrown value passed to {@code ExecutorExceptionHandler.setExceptionThrown}
 * @param dataMap value passed to {@code ExecutorDataMap.setDataMap}
 * @param <T> output type of the callback
 * @return callback output, or {@code null} when a {@link SQLException} occurred and the handler did not rethrow
 * @throws Exception any non-SQL exception raised by the callback, or whatever the exception handler rethrows
 */
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback,
                              final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
    synchronized (baseStatementUnit.getStatement().getConnection()) {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<AbstractExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
        }
        for (AbstractExecutionEvent event : events) {
            EventBusInstance.getInstance().post(event);
        }
        try {
            result = executeCallback.execute(baseStatementUnit);
        } catch (final SQLException ex) {
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                each.setException(ex);
                EventBusInstance.getInstance().post(each);
            }
            // Handle the failure once, only after every event has been told about it.
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
        for (AbstractExecutionEvent each : events) {
            each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
            EventBusInstance.getInstance().post(each);
        }
        return result;
    }
}
/**
 * Creates the execution event for one parameter set of a statement unit.
 *
 * @param sqlType SQL type; {@code DQL} yields a {@link DQLExecutionEvent}, every other type a {@link DMLExecutionEvent}
 * @param baseStatementUnit statement unit providing the data source and the SQL unit
 * @param parameters parameter set the event refers to
 * @return newly created execution event
 */
private AbstractExecutionEvent getExecutionEvent(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<Object> parameters) {
    if (SQLType.DQL == sqlType) {
        return new DQLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSqlUnit(), parameters);
    }
    return new DMLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSqlUnit(), parameters);
}