1. 程式人生 > >shardingjdbc (七)-結果合併

shardingjdbc (七)-結果合併

一 序

          單分片的SQL查詢不需要合併,多分片的情況在各分片排序完後,Sharding-JDBC 獲取到結果後,仍然需要再進一步排序。目前有 分頁、分組、排序、聚合列、迭代 五種場景需要做進一步處理。

    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            Collection<PreparedStatementUnit> preparedStatementUnits = route();
            List<ResultSet> resultSets = new PreparedStatementExecutor(
                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();
            result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge(), this);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }


merger部分原始碼結構如上圖。核心是mergeEngine.

二 MergeEngine

MergeEngine,分片結果集歸併引擎。

private final DatabaseType databaseType;
private final List<ResultSet> resultSets;
private final SelectStatement selectStatement;
private final Map<String, Integer> columnLabelIndexMap;
    
public MergeEngine(final DatabaseType databaseType, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
   this.databaseType = databaseType;
   this.resultSets = resultSets;
   this.selectStatement = selectStatement;
   // 獲得 查詢列名與位置對映
   columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
}

private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {
   ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); // 元資料(包含查詢列資訊)
   Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
   for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
       result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);
   }
   return result;
}

當 MergeEngine 被建立時,會傳入 resultSets 結果集集合,並根據其獲得 columnLabelIndexMap 查詢列名與位置對映。

MergeEngine.merge()方法作為入口提供查詢結果歸併功能,原始碼如下:

    /**
     * Merge result sets.
     *
     * @return merged result set.
     * @throws SQLException SQL exception
     */
    public ResultSetMerger merge() throws SQLException {
        selectStatement.setIndexForItems(columnLabelIndexMap);
        return decorate(build());
    }

2.1 SelectStatement#setIndexForItems()

   /**
     * Set index for select items.
     * 
     * @param columnLabelIndexMap map for column label and index
     */
    public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) {
        setIndexForAggregationItem(columnLabelIndexMap);
        setIndexForOrderItem(columnLabelIndexMap, orderByItems);
        setIndexForOrderItem(columnLabelIndexMap, groupByItems);
    }

部分查詢列是經過推到出來,在 SQL解析 過程中,未獲得到查詢列位置,需要通過該方法進行初始化。這裡可以結合SQL改寫部分進行理解。

#setIndexForAggregationItem() 處理 AVG聚合計算列 推匯出其對應的 SUM/COUNT 聚合計算列的位置:

    private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) {
        for (AggregationSelectItem each : getAggregationSelectItems()) {
            Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each));
            each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
            for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
                Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived));
                derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel()));
            }
        }
    }

#setIndexForOrderItem() 處理 ORDER BY / GROUP BY 列不在查詢列 推匯出的查詢列的位置:

    private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) {
        for (OrderItem each : orderItems) {
            if (-1 != each.getIndex()) {
                continue;
            }
            Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each));
            if (columnLabelIndexMap.containsKey(each.getColumnLabel())) {
                each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
            }
        }
    }

2.2 ResultSetMerger

ResultSetMerger,歸併結果集介面。


這個圖主要是整理了關係。細節沒有加上。

因為umlplant外掛在eclipse上面只是輔助顯示某個類(如下圖)。不能整體展示層級關係。要是有好的辦法歡迎留言告知。


從 實現方式 上分成三種:
Stream 流式:AbstractStreamResultSetMerger
Memory 記憶體:AbstractMemoryResultSetMerger
Decorator 裝飾者:AbstractDecoratorResultSetMerger




Stream 流式:將資料遊標與結果集的遊標保持一致,順序的從結果集中一條條的獲取正確的資料。看完下文第三節 OrderByStreamResultSetMerger 可以形象的理解。
Memory 記憶體:需要將結果集的所有資料都遍歷並存儲在記憶體中,再通過記憶體歸併後,將記憶體中的資料偽裝成結果集返回。看完下文第五節 GroupByMemoryResultSetMerger 可以形象的理解。

Decorator 裝飾者:可以和前二者任意組合

build()方法原始碼如下:

public ResultSetMerger merge() throws SQLException {
        selectStatement.setIndexForItems(columnLabelIndexMap);
        return decorate(build());
    }    
    private ResultSetMerger build() throws SQLException {
        //分組或者聚合,個人覺得把裡面條件拆出來更直觀些
        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
            if (selectStatement.isSameGroupByAndOrderByItems()) {
                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
            } else {
                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
            }
        }
        //如果select語句中有order by欄位,那麼需要OrderByStreamResultSetMerger對結果處理
        if (!selectStatement.getOrderByItems().isEmpty()) {
            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
        }
        return new IteratorStreamResultSetMerger(resultSets);
    }
build的作用是根據sql語句選擇多個不同的ResultSetMerger對結果進行合併處理。
 private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
        Limit limit = selectStatement.getLimit();
        if (null == limit) {
            return resultSetMerger;
        }
        if (DatabaseType.MySQL == limit.getDatabaseType() || DatabaseType.PostgreSQL == limit.getDatabaseType() || DatabaseType.H2 == limit.getDatabaseType()) {
            return new LimitDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
        }
        if (DatabaseType.Oracle == limit.getDatabaseType()) {
            return new RowNumberDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
        }
        if (DatabaseType.SQLServer == limit.getDatabaseType()) {
            return new TopAndRowNumberDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
        }
        return resultSetMerger;
    }

decorate原始碼可見,如果SQL語句中有limist,還需要根據資料庫型別選擇不同的DecoratorResultSetMerger配合進行結果歸併;下面看看不同的實現方式。

2.2.1 AbstractStreamResultSetMerger

AbstractStreamResultSetMerger,流式歸併結果集抽象類,提供從當前結果集獲得行資料。
public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
    
    private ResultSet currentResultSet;
    
    private boolean wasNull;
    
    protected ResultSet getCurrentResultSet() throws SQLException {
        if (null == currentResultSet) {
            throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next.");
        }
        return currentResultSet;
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        Object result;
        if (Object.class == type) {
            result = getCurrentResultSet().getObject(columnIndex);
        } else if (boolean.class == type) {
            result = getCurrentResultSet().getBoolean(columnIndex);。。。

2.2.2 AbstractMemoryResultSetMerger

AbstractMemoryResultSetMerger,記憶體歸併結果集抽象類,提供從記憶體資料行物件( MemoryResultSetRow ) 獲得行資料。

public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger {
    
    private final Map<String, Integer> labelAndIndexMap;
    
    @Setter
    private MemoryResultSetRow currentResultSetRow;
    
    private boolean wasNull;
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {
            throw new SQLFeatureNotSupportedException();
        }
        Object result = currentResultSetRow.getCell(columnIndex);
        wasNull = null == result;
        return result;
    }

MemoryResultSetRow呼叫 #load() 方法,將當前結果集的一條行資料載入到記憶體。

public class MemoryResultSetRow {
    
    private final Object[] data;
    
    public MemoryResultSetRow(final ResultSet resultSet) throws SQLException {
        data = load(resultSet);
    }
    
    private Object[] load(final ResultSet resultSet) throws SQLException {
        int columnCount = resultSet.getMetaData().getColumnCount();
        Object[] result = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            result[i] = resultSet.getObject(i + 1);
        }
        return result;
    }

主要是獲取value的方式不同。

2.2.3 AbstractDecoratorResultSetMerger

AbstractDecoratorResultSetMerger,裝飾結果集歸併抽象類,通過呼叫其裝飾的歸併物件 #getValue() 方法獲得行資料。
public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger {
    
    private final ResultSetMerger resultSetMerger;
        
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return resultSetMerger.getValue(columnIndex, type);
    }    

3 OrderByStreamResultSetMerger

看完抽象類,看看子類的實現。

OrderByStreamResultSetMerger,基於 Stream 方式排序歸併結果集實現。

3.1 歸併演算法

歸併操作(merge),也叫歸併演算法,指的是將兩個已經排序的序列合併成一個序列的操作。歸併排序演算法依賴歸併操作。為啥這裡適合用歸併演算法呢?因為之前的各個分片的結果集已經排序。

舉個例子:order by order_id desc 

public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger {
    
    @Getter(AccessLevel.NONE)
    private final List<OrderItem> orderByItems;
    
    private final Queue<OrderByValue> orderByValuesQueue;
    
    private boolean isFirstNext;
    
    public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
    // sql中order by列的資訊,例項sql是order by order_id desc,即此處就是order_id
    this.orderByItems = orderByItems;
    // 初始化一個優先順序佇列,優先順序佇列中的元素會根據OrderByValue中compareTo()方法排序,並且SQL重寫後傳送到多少個目標實際表,List<ResultSet>的size就有多大,Queue的capacity就有多大;
    this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
    // 將結果壓入佇列中
    orderResultSetsToQueue(resultSets);
    isFirstNext = true;
}

private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
    // 遍歷resultSets--在多少個目標實際表上執行SQL,該集合的size就有多大
    for (ResultSet each : resultSets) {
        // 將ResultSet和排序列資訊封裝成一個OrderByValue型別
        OrderByValue orderByValue = new OrderByValue(each, orderByItems);
        // 如果值存在,那麼壓入佇列中
        if (orderByValue.next()) {
            orderByValuesQueue.offer(orderByValue);
        }
    }
    // 重置currentResultSet的位置:如果佇列不為空,那麼將佇列的頂部(peek)位置設定為currentResultSet的位置
    setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
}

@Override
public boolean next() throws SQLException {
    // 呼叫next()判斷是否還有值, 如果佇列為空, 表示沒有任何值, 那麼直接返回false
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    // 如果佇列不為空, 那麼第一次一定返回true;即有結果可取(且將isFirstNext置為false,表示接下來的請求都不是第一次請求next()方法)
    if (isFirstNext) {
        isFirstNext = false;
        return true;
    }
    // 從佇列中彈出第一個元素(因為是優先順序佇列,所以poll()返回的值,就是此次要取的值)
    OrderByValue firstOrderByValue = orderByValuesQueue.poll();
    // 如果它的next()存在,那麼將它的next()再新增到佇列中
    if (firstOrderByValue.next()) {
        orderByValuesQueue.offer(firstOrderByValue);
    }
    // 佇列中所有元素全部處理完後就返回false
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    // 再次重置currentResultSet的位置為佇列的頂部位置;
    setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
    return true;
}

看到這裡,我覺得我之前整理的Java 集合還沒完,還得補上PriorityQueue。

#offer():增加元素。增加時,會將該元素和已有元素們按照優先順序進行排序
#peek():獲得優先順序第一的元素
#pool():獲得優先順序第一的元素並移除

ResultSet 構建一個 OrderByValue 包含了排序資訊用於排序,

public final class OrderByValue implements Comparable<OrderByValue> {
    
    @Getter
    private final ResultSet resultSet;
    
    private final List<OrderItem> orderByItems;
    
    private List<Comparable<?>> orderValues;
    
    /**
     * iterate next data.
     *
     * @return has next data
     * @throws SQLException SQL Exception
     */
    public boolean next() throws SQLException {
        boolean result = resultSet.next();
        orderValues = result ? getOrderValues() : Collections.<Comparable<?>>emptyList();
        return result;
    }
    
    private List<Comparable<?>> getOrderValues() throws SQLException {
        List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
        for (OrderItem each : orderByItems) {
            Object value = resultSet.getObject(each.getIndex());
            Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable");
            result.add((Comparable<?>) value);
        }
        return result;
    }
    
    @Override
    public int compareTo(final OrderByValue o) {
        for (int i = 0; i < orderByItems.size(); i++) {
            OrderItem thisOrderBy = orderByItems.get(i);
            int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), thisOrderBy.getNullOrderType());
            if (0 != result) {
                return result;
            }
        }
        return 0;
    }
}
呼叫 OrderByValue#next() 方法時,獲得其對應結果集排在第一條的記錄,怎麼實現的呢?
看看OrderByValue.next()方面裡面通過 #getOrderValues() 計算該記錄的排序欄位值。這樣兩個OrderByValue 通過 #compareTo() 方法可以比較兩個結果集的第一條記錄。

if (orderByValue.next()) { 處,新增到 PriorityQueue。因此,orderByValuesQueue.peek().getResultSet() 能夠獲得多個 ResultSet 中排在第一的。

通過呼叫 OrderByStreamResultSetMerger#next() 不斷獲得當前排在第一的記錄。#next() 每次呼叫後,實際做的是當前 ResultSet 的替換,以及當前的 ResultSet 的記錄指向下一條。可以結合next()程式碼註釋來理解。當然最好的還是debug完了之後花個圖看看。比如:

假設執行SQLSELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 3會分發到兩個目標實際表,且第一個實際表返回的結果是1,3,5,7,9;第二個實際表返回的結果是2,4,6,8,10;那麼,經過OrderByStreamResultSetMerger的構造方法中的orderResultSetsToQueue()方法後,Queue<OrderByValue> orderByValuesQueue中包含兩個OrderByValue,一個是10,一個是9;接下來取值執行過程如下: 
1. 取得10,並且10的next()是8,然後執行orderByValuesQueue.offer(8);,這時候orderByValuesQueue中包含8和9; 
2. 取得9,並且9的next()是7,然後執行orderByValuesQueue.offer(7);,這時候orderByValuesQueue中包含7和8; 
3. 取得8,並且8的next()是6,然後執行orderByValuesQueue.offer(6);,這時候orderByValuesQueue中包含7和6; 
取值數量已經達到limit 3的限制(原始碼在LimitDecoratorResultSetMerger中的next()方法中),退出;

通過優先順序佇列不斷的poll,offset實現了排序,設計很巧妙啊。

4. 5 GroupByStreamResultSetMergerGroupByMemoryResultSetMerger

這兩個篇幅較長,所以淡出拆出來整理。

6. LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger,基於 Decorator 分頁結果集歸併實現。

public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
    super(resultSetMerger);
    // limit賦值(Limit物件包括limit m,n中的m和n兩個值)
    this.limit = limit;
    // 判斷是否會跳過所有的結果項,即判斷是否有符合條件的結果
    skipAll = skipOffset();
}

private boolean skipOffset() throws SQLException {
    // 假定limit.getOffsetValue()就是offset,例項sql中為limit 2,3,所以offset=2
    for (int i = 0; i < limit.getOffsetValue(); i++) {
        // 嘗試從OrderByStreamResultSetMerger生成的優先順序佇列中跳過offset個元素,如果.next()一直為true,表示有足夠符合條件的結果,那麼返回false;否則沒有足夠符合條件的結果,那麼返回true;即skilAll=true就表示跳過了所有沒有符合條件的結果;
        if (!getResultSetMerger().next()) {
            return true;
        }
    }
    //行數: limit m,n的sql會被重寫為limit 0, m+n,所以limit.isRowCountRewriteFlag()為true,rowNumber的值為0;
    rowNumber = 0 ;
    return false;
}

@Override
public boolean next() throws SQLException {
    // 如果skipAll為true,即跳過所有,表示沒有任何符合條件的值,那麼返回false
    if (skipAll) {
        return false;
    }
    if (limit.getRowCountValue() <0) {
        return getResultSetMerger().next();
    }    
// 每次next()獲取值後,rowNumber自增,當自增rowCountValue次後,就不能再往下繼續取值了,因為條件limit 2,3(rowCountValue=3)限制了
 return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
}

這是MySQL,PostgreSQL\H2實現方式,RowNumberDecoratorResultSetMerger是Oracle的。TopAndRowNumberDecoratorResultSetMerger是SQLserver的。對比下就是next()或者skipOffset實現方式不同。

7 IteratorStreamResultSetMerger

 IteratorStreamResultSetMerger,基於 Stream 迭代歸併結果集實現。

public final class IteratorStreamResultSetMerger extends AbstractStreamResultSetMerger {
    
    private final Iterator<ResultSet> resultSets;
    
    public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
        this.resultSets = resultSets.iterator();
        // 設定當前 ResultSet,這樣 #getValue() 能拿到記錄
        setCurrentResultSet(this.resultSets.next());
    }
    
    @Override
    public boolean next() throws SQLException {
    // 當前 ResultSet 迭代下一條記錄
        if (getCurrentResultSet().next()) {
            return true;
        }
        if (!resultSets.hasNext()) {
            return false;
        }
        // 獲得下一個ResultSet, 設定當前 ResultSet
        setCurrentResultSet(resultSets.next());
        boolean hasNext = getCurrentResultSet().next();
        if (hasNext) {
            return true;
        }
        while (!hasNext && resultSets.hasNext()) {
            setCurrentResultSet(resultSets.next());
            hasNext = getCurrentResultSet().next();
        }
        return hasNext;
    }
}

參考:http://www.iocoder.cn/Sharding-JDBC/result-merger/

https://blog.csdn.net/feelwing1314/article/details/80237389