sharding-jdbc之執行
【引用官網】為每個分片查詢維持一個獨立的資料庫連線,可以更加有效的利用多執行緒來提升執行效率。為每個資料庫連線開啟獨立的執行緒,可以將I/O所產生的消耗並行處理。為此,ShardingSphere提出了連線模式(Connection Mode)的概念,將其劃分為記憶體限制模式(MEMORY_STRICTLY)和連線限制模式(CONNECTION_STRICTLY)這兩種型別
- 記憶體限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費的資料庫連線數量不做限制。如果實際執行的SQL需要對某資料庫例項中的200張表做操作,則對每張表建立一個新的資料庫連線,並通過多執行緒的方式併發處理,以達成執行效率最大化。並且在SQL滿足條件情況下,優先選擇流式歸併,以防止出現記憶體溢位或避免頻繁垃圾回收情況
- 連線限制模式:使用此模式的前提是,ShardingSphere嚴格控制對一次操作所耗費的資料庫連線數量。如果實際執行的SQL需要對某資料庫例項中的200張表做操作,那麼只會建立唯一的資料庫連線,並對其200張表序列處理。如果一次操作中的分片散落在不同的資料庫,仍然採用多執行緒處理對不同庫的操作,但每個庫的每次操作仍然只建立一個唯一的資料庫連線。這樣即可以防止對一次請求對資料庫連線佔用過多所帶來的問題。該模式始終選擇記憶體歸併
case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2這個簡單查詢語句,來分析ss大致如何來執行sql。根據分片改寫後的sql,應該是在demo_ds_slave_0上執行:SELECT i.* FROM t_order_0 o, t_order_item_0 i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2
準備階段
1.初始化PreparedStatementExecutor#init,封裝Statement執行單元
public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    @Getter
    private final boolean returnGeneratedKeys;

    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }

    /**
     * Initialize executor.
     *
     * @param routeResult route result
     * @throws SQLException SQL exception
     */
    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
        // add the route units, i.e. the SQL units grouped by data source
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        // cache the created statements and their parameters
        cacheStatements();
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        // delegate to the prepare template to wrap each route unit into a statement execute unit
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {

            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        // honor the generated-keys flag; otherwise create with the configured result set type/concurrency/holdability
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }
    ... ...
}
2.執行封裝Statement執行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups
@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {
private final int maxConnectionsSizePerQuery;
/**
* Get execute unit groups.
*
* @param routeUnits route units
* @param callback SQL execute prepare callback
* @return statement execute unit groups
* @throws SQLException SQL exception
*/
public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
return getSynchronizedExecuteUnitGroups(routeUnits, callback);
}
private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
//資料來源對應sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?]
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits);
Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
//新增分片執行組
result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
}
return result;
}
private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
for (RouteUnit each : routeUnits) {
if (!result.containsKey(each.getDataSourceName())) {
result.put(each.getDataSourceName(), new LinkedList<SQLUnit>());
}
result.get(each.getDataSourceName()).add(each.getSqlUnit());
}
return result;
}
private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(
final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
//在maxConnectionSizePerQuery允許的範圍內,當一個連線需要執行的請求數量大於1時,意味著當前的資料庫連線無法持有相應的資料結果集,則必須採用記憶體歸併;
//反之,當一個連線需要執行的請求數量等於1時,意味著當前的資料庫連線可以持有相應的資料結果集,則可以採用流式歸併
//計算所需要的分割槽大小
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
//按照分割槽大小進行分割槽
//事例:
//sqlUnits = [1, 2, 3, 4, 5]
//desiredPartitionSize = 2
//則結果為:[[1, 2], [3,4], [5]]
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
//maxConnectionsSizePerQuery該引數表示一次查詢時每個資料庫所允許使用的最大連線數
//根據maxConnectionsSizePerQuery來判斷使用連線模式
//CONNECTION_STRICTLY 連線限制模式
//MEMORY_STRICTLY 記憶體限制模式
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
//獲取分割槽大小的連線
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
int count = 0;
//遍歷分割槽,將分割槽好的sql單元放到指定連線執行
for (List<SQLUnit> each : sqlUnitPartitions) {
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
}
return result;
}
private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection,
final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
List<StatementExecuteUnit> result = new LinkedList<>();
//遍歷sql單元
for (SQLUnit each : sqlUnitGroup) {
//回撥,建立statement執行單元
result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
}
//封裝成分片執行組
return new ShardingExecuteGroup<>(result);
}
}
執行階段
1.執行查詢sql
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    ... ...
    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        // read whether exception propagation is enabled for the current (trunk) thread
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        // create the callback instance; the execute template later invokes its execute method,
        // which in turn calls this executeSQL for every statement execute unit
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            @Override
            protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }
    ... ...
    protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        // refresh sharding meta data after execution when needed, e.g. after CREATE TABLE / ALTER TABLE
        refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
        return result;
    }
    ... ...
}
2.通過執行緒池分組執行,並回調callback
@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {

    // Database type, used to resolve data source meta data from the JDBC URL.
    private final DatabaseType databaseType;

    // Exception-propagation flag captured from the trunk thread; re-applied on each executing thread.
    private final boolean isExceptionThrown;

    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread,
                                       final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> results = new LinkedList<>();
        // Run every statement execute unit of this group and collect each result.
        for (StatementExecuteUnit executeUnit : statementExecuteUnits) {
            results.add(execute0(executeUnit, isTrunkThread, shardingExecuteDataMap));
        }
        return results;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        // Propagate the exception-throwing flag to the current (possibly worker) thread.
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        // Resolve data source meta data from the connection URL.
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        // SPI execution hook wrapping the actual SQL run (e.g. for tracing).
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            // Delegate to the subclass-provided SQL execution.
            T executeResult = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return executeResult;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            // Depending on the thread-local flag this either rethrows or records the exception.
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}
3.執行executeSQL,呼叫第一步建立的callback中的executeSQL,封裝ResultSet
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    ... ...
    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
        // cache the raw ResultSet so it can be closed/merged later
        getResultSets().add(resultSet);
        // choose the QueryResult wrapper by connection mode:
        // MEMORY_STRICTLY keeps the result set open -> streaming StreamQueryResult;
        // otherwise the rows are loaded eagerly into a MemoryQueryResult
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule)
                : new MemoryQueryResult(resultSet, shardingRule);
    }
    ... ...
}
本文主要有以下幾個重點:
1.封裝資料來源對應的sql單元,按資料來源遍歷執行
2.根據maxConnectionsSizePerQuery引數來計算sql單元分割槽
3.連線模式,根據maxConnectionsSizePerQuery引數、sql單元來計算具體使用記憶體限制模式還是連線限制模式
4.根據sql單元、連線模式來獲取對應資料來源連線
5.執行後根據連線模式封裝QueryResult,流式StreamQueryResult還是記憶體MemoryQueryResult