Shading - jdbc 原始碼分析(七) - sql 歸併
主要類:
- ResultSetFactory:分片結果集歸併工廠類,獲取組裝後的結果集(可以理解為原始的resultSet經過處理,生成的新的resultSet)
- AbstractDelegateResultSet :代理結果集抽象類
- IteratorReducerResultSet :迭代歸併的聚集結果集,對於多個resultset的結果進行迭代, 繼承AbstractDelegateResultSet
- WrapperResultSet:ShardingResultSets 的內部類,對原生resultSet包了下,重寫了了firstNext()、afterFirstNext()方法
- LimitCouplingResultSet: 分頁限制條件的連線結果集,用於需要對結果集做分頁處理的情況,繼承AbstractDelegateResultSet
- StreamingOrderByReducerResultSet:流式排序的聚集結果集,用於對結果集排序的處理,繼承AbstractDelegateResultSet
執行過程:
sql:
SELECT o.order_id FROM t_order o WHERE o.order_id in (1000,1200) order by user_id desc limit 10
- executeQuery:
呼叫ResultSetFactory,獲取組裝後的ResultSet,generateExecutor(sql).executeQuery() 屬於SQL執行部分,之前分析過,這裡就不再說了
public ResultSet executeQuery(final String sql) throws SQLException {
ResultSet result;
try {
result = ResultSetFactory.getResultSet(generateExecutor(sql).executeQuery(), routeResult.getSqlStatement());
} finally {
set CurrentResultSet(null);
}
setCurrentResultSet(result);
return result;
}
複製程式碼
- getResultSet():
/**
* 獲取結果集.
*
* @param resultSets 結果集列表
* @param sqlStatement SQL語句物件
* @return 結果集包裝
* @throws SQLException SQL異常
*/
public static ResultSet getResultSet(final List<ResultSet> resultSets, final SQLStatement sqlStatement) throws SQLException {
//例項化ShardingResultSets
ShardingResultSets shardingResultSets = new ShardingResultSets(resultSets);
log.debug("Sharding-JDBC: Sharding result sets type is '{}'", shardingResultSets.getType().toString());
//組裝結果集
switch (shardingResultSets.getType()) {
case EMPTY:
return buildEmpty(resultSets);
case SINGLE:
return buildSingle(shardingResultSets);
case MULTIPLE:
return buildMultiple(shardingResultSets, sqlStatement);
default:
throw new UnsupportedOperationException(shardingResultSets.getType().toString());
}
}
複製程式碼
2.1:例項化ShardingResultSets
public ShardingResultSets(final List<ResultSet> resultSets) throws SQLException {
this.resultSets = filterResultSets(resultSets);
type = generateType();
}
複製程式碼
對於分片執行後得到的ResultSet集合,過濾掉空的結果,對於非空,使用 WrapperResultSet 包裝起來
問題:WrapperResultSet是個內部類,為什麼還要專門新建一個內部類來處理下,直接用原生的不就行了麼?
答:WrapperResultSet 繼承了AbstractDelegateResultSet,這個類是被裝飾類(在呼叫ResultSet的next()方法獲取資料的時候,使用到了裝飾模式),同時這個類還重寫了firstNext() 和afterFirstNext()方法,獲取資料的時候會用到
private List<ResultSet> filterResultSets(final List<ResultSet> resultSets) throws SQLException {
List<ResultSet> result = new ArrayList<>(resultSets.size());
for (ResultSet each : resultSets) {
if (each.next()) {
result.add(new WrapperResultSet(each));
}
}
return result;
}
複製程式碼
根據resultSets 集合的大小來判斷是單結果集還是多結果集,多結果集的處理比較複雜(用到了裝飾模式),這裡指對於排序、分頁的處理
private Type generateType() {
if (resultSets.isEmpty()) {
return Type.EMPTY;
} else if (1 == resultSets.size()) {
return Type.SINGLE;
} else {
return Type.MULTIPLE;
}
}
複製程式碼
2.2:根據ShardingResultSets的type屬性構建ResultSet的子類
既然多結果集的情況比較複雜,我們就以複雜的例子來分析,上面的SQL也是分頁,排序都用上了。
private static ResultSet buildMultiple(final ShardingResultSets shardingResultSets, final SQLStatement sqlStatement) throws SQLException {
ResultSetMergeContext resultSetMergeContext = new ResultSetMergeContext(shardingResultSets, sqlStatement);
return buildCoupling(buildReducer(resultSetMergeContext), resultSetMergeContext);
}
複製程式碼
在分析多結果集之前,我們先來了解下裝飾模式,多結果集就是使用這個模式來對結果集進行排序、分頁的。(關於裝飾物件,我覺得這篇文章寫得不錯)
裝飾模式的應用
上面這幅圖是結果集類間的依賴關係。- ResultSet:抽象的構建角色,也可以理解為被裝飾的原始物件
- AbstractDelegateResultSet:Decorator,裝飾角色,內部維護一個抽象構建的引用,接受所有裝飾物件的請求,並轉發給真實的物件處理,這樣就可以在呼叫真實物件的方法前,增加一些新的功能
- IteratorReducerResultSet:具體的裝飾者,對於多個結果集,負責當一個結果集的資料處理完成後,切換到另外一個結果集上面(多個結果集遍歷)
- StreamingOrderByReducerResultSet:具體的裝飾者,內部維護一個PriorityQueue,負責對排序好的結果集消費
- LimitCouplingResultSet: 具體的裝飾者,看名字就知道了吧,處理多結果集的分頁
- WrapperResultSet:具體的裝飾者,主要負責移動到下一個資料
下面接著分析程式碼: 我們的SQL中帶有order by,所以返回StreamingOrderByReducerResultSet
- buildReducer:
private static ResultSet buildReducer(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
//判斷分組歸併是否需要記憶體排序.
if (resultSetMergeContext.isNeedMemorySortForGroupBy()) {
resultSetMergeContext.setGroupByKeysToCurrentOrderByKeys();
return new MemoryOrderByReducerResultSet(resultSetMergeContext);
}
//判斷分組是否需要排序(帶有order by)
if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getOrderByList().isEmpty()) {
return new StreamingOrderByReducerResultSet(resultSetMergeContext);
}
return new IteratorReducerResultSet(resultSetMergeContext);
}
複製程式碼
StreamingOrderByReducerResultSet的建構函式:
public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
//把resultSet傳遞到父類
super(resultSetMergeContext.getShardingResultSets().getResultSets());
//例項化PriorityQueue處理排序
delegateResultSetQueue = new PriorityQueue<>(getResultSets().size());
orderByKeys = resultSetMergeContext.getCurrentOrderByKeys();
}
複製程式碼
問題:為什麼要用PriorityQueue 優先順序佇列處理排序,而不用普通的list sort一下
回答:我認為主要有2個方面:1、佇列內部用連結串列維護的,在做排序的時候直接更改節點指標就可以,時間複雜度為O(1),陣列的話要做移位操作,時間複雜度O(n),所以連結串列看起來更合適。2、假設執行後有2個結果集A、B;現在對A、B結果集的資料進行排序(每個結果集本身已經是排序好的),用佇列的話,每次分別取2個結果集中的第一個資料放入佇列,每次只對其中2個數據排序,用完後便從佇列中移除(poll),這樣比較方便,並且每次排序也只是2個值比較,對於單個next取值的情況 節省記憶體(資料量大的話,排序很佔用記憶體的把)
- buildCoupling: SQL中帶有limit,並且只有一個order by 欄位,所以返回LimitCouplingResultSet
private static ResultSet buildCoupling(final ResultSet resultSet, final ResultSetMergeContext resultSetMergeContext) throws SQLException {
ResultSet result = resultSet;
//group by處理
if (!resultSetMergeContext.getSqlStatement().getGroupByList().isEmpty() || !resultSetMergeContext.getSqlStatement().getAggregationSelectItems().isEmpty()) {
result = new GroupByCouplingResultSet(result, resultSetMergeContext);
}
//判斷是否需要記憶體排序:什麼情況下需要?在多個order by 欄位的時候
if (resultSetMergeContext.isNeedMemorySortForOrderBy()) {
resultSetMergeContext.setOrderByKeysToCurrentOrderByKeys();
result = new MemoryOrderByCouplingResultSet(result, resultSetMergeContext);
}
//分頁處理
if (null != resultSetMergeContext.getSqlStatement().getLimit()) {
result = new LimitCouplingResultSet(result, resultSetMergeContext.getSqlStatement());
}
return result;
}
複製程式碼
至此,裝飾模式需要的類已經構建好了,分別是:LimitCouplingResultSet處理分頁、StreamingOrderByReducerResultSet處理排序、WrapperResultSet
resultSet.next():
AbstractDelegateResultSet 重寫了resultSet.next()方法,下面是重寫的邏輯:
@Override
public final boolean next() throws SQLException {
//beforeFirst 預設true,走firstNext
boolean result = beforeFirst ? firstNext() : afterFirstNext();
beforeFirst = false;
if (result) {
LoggerFactory.getLogger(this.getClass().getName()).debug(
"Access result set, total size is: {}, result set hashcode is: {}, offset is: {}", getResultSets().size(), delegate.hashCode(), ++offset);
}
return result;
}
複製程式碼
LimitCouplingResultSet#firstNext():
對於A、B 2個結果集,比如要查 10,15索引位的資料,那麼我們會把0,15索引位的結果查詢出來,然後再過濾掉結果集A 10索引位前的資料,剩下5個數據再從A、B結果集取
@Override
protected boolean firstNext() throws SQLException {
return skipOffset() && doNext();
}
//過濾offset索引位前的資料
private boolean skipOffset() throws SQLException {
for (int i = 0; i < limit.getOffset(); i++) {
// 如果沒有資料了,就返回false,說明A結果集沒有資料了,交給下一個裝飾類,切換到B結果集
if (!getDelegate().next()) {
return false;
}
}
return true;
}
//當rowNumber>rowCOunt,說明已經取夠了5條資料,此時可以返回了
private boolean doNext() throws SQLException {
return ++rowNumber <= limit.getRowCount() && getDelegate().next();
}
複製程式碼
分頁處理完,getDelegate().next() 呼叫StreamingOrderByReducerResultSet#next,StreamingOrderByReducerResultSet繼承了AbstractDelegateResultSet,所以也是走的上面重寫的next()邏輯。
- StreamingOrderByReducerResultSet#firstNext()
遍歷A、B 2個結果集,分別取出結果集中的第一個元素,放入佇列中,peek出第一個元素(此時的元素已經按照排序規則排好),setDelegate()切換包裝(排序後)的結果集,這樣下一個裝飾類獲取到的就是排序後的結果集
protected boolean firstNext() throws SQLException {
for (ResultSet each : getResultSets()) {
ResultSetOrderByWrapper wrapper = new ResultSetOrderByWrapper(each);
//wrapper#next()取出第一個元素
if (wrapper.next()) {
delegateResultSetQueue.offer(wrapper);
}
}
return doNext();
}
private boolean doNext() {
if (delegateResultSetQueue.isEmpty()) {
return false;
}
setDelegate(delegateResultSetQueue.peek().delegate);
log.trace("Chosen order by value: {}, current result set hashcode: {}", delegateResultSetQueue.peek().row, getDelegate().hashCode());
return true;
}
@RequiredArgsConstructor
private class ResultSetOrderByWrapper implements Comparable<ResultSetOrderByWrapper> {
private final ResultSet delegate;
//具有排序功能的資料行物件
private OrderByResultSetRow row;
boolean next() throws SQLException {
// 呼叫next()
boolean result = delegate.next();
//有值
if (result) {
//例項化 帶有排序值的行物件
row = new OrderByResultSetRow(delegate, orderByKeys);
}
return result;
}
//比較
@Override
public int compareTo(final ResultSetOrderByWrapper o) {
return row.compareTo(o.row);
}
}
複製程式碼
問:怎麼排序的?
答:ResultSetOrderByWrapper 實現了Comparable介面,我們呼叫next方法,例項化了OrderByResultSetRow 這一行物件,行物件把排序的欄位值取到,也重寫了Comparable介面,當我們把ResultSetOrderByWrapper物件塞到佇列裡,佇列會呼叫物件的compareTo方法,對佇列的資料進行重新排序,這樣取出來的第一個元素就是排好序後的元素。
排序相關程式碼:
public final class OrderByResultSetRow extends AbstractResultSetRow implements Comparable<OrderByResultSetRow> {
private final List<OrderBy> orderBies;
private final List<Comparable<?>> orderByValues;
public OrderByResultSetRow(final ResultSet resultSet, final List<OrderBy> orderBies) throws SQLException {
super(resultSet);
this.orderBies = orderBies;
orderByValues = loadOrderByValues();
}
//載入排序欄位的值
private List<Comparable<?>> loadOrderByValues() {
List<Comparable<?>> result = new ArrayList<>(orderBies.size());
for (OrderBy each : orderBies) {
Object value = getCell(each.getColumnIndex());
Preconditions.checkState(value instanceof Comparable, "Sharding-JDBC: order by value must extends Comparable");
result.add((Comparable<?>) value);
}
return result;
}
//重新排序規則
@Override
public int compareTo(final OrderByResultSetRow otherOrderByValue) {
for (int i = 0; i < orderBies.size(); i++) {
OrderBy thisOrderBy = orderBies.get(i);
int result = ResultSetUtil.compareTo(orderByValues.get(i), otherOrderByValue.orderByValues.get(i), thisOrderBy.getOrderByType());
if (0 != result) {
return result;
}
}
return 0;
}
}
複製程式碼
排好序後,AbstractDelegateResultSet 的ResultSet delegate屬性就是正確的結果集,呼叫getString()之類的方法獲取SQL結果。
@Override
public final String getString(final String columnLabel) throws SQLException {
return delegate.getString(columnLabel);
}
複製程式碼
最後:
小尾巴走一波,歡迎關注我的公眾號,不定期分享程式設計、投資、生活方面的感悟:)