shardingjdbc (七)-結果合併
一 序
單分片的SQL查詢不需要合併,多分片的情況在各分片排序完後,Sharding-JDBC 獲取到結果後,仍然需要再進一步排序。目前有 分頁、分組、排序、聚合列、迭代 五種場景需要做進一步處理。
public ResultSet executeQuery() throws SQLException { ResultSet result; try { Collection<PreparedStatementUnit> preparedStatementUnits = route(); List<ResultSet> resultSets = new PreparedStatementExecutor( getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery(); result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge(), this); } finally { clearBatch(); } currentResultSet = result; return result; }
merger部分原始碼結構如上圖。核心是mergeEngine.
二 MergeEngine
MergeEngine,分片結果集歸併引擎。
private final DatabaseType databaseType; private final List<ResultSet> resultSets; private final SelectStatement selectStatement; private final Map<String, Integer> columnLabelIndexMap; public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException { this.databaseType = databaseType; this.resultSets = resultSets; this.selectStatement = selectStatement; // 獲得 查詢列名與位置對映 columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0)); } private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException { ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 元資料(包含查詢列資訊) Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i); } return result; }
當 MergeEngine 被建立時,會傳入 resultSets 結果集集合,並根據其獲得 columnLabelIndexMap 查詢列名與位置對映。
MergeEngine.merge()方法作為入口提供查詢結果歸併功能,原始碼如下:
/** * Merge result sets. * * @return merged result set. * @throws SQLException SQL exception */ public ResultSetMerger merge() throws SQLException { selectStatement.setIndexForItems(columnLabelIndexMap); return decorate(build()); }
2.1 SelectStatement#setIndexForItems()
/** * Set index for select items. * * @param columnLabelIndexMap map for column label and index */ public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) { setIndexForAggregationItem(columnLabelIndexMap); setIndexForOrderItem(columnLabelIndexMap, orderByItems); setIndexForOrderItem(columnLabelIndexMap, groupByItems); }
部分查詢列是經過推到出來,在 SQL解析 過程中,未獲得到查詢列位置,需要通過該方法進行初始化。這裡可以結合SQL改寫部分進行理解。
#setIndexForAggregationItem() 處理 AVG聚合計算列 推匯出其對應的 SUM/COUNT 聚合計算列的位置:
private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) {
for (AggregationSelectItem each : getAggregationSelectItems()) {
Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each));
each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived));
derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel()));
}
}
}
#setIndexForOrderItem() 處理 ORDER BY / GROUP BY 列不在查詢列 推匯出的查詢列的位置:
private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) {
for (OrderItem each : orderItems) {
if (-1 != each.getIndex()) {
continue;
}
Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each));
if (columnLabelIndexMap.containsKey(each.getColumnLabel())) {
each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
}
}
}
2.2 ResultSetMerger
ResultSetMerger,歸併結果集介面。
這個圖主要是整理了關係。細節沒有加上。
因為umlplant外掛在eclipse上面只是輔助顯示某個類(如下圖)。不能整體展示層級關係。要是有好的辦法歡迎留言告知。
從 實現方式 上分成三種:
Stream 流式:AbstractStreamResultSetMerger
Memory 記憶體:AbstractMemoryResultSetMerger
Decorator 裝飾者:AbstractDecoratorResultSetMerger
Stream 流式:將資料遊標與結果集的遊標保持一致,順序的從結果集中一條條的獲取正確的資料。看完下文第三節 OrderByStreamResultSetMerger 可以形象的理解。
Memory 記憶體:需要將結果集的所有資料都遍歷並存儲在記憶體中,再通過記憶體歸併後,將記憶體中的資料偽裝成結果集返回。看完下文第五節 GroupByMemoryResultSetMerger 可以形象的理解。
Decorator 裝飾者:可以和前二者任意組合
build()方法原始碼如下:
public ResultSetMerger merge() throws SQLException {
selectStatement.setIndexForItems(columnLabelIndexMap);
return decorate(build());
}
private ResultSetMerger build() throws SQLException {
//分組或者聚合,個人覺得把裡面條件拆出來更直觀些
if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
if (selectStatement.isSameGroupByAndOrderByItems()) {
return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
} else {
return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
}
}
//如果select語句中有order by欄位,那麼需要OrderByStreamResultSetMerger對結果處理
if (!selectStatement.getOrderByItems().isEmpty()) {
return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
}
return new IteratorStreamResultSetMerger(resultSets);
}
build的作用是根據sql語句選擇多個不同的ResultSetMerger對結果進行合併處理。 private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
Limit limit = selectStatement.getLimit();
if (null == limit) {
return resultSetMerger;
}
if (DatabaseType.MySQL == limit.getDatabaseType() || DatabaseType.PostgreSQL == limit.getDatabaseType() || DatabaseType.H2 == limit.getDatabaseType()) {
return new LimitDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
}
if (DatabaseType.Oracle == limit.getDatabaseType()) {
return new RowNumberDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
}
if (DatabaseType.SQLServer == limit.getDatabaseType()) {
return new TopAndRowNumberDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
}
return resultSetMerger;
}
decorate原始碼可見,如果SQL語句中有limist,還需要根據資料庫型別選擇不同的DecoratorResultSetMerger配合進行結果歸併;下面看看不同的實現方式。
2.2.1 AbstractStreamResultSetMerger
AbstractStreamResultSetMerger,流式歸併結果集抽象類,提供從當前結果集獲得行資料。public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
private ResultSet currentResultSet;
private boolean wasNull;
protected ResultSet getCurrentResultSet() throws SQLException {
if (null == currentResultSet) {
throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next.");
}
return currentResultSet;
}
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
Object result;
if (Object.class == type) {
result = getCurrentResultSet().getObject(columnIndex);
} else if (boolean.class == type) {
result = getCurrentResultSet().getBoolean(columnIndex);。。。
2.2.2 AbstractMemoryResultSetMerger
AbstractMemoryResultSetMerger,記憶體歸併結果集抽象類,提供從記憶體資料行物件( MemoryResultSetRow ) 獲得行資料。
public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger {
private final Map<String, Integer> labelAndIndexMap;
@Setter
private MemoryResultSetRow currentResultSetRow;
private boolean wasNull;
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {
throw new SQLFeatureNotSupportedException();
}
Object result = currentResultSetRow.getCell(columnIndex);
wasNull = null == result;
return result;
}
MemoryResultSetRow呼叫 #load() 方法,將當前結果集的一條行資料載入到記憶體。
public class MemoryResultSetRow {
private final Object[] data;
public MemoryResultSetRow(final ResultSet resultSet) throws SQLException {
data = load(resultSet);
}
private Object[] load(final ResultSet resultSet) throws SQLException {
int columnCount = resultSet.getMetaData().getColumnCount();
Object[] result = new Object[columnCount];
for (int i = 0; i < columnCount; i++) {
result[i] = resultSet.getObject(i + 1);
}
return result;
}
主要是獲取value的方式不同。
2.2.3 AbstractDecoratorResultSetMerger
AbstractDecoratorResultSetMerger,裝飾結果集歸併抽象類,通過呼叫其裝飾的歸併物件 #getValue() 方法獲得行資料。public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger {
private final ResultSetMerger resultSetMerger;
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
return resultSetMerger.getValue(columnIndex, type);
}
3 OrderByStreamResultSetMerger
看完抽象類,看看子類的實現。
OrderByStreamResultSetMerger,基於 Stream 方式排序歸併結果集實現。
3.1 歸併演算法
歸併操作(merge),也叫歸併演算法,指的是將兩個已經排序的序列合併成一個序列的操作。歸併排序演算法依賴歸併操作。為啥這裡適合用歸併演算法呢?因為之前的各個分片的結果集已經排序。
舉個例子:order by order_id desc
public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger {
@Getter(AccessLevel.NONE)
private final List<OrderItem> orderByItems;
private final Queue<OrderByValue> orderByValuesQueue;
private boolean isFirstNext;
public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
// sql中order by列的資訊,例項sql是order by order_id desc,即此處就是order_id
this.orderByItems = orderByItems;
// 初始化一個優先順序佇列,優先順序佇列中的元素會根據OrderByValue中compareTo()方法排序,並且SQL重寫後傳送到多少個目標實際表,List<ResultSet>的size就有多大,Queue的capacity就有多大;
this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
// 將結果壓入佇列中
orderResultSetsToQueue(resultSets);
isFirstNext = true;
}
private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
// 遍歷resultSets--在多少個目標實際表上執行SQL,該集合的size就有多大
for (ResultSet each : resultSets) {
// 將ResultSet和排序列資訊封裝成一個OrderByValue型別
OrderByValue orderByValue = new OrderByValue(each, orderByItems);
// 如果值存在,那麼壓入佇列中
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
// 重置currentResultSet的位置:如果佇列不為空,那麼將佇列的頂部(peek)位置設定為currentResultSet的位置
setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
}
@Override
public boolean next() throws SQLException {
// 呼叫next()判斷是否還有值, 如果佇列為空, 表示沒有任何值, 那麼直接返回false
if (orderByValuesQueue.isEmpty()) {
return false;
}
// 如果佇列不為空, 那麼第一次一定返回true;即有結果可取(且將isFirstNext置為false,表示接下來的請求都不是第一次請求next()方法)
if (isFirstNext) {
isFirstNext = false;
return true;
}
// 從佇列中彈出第一個元素(因為是優先順序佇列,所以poll()返回的值,就是此次要取的值)
OrderByValue firstOrderByValue = orderByValuesQueue.poll();
// 如果它的next()存在,那麼將它的next()再新增到佇列中
if (firstOrderByValue.next()) {
orderByValuesQueue.offer(firstOrderByValue);
}
// 佇列中所有元素全部處理完後就返回false
if (orderByValuesQueue.isEmpty()) {
return false;
}
// 再次重置currentResultSet的位置為佇列的頂部位置;
setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
return true;
}
看到這裡,我覺得我之前整理的Java 集合還沒完,還得補上PriorityQueue。
#offer():增加元素。增加時,會將該元素和已有元素們按照優先順序進行排序
#peek():獲得優先順序第一的元素
#pool():獲得優先順序第一的元素並移除
ResultSet 構建一個 OrderByValue 包含了排序資訊用於排序,
public final class OrderByValue implements Comparable<OrderByValue> {
@Getter
private final ResultSet resultSet;
private final List<OrderItem> orderByItems;
private List<Comparable<?>> orderValues;
/**
* iterate next data.
*
* @return has next data
* @throws SQLException SQL Exception
*/
public boolean next() throws SQLException {
boolean result = resultSet.next();
orderValues = result ? getOrderValues() : Collections.<Comparable<?>>emptyList();
return result;
}
private List<Comparable<?>> getOrderValues() throws SQLException {
List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
for (OrderItem each : orderByItems) {
Object value = resultSet.getObject(each.getIndex());
Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable");
result.add((Comparable<?>) value);
}
return result;
}
@Override
public int compareTo(final OrderByValue o) {
for (int i = 0; i < orderByItems.size(); i++) {
OrderItem thisOrderBy = orderByItems.get(i);
int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), thisOrderBy.getNullOrderType());
if (0 != result) {
return result;
}
}
return 0;
}
}
呼叫 OrderByValue#next() 方法時,獲得其對應結果集排在第一條的記錄,怎麼實現的呢?看看OrderByValue.next()方面裡面通過 #getOrderValues() 計算該記錄的排序欄位值。這樣兩個OrderByValue 通過 #compareTo() 方法可以比較兩個結果集的第一條記錄。
if (orderByValue.next()) { 處,新增到 PriorityQueue。因此,orderByValuesQueue.peek().getResultSet() 能夠獲得多個 ResultSet 中排在第一的。
通過呼叫 OrderByStreamResultSetMerger#next() 不斷獲得當前排在第一的記錄。#next() 每次呼叫後,實際做的是當前 ResultSet 的替換,以及當前的 ResultSet 的記錄指向下一條。可以結合next()程式碼註釋來理解。當然最好的還是debug完了之後花個圖看看。比如:
假設執行SQLSELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 3會分發到兩個目標實際表,且第一個實際表返回的結果是1,3,5,7,9;第二個實際表返回的結果是2,4,6,8,10;那麼,經過OrderByStreamResultSetMerger的構造方法中的orderResultSetsToQueue()方法後,Queue<OrderByValue> orderByValuesQueue中包含兩個OrderByValue,一個是10,一個是9;接下來取值執行過程如下:
1. 取得10,並且10的next()是8,然後執行orderByValuesQueue.offer(8);,這時候orderByValuesQueue中包含8和9;
2. 取得9,並且9的next()是7,然後執行orderByValuesQueue.offer(7);,這時候orderByValuesQueue中包含7和8;
3. 取得8,並且8的next()是6,然後執行orderByValuesQueue.offer(6);,這時候orderByValuesQueue中包含7和6;
取值數量已經達到limit 3的限制(原始碼在LimitDecoratorResultSetMerger中的next()方法中),退出;
通過優先順序佇列不斷的poll,offset實現了排序,設計很巧妙啊。
4. 5 GroupByStreamResultSetMerger和GroupByMemoryResultSetMerger
這兩個篇幅較長,所以淡出拆出來整理。6. LimitDecoratorResultSetMerger
LimitDecoratorResultSetMerger,基於 Decorator 分頁結果集歸併實現。
public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
super(resultSetMerger);
// limit賦值(Limit物件包括limit m,n中的m和n兩個值)
this.limit = limit;
// 判斷是否會跳過所有的結果項,即判斷是否有符合條件的結果
skipAll = skipOffset();
}
private boolean skipOffset() throws SQLException {
// 假定limit.getOffsetValue()就是offset,例項sql中為limit 2,3,所以offset=2
for (int i = 0; i < limit.getOffsetValue(); i++) {
// 嘗試從OrderByStreamResultSetMerger生成的優先順序佇列中跳過offset個元素,如果.next()一直為true,表示有足夠符合條件的結果,那麼返回false;否則沒有足夠符合條件的結果,那麼返回true;即skilAll=true就表示跳過了所有沒有符合條件的結果;
if (!getResultSetMerger().next()) {
return true;
}
}
//行數: limit m,n的sql會被重寫為limit 0, m+n,所以limit.isRowCountRewriteFlag()為true,rowNumber的值為0;
rowNumber = 0 ;
return false;
}
@Override
public boolean next() throws SQLException {
// 如果skipAll為true,即跳過所有,表示沒有任何符合條件的值,那麼返回false
if (skipAll) {
return false;
}
if (limit.getRowCountValue() <0) {
return getResultSetMerger().next();
}
// 每次next()獲取值後,rowNumber自增,當自增rowCountValue次後,就不能再往下繼續取值了,因為條件limit 2,3(rowCountValue=3)限制了
return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
}這是MySQL,PostgreSQL\H2實現方式,RowNumberDecoratorResultSetMerger是Oracle的。TopAndRowNumberDecoratorResultSetMerger是SQLserver的。對比下就是next()或者skipOffset實現方式不同。
7 IteratorStreamResultSetMerger
IteratorStreamResultSetMerger,基於 Stream 迭代歸併結果集實現。
public final class IteratorStreamResultSetMerger extends AbstractStreamResultSetMerger {
private final Iterator<ResultSet> resultSets;
public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
this.resultSets = resultSets.iterator();
// 設定當前 ResultSet,這樣 #getValue() 能拿到記錄
setCurrentResultSet(this.resultSets.next());
}
@Override
public boolean next() throws SQLException {
// 當前 ResultSet 迭代下一條記錄
if (getCurrentResultSet().next()) {
return true;
}
if (!resultSets.hasNext()) {
return false;
}
// 獲得下一個ResultSet, 設定當前 ResultSet
setCurrentResultSet(resultSets.next());
boolean hasNext = getCurrentResultSet().next();
if (hasNext) {
return true;
}
while (!hasNext && resultSets.hasNext()) {
setCurrentResultSet(resultSets.next());
hasNext = getCurrentResultSet().next();
}
return hasNext;
}
}
參考:http://www.iocoder.cn/Sharding-JDBC/result-merger/
https://blog.csdn.net/feelwing1314/article/details/80237389