1. 程式人生 > >sharding-jdbc之執行

sharding-jdbc之執行

【引用官網】為每個分片查詢維持一個獨立的資料庫連線,可以更加有效的利用多執行緒來提升執行效率。為每個資料庫連線開啟獨立的執行緒,可以將I/O所產生的消耗並行處理,連線模式(Connection Mode)的概念,將其劃分為記憶體限制模式(MEMORY_STRICTLY)和連線限制模式(CONNECTION_STRICTLY)這兩種型別

  • 記憶體限制模式:使用此模式的前提是,ShardingSphere對一次操作所耗費的資料庫連線數量不做限制。如果實際執行的SQL需要對某資料庫例項中的200張表做操作,則對每張表建立一個新的資料庫連線,並通過多執行緒的方式併發處理,以達成執行效率最大化。並且在SQL滿足條件情況下,優先選擇流式歸併,以防止出現記憶體溢位或避免頻繁垃圾回收情況
  • 連線限制模式:使用此模式的前提是,ShardingSphere嚴格控制對一次操作所耗費的資料庫連線數量。如果實際執行的SQL需要對某資料庫例項中的200張表做操作,那麼只會建立唯一的資料庫連線,並對其200張表序列處理。如果一次操作中的分片散落在不同的資料庫,仍然採用多執行緒處理對不同庫的操作,但每個庫的每次操作仍然只建立一個唯一的資料庫連線。這樣即可以防止對一次請求對資料庫連線佔用過多所帶來的問題。該模式始終選擇記憶體歸併

case: 本文主要以SELECT i.* FROM t_order o, t_order_item i WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2一個簡單查詢語句,來分析ss大致如何來執行sql,根據分片改寫後的sql,應該是demo_ds_slave_0:SELECT * FROM t_order_0 i, t_order_item_0 o WHERE o.order_id = i.order_id and o.order_id = 2 and o.user_id = 2 來執行

 

準備階段

1.初始化PreparedStatementExecutor#init,封裝Statement執行單元

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    @Getter
    private final boolean returnGeneratedKeys;

    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }

    /**
     * Initialize executor.
     *
     * @param routeResult route result
     * @throws SQLException SQL exception
     */
    public void init(final SQLRouteResult routeResult) throws SQLException {
        setSqlStatement(routeResult.getOptimizedStatement().getSQLStatement());
        //新增路由單元,即資料來源對應的sql單元
        getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
        //快取statement、引數
        cacheStatements();
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<RouteUnit> routeUnits) throws SQLException {
        //執行封裝Statement執行單元
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(routeUnits, new SQLExecutePrepareCallback() {

            @Override
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final RouteUnit routeUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(routeUnit, createPreparedStatement(connection, routeUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }

   ... ...   
}

 

2.執行封裝Statement執行單元getSqlExecutePrepareTemplate().getExecuteUnitGroups

@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {

    private final int maxConnectionsSizePerQuery;

    /**
     * Get execute unit groups.
     *
     * @param routeUnits route units
     * @param callback SQL execute prepare callback
     * @return statement execute unit groups
     * @throws SQLException SQL exception
     */
    public Collection<ShardingExecuteGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        return getSynchronizedExecuteUnitGroups(routeUnits, callback);
    }

    private Collection<ShardingExecuteGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<RouteUnit> routeUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        //資料來源對應sql單元集合,即demo_ds_0:[select i.* from t_order_0 i, t_order_item_0 o where i.order_id = o.order_id and i.order_id = ?]
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(routeUnits);
        Collection<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();

        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            //新增分片執行組
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }

    private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<RouteUnit> routeUnits) {
        Map<String, List<SQLUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1);
        for (RouteUnit each : routeUnits) {
            if (!result.containsKey(each.getDataSourceName())) {
                result.put(each.getDataSourceName(), new LinkedList<SQLUnit>());
            }
            result.get(each.getDataSourceName()).add(each.getSqlUnit());
        }
        return result;
    }

    private List<ShardingExecuteGroup<StatementExecuteUnit>> getSQLExecuteGroups(
            final String dataSourceName, final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<ShardingExecuteGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //在maxConnectionSizePerQuery允許的範圍內,當一個連線需要執行的請求數量大於1時,意味著當前的資料庫連線無法持有相應的資料結果集,則必須採用記憶體歸併;
        //反之,當一個連線需要執行的請求數量等於1時,意味著當前的資料庫連線可以持有相應的資料結果集,則可以採用流式歸併

        //計算所需要的分割槽大小
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        //按照分割槽大小進行分割槽
        //事例:
        //sqlUnits = [1, 2, 3, 4, 5]
        //desiredPartitionSize = 2
        //則結果為:[[1, 2], [3,4], [5]]
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        //maxConnectionsSizePerQuery該引數表示一次查詢時每個資料庫所允許使用的最大連線數
        //根據maxConnectionsSizePerQuery來判斷使用連線模式
        //CONNECTION_STRICTLY 連線限制模式
        //MEMORY_STRICTLY 記憶體限制模式
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        //獲取分割槽大小的連線
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        //遍歷分割槽,將分割槽好的sql單元放到指定連線執行
        for (List<SQLUnit> each : sqlUnitPartitions) {
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

    private ShardingExecuteGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, 
                                                                          final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
        List<StatementExecuteUnit> result = new LinkedList<>();
        //遍歷sql單元
        for (SQLUnit each : sqlUnitGroup) {
            //回撥,建立statement執行單元
            result.add(callback.createStatementExecuteUnit(connection, new RouteUnit(dataSourceName, each), connectionMode));
        }
        //封裝成分片執行組
        return new ShardingExecuteGroup<>(result);
    }
}

 

執行階段

1.執行查詢sql

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        //獲取當前是否異常值
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        //建立回撥例項
        //執行SQLExecuteCallback的execute方法
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {

            @Override
            protected QueryResult executeSQL(final RouteUnit routeUnit, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);
    }

    ... ...

   protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.executeGroup((Collection) executeGroups, executeCallback);
        //執行完後重新整理分片元資料,比如建立表、修改表etc.
        refreshShardingMetaDataIfNeeded(connection.getShardingContext(), sqlStatement);
        return result;
    }

    ... ...
}

 

2.通過執行緒池分組執行,並回調callback

@RequiredArgsConstructor
public abstract class SQLExecuteCallback<T> implements ShardingGroupExecuteCallback<StatementExecuteUnit, T> {
    //資料庫型別
    private final DatabaseType databaseType;
    //是否異常
    private final boolean isExceptionThrown;

    @Override
    public final Collection<T> execute(final Collection<StatementExecuteUnit> statementExecuteUnits, final boolean isTrunkThread,
                                       final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        Collection<T> result = new LinkedList<>();
        //遍歷statement執行單元
        for (StatementExecuteUnit each : statementExecuteUnits) {
            //執行新增返回結果T
            result.add(execute0(each, isTrunkThread, shardingExecuteDataMap));
        }
        return result;
    }

    private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException {
        //設定當前執行緒是否異常
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        //根據url獲取資料來源元資料
        DataSourceMetaData dataSourceMetaData = databaseType.getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData().getURL());
        //sql執行鉤子
        SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
        try {
            sqlExecutionHook.start(statementExecuteUnit.getRouteUnit(), dataSourceMetaData, isTrunkThread, shardingExecuteDataMap);
            //執行sql
            T result = executeSQL(statementExecuteUnit.getRouteUnit(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
            sqlExecutionHook.finishSuccess();
            return result;
        } catch (final SQLException ex) {
            sqlExecutionHook.finishFailure(ex);
            ExecutorExceptionHandler.handleException(ex);
            return null;
        }
    }

    protected abstract T executeSQL(RouteUnit routeUnit, Statement statement, ConnectionMode connectionMode) throws SQLException;
}

 

3.執行executeSQL,呼叫第三步的callback中的executeSQL,封裝ResultSet

public final class PreparedStatementExecutor extends AbstractStatementExecutor {

    ... ...

    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        ShardingRule shardingRule = getConnection().getShardingContext().getShardingRule();
        //快取resultSet
        getResultSets().add(resultSet);
        //判斷ConnectionMode
        //如果是MEMORY_STRICTLY,使用流式StreamQueryResult;否則使用記憶體MemoryQueryResult
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet, shardingRule) 
                : new MemoryQueryResult(resultSet, shardingRule);
    }

    ... ...
}

 

本文主要有以下幾個重點:

1.封裝資料來源對應的sql單元,按資料來源遍歷執行

2.根據maxConnectionsSizePerQuery引數來計算sql單元分割槽

3.連線模式,根據maxConnectionsSizePerQuery引數、sql單元來計算具體使用記憶體限制模式還是連線限制模式

3.根據sql單元、連線模式來獲取對應資料來源連線

4.執行後根據連線模式封裝QueryResult,流式Result還