sharding-jdbc系列之SQL執行(六)
前言
在前面我們介紹,通過SQL路由找到具體的執行表,通過SQL改寫生成具體的執行SQL, 拿到具體的結果之後,sharding-jdbc下一步是幹嘛呢,
下一步當然是SQL執行了。
route
程式碼入口: com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement
private Collection<PreparedStatementUnit> route() throws SQLException { Collection<PreparedStatementUnit> result = new LinkedList<>(); // SQL路由,SQL改寫都在這個route方法裡面 routeResult = routingEngine.route(getParameters()); // 迴圈執行單位(路由結果中,每個表的執行單位) for (SQLExecutionUnit each : routeResult.getExecutionUnits()) { // 獲取SQL型別 SQLType sqlType = routeResult.getSqlStatement().getType(); Collection<PreparedStatement> preparedStatements; if (SQLType.DDL == sqlType) { //CREATE , ALTER ,DROP ,TRUNCATE操作 都走這裡 preparedStatements = generatePreparedStatementForDDL(each); } else { // 增刪查改的SQL都走這裡 , 主要看這個方法 generatePreparedStatement preparedStatements = Collections.singletonList(generatePreparedStatement(each)); } // 將preparedStatements 放入集合,後續preparedStatement執行完成之後,會從routedStatements獲取結果 routedStatements.addAll(preparedStatements); for (PreparedStatement preparedStatement : preparedStatements) { // 更新PreparedStatement中的引數值 replaySetParameter(preparedStatement); // 構建一個PreparedStatementUnit執行單位,放入返回結果 result.add(new PreparedStatementUnit(each, preparedStatement)); } } return result; }
routedStatements.addAll(preparedStatements) :
將preparedStatements 放入集合,後續preparedStatement執行完成之後,會從routedStatements獲取結果 , 這行程式碼比較重要
private PreparedStatement generatePreparedStatement(final SQLExecutionUnit sqlExecutionUnit) throws SQLException { // 獲取資料庫連線 Connection connection = getConnection().getConnection(sqlExecutionUnit.getDataSource(), routeResult.getSqlStatement().getType()); // 構建 PreparedStatement return returnGeneratedKeys ? connection.prepareStatement(sqlExecutionUnit.getSql(), RETURN_GENERATED_KEYS) : connection.prepareStatement(sqlExecutionUnit.getSql(), resultSetType, resultSetConcurrency, resultSetHoldability); }
步驟說明:
1.拿到SQL路由,SQL改寫的結果,迴圈 結果集
2.判斷SQL型別
3.獲取資料庫連線,構建PreparedStatement
4.將PreparedStatement集合放入routedStatements , 方便後續獲取執行結果。
4.返回PreparedStatement結果
SQL執行
回到原始碼最開始的路口: com.dangdang.ddframe.rdb.sharding.jdbc.core.statement.ShardingPreparedStatement
@Override public boolean execute() throws SQLException { try { // 這個地方的route方法,就是我們上面講的,獲取到了PreparedStatementUnit集合 Collection<PreparedStatementUnit> preparedStatementUnits = route(); // 呼叫PreparedStatementExecutor的exceute方法進行SQL執行 return new PreparedStatementExecutor( getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).execute(); } finally { clearBatch(); } }
public boolean execute() {
// 構建ExecuteCallback物件,然後通過executePreparedStatement方法進行執行
List<Boolean> result = executorEngine.executePreparedStatement(sqlType, preparedStatementUnits, parameters, new ExecuteCallback<Boolean>() {
@Override
public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return ((PreparedStatement) baseStatementUnit.getStatement()).execute();
}
});
// 執行結果為空,說明執行失敗,則返回false
if (null == result || result.isEmpty() || null == result.get(0)) {
return false;
}
// 執行成功,預設返回第一個PreparedStatement執行的結果。
return result.get(0);
}
至於為什麼取第一個的結果,下面會詳細講的,這個涉及到同步執行和非同步執行的問題
/**
* 執行PreparedStatement.
* @param sqlType SQL型別
* @param preparedStatementUnits 語句物件執行單元集合
* @param parameters 引數列表
* @param executeCallback 執行回撥函式
* @param <T> 返回值型別
* @return 執行結果
*/
public <T> List<T> executePreparedStatement(
final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}
execute
private <T> List<T> execute(
final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
if (baseStatementUnits.isEmpty()) {
return Collections.emptyList();
}
Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
BaseStatementUnit firstInput = iterator.next();
// 已經把第一個取出來了firstInput ,第二個任務開始所有 SQL任務 提交執行緒池【非同步】執行任務
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try {
// 第一個任務【同步】執行任務
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
// 等待第二個任務開始所有 SQL任務完成 , ListenableFuture是繼承了Future的,他的get方法,只有在執行結果返回之後,才能get到值,否則一直阻塞
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
ExecutorExceptionHandler.handleException(ex);
return null;
}
// 返回結果
List<T> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}
非同步執行
private <T> ListenableFuture<List<T>> asyncExecute(
final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
// 構建一個ListenableFuture集合
List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
//將實現了callable的任務放入到執行緒池中,得到一個帶有回撥機制的ListenableFuture例項,
//通過Futures.addCallback方法對得到的ListenableFuture例項進行監聽,一旦得到結果就進入到onSuccess方法中,
//在onSuccess方法中將查詢的結果存入到集合中
for (final BaseStatementUnit each : baseStatementUnits) {
// 呼叫executorService執行緒池,提交任務至執行緒池非同步化處理,submit返回一個ListenableFuture物件
result.add(executorService.submit(new Callable<T>() {
@Override
public T call() throws Exception {
// 執行SQL
return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
}
}));
}
//這裡將集合中的若干ListenableFuture形成一個新的ListenableFuture
//目的是為了當呼叫新的ListenableFuture.get()方法是非同步阻塞,
//直到所有的ListenableFuture都得到結果才繼續當前執行緒
//阻塞的時間取的是所有任務中用時最長的一個
return Futures.allAsList(result);
}
總結:
1.第一個任務是當前執行緒直接同步執行, 從第二個開始,其他任務都交給非同步執行緒來執行
2.restFutures.get()通過Future機制阻塞執行緒,等待所有執行緒執行完畢之後,才會獲取到結果,也就是說只有所有的任務執行成功之後,才會返回結果
ListenableFuture是guava的一個內部實現,後面會找相關的文章給大家分享一下。
SQL執行完成之後,結果會封裝在PreparedStatement物件裡面,通過getResultSet()方法,可以獲取到執行結果。
最終執行
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets,
private AbstractExecutionEvent getExecutionEvent(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<Object> parameters) {
AbstractExecutionEvent result;
if (SQLType.DQL == sqlType) {
result = new DQLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSql(), parameters);
} else {
result = new DMLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.getSqlExecutionUnit().getSql(), parameters);
}
return result;
}final ExecuteCallback<T> executeCallback,
final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
synchronized (baseStatementUnit.getStatement().getConnection()) {
T result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
// 設定事件集合
List<AbstractExecutionEvent> events = new LinkedList<>();
if (parameterSets.isEmpty()) {
// 如果引數為空則放入一個空的引數集合,並且獲取執行事件
events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
}
for (List<Object> each : parameterSets) {
// 將有引數的構建一個執行事件,放入集合
events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
for (AbstractExecutionEvent event : events) {
// 釋出執行事件,用於監聽事件的執行結果
EventBusInstance.getInstance().post(event);
}
try {
// SQL 執行
result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
// SQL執行 出現異常
for (AbstractExecutionEvent each : events) {
// 迴圈事件,將事件的執行結果設定為失敗
each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
each.setException(Optional.of(ex));
// 釋出事件
EventBusInstance.getInstance().post(each);
// 收集異常
ExecutorExceptionHandler.handleException(ex);
}
return null;
}
for (AbstractExecutionEvent each : events) {
// 執行的很成功,修改事件的執行狀態為SUCCESS
each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
// 釋出
EventBusInstance.getInstance().post(each);
}
return result;
}
}
private AbstractExecutionEvent getExecutionEvent(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<Object> parameters) {
AbstractExecutionEvent result;
if (SQLType.DQL == sqlType) { //查詢語句
result = new DQLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.
getSqlExecutionUnit().getSql(), parameters);
} else { //其他操作
result = new DMLExecutionEvent(baseStatementUnit.getSqlExecutionUnit().getDataSource(), baseStatementUnit.
getSqlExecutionUnit().getSql(), parameters);
}
return result;
}
SQLType
public enum SQLType {
/**
* 查詢語句
*
* <p>Such as {@code SELECT}.</p>
*/
DQL,
/**
* 增加,修改,刪除語句
*
* <p>Such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.</p>
*/
DML,
/**
* CREATE , ALTER ,DROP ,TRUNCATE操作
*
* <p>Such as {@code CREATE}, {@code ALTER}, {@code DROP}, {@code TRUNCATE}.</p>
*/
DDL
}