1. 程式人生 > 其它 >Druid原始碼解析(二):連線池初始化(init 方法)

Druid原始碼解析(二):連線池初始化(init 方法)

  在獲取連線時,會執行初始化方法 init() ,使用 DruidDataSource的入口。

一、雙檢測保證併發安全與效能

  為了保證不會重複初始化並且保證效能,使用了類似雙重檢測鎖的方式來處理,第一次判斷 inited 標識,如果已經初始化,則則直接返回,如果沒有初始化,則使用ReentrantLock加鎖處理,加鎖成功,再次判斷inited標識,inited使用volitle修飾,這裡實際上就是使用了類似單例模式的雙重檢測鎖,第一次判斷保證效能,volitle保證可見性,加鎖防止併發,第二次判斷防止執行緒切換導致的多執行緒拿到鎖。

    public void init() throws
SQLException { if (inited) { return; } // 獲取驅動 // bug fixed for dead lock, for issue #2980 DruidDriver.getInstance(); // 加鎖,防止重複初始化 final ReentrantLock lock = this.lock; try { lock.lockInterruptibly(); } catch
(InterruptedException e) { throw new SQLException("interrupt", e); } boolean init = false; try { if (inited) { return; }

二、屬性生成與設定

  這一步沒有太多可解析的內容,包括:生成當前執行緒的堆疊呼叫資訊、生成資料來源 ID、設定JDBC的相關引數、設定 jdbcUrl、初始化filter、設定DB型別(如mysql、Oracle等)、設定cacheServerConfiguration屬性(是否快取SHOW VARIABLESSHOW COLLATION命令的結果)

            initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

            this.id = DruidDriver.createDataSourceId();
            if (this.id > 1) {
                long delta = (this.id - 1) * 100000;
                this.connectionIdSeedUpdater.addAndGet(this, delta);
                this.statementIdSeedUpdater.addAndGet(this, delta);
                this.resultSetIdSeedUpdater.addAndGet(this, delta);
                this.transactionIdSeedUpdater.addAndGet(this, delta);
            }

            if (this.jdbcUrl != null) {
                this.jdbcUrl = this.jdbcUrl.trim();
                initFromWrapDriverUrl();
            }

            for (Filter filter : filters) {
                filter.init(this);
            }

            if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
                this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
            }

            DbType dbType = DbType.of(this.dbTypeName);
            if (dbType == DbType.mysql
                    || dbType == DbType.mariadb
                    || dbType == DbType.oceanbase
                    || dbType == DbType.ads) {
                boolean cacheServerConfigurationSet = false;
                if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                    cacheServerConfigurationSet = true;
                } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                    cacheServerConfigurationSet = true;
                }
                if (cacheServerConfigurationSet) {
                    this.connectProperties.put("cacheServerConfiguration", "true");
                }
            }

三、引數配置校驗

  這一步主要是校驗引數配置是否合理,例如最大活躍連線是否小於0、最大活躍連線是否小於最小連線等等。

            if (maxActive <= 0) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (maxActive < minIdle) {
                throw new IllegalArgumentException("illegal maxActive " + maxActive);
            }

            if (getInitialSize() > maxActive) {
                throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
            }

            if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
                throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
            }

            if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
                throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
            }

            if (keepAlive && keepAliveBetweenTimeMillis <= timeBetweenEvictionRunsMillis) {
                throw new SQLException("keepAliveBetweenTimeMillis must be grater than timeBetweenEvictionRunsMillis");
            }

四、載入 Filter 等初始化前期處理

  經過上面的校驗後,開始做初始化的前期處理:使用SPI機制載入 Filter 的實現類、處理驅動相關的配置、初始化校驗、初始化異常儲存、初始化 validConnectionChecker、檢驗連線查詢。

            // 初始化SPI
            initFromSPIServiceLoader();

            // 處理驅動相關的配置
            resolveDriver();

            // 初始化校驗
            initCheck();

            // 初始化異常儲存
            initExceptionSorter();

            // 根據不同資料庫初始化 validConnectionChecker
            initValidConnectionChecker();

            // 檢驗連線查詢 sql
            validationQueryCheck();

   1、初始化SPI

  SPI 機制是很多開源元件常用的擴充套件機制,例如在 JDK、dubbo、SpringBoot、Spring Cloud 中都有廣泛的使用,在 Druid 中也使用了 SPI 機制載入,載入所有配置的 Filter。

    private void initFromSPIServiceLoader() {
        if (loadSpifilterSkip) {
            return;
        }

        if (autoFilters == null) {
            List<Filter> filters = new ArrayList<Filter>();
            ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);

            for (Filter filter : autoFilterLoader) {
                AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
                if (autoLoad != null && autoLoad.value()) {
                    filters.add(filter);
                }
            }
            autoFilters = filters;
        }

        for (Filter filter : autoFilters) {
            if (LOG.isInfoEnabled()) {
                LOG.info("load filter from spi :" + filter.getClass().getName());
            }
            addFilter(filter);
        }
    }

  2、處理驅動相關的配置

  這個沒有什麼特別的地方,只是根據不同的資料庫驅動生成驅動的例項,對於 MockDriver、ClickHouse做了特殊處理。

    protected void resolveDriver() throws SQLException {
        if (this.driver == null) {
            if (this.driverClass == null || this.driverClass.isEmpty()) {
                this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
            }

            if (MockDriver.class.getName().equals(driverClass)) {
                driver = MockDriver.instance;
            } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
                Properties info = new Properties();
                info.put("user", username);
                info.put("password", password);
                info.putAll(connectProperties);
                driver = new BalancedClickhouseDriver(jdbcUrl, info);
            } else {
                if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                    throw new SQLException("url not set");
                }
                driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
            }
        } else {
            if (this.driverClass == null) {
                this.driverClass = driver.getClass().getName();
            }
        }
    }

  3、初始化校驗

  這個就是對於資料庫的各種校驗,例如Oracle的版本和簡單查詢驗證、db2的查詢驗證等

  4、初始化異常儲存

  用於設定 exceptionSorter 屬性,這是Druid連線池穩定性的保證,用於處理重大的不可恢復的異常,它是一個介面,不同的資料庫有不同的實現類,在這裡根據不同的驅動型別生成對應的 exceptionSorter。

  5、初始化 validConnectionChecker

  根據不同資料庫初始化 validConnectionChecker,這個類很重要,用來檢測連線池中連線的可用性,如果檢測連線不可用,則close掉;可用的話就繼續放回連線池中。

  6、檢驗連線查詢

  檢驗連線查詢 sql 是否正確執行。

五、設定 dataSourceStat 並初始化 holder的陣列

  生成 dataSourceStat:判斷dataSourceStat是否採用了全域性的dataSourceStat,如果使用了,則設定 dataSourceStat 的global屬性,然後設定資料庫型別,如果沒有采用了全域性的dataSourceStat,則新建立一個 dataSourceStat。

  設定 dataSourceStat 的重置標誌

  建立 DruidConnectionHolder 陣列,分別是所有連線的陣列 connections、被驅逐連線陣列 evictConnections、存活連線陣列 keepAliveConnections,三個陣列的長度都是最大連線數 maxActive。

            // dataSourceStat是否採用了Global。對dataSourceStat進行set。 初始化holder的陣列
            if (isUseGlobalDataSourceStat()) {
                dataSourceStat = JdbcDataSourceStat.getGlobal();
                if (dataSourceStat == null) {
                    dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
                    JdbcDataSourceStat.setGlobal(dataSourceStat);
                }
                if (dataSourceStat.getDbType() == null) {
                    dataSourceStat.setDbType(this.dbTypeName);
                }
            } else {
                dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
            }
            dataSourceStat.setResetStatEnable(this.resetStatEnable);

            connections = new DruidConnectionHolder[maxActive];
            evictConnections = new DruidConnectionHolder[maxActive];
            keepAliveConnections = new DruidConnectionHolder[maxActive];

六、初始化

  根據配置,進行非同步初始化或同步初始化:

    如果是非同步初始化,呼叫 submitCreateTask() 進行處理。

    如果非非同步初始化,則建立物理連線。使用 poolingCount < initialSize 控制建立連線的數量,但是在預設的的初始化過程中,如果不通過其他配置引數指定,這個條件不會被觸發,這可以看做是DruidDataSource的懶載入,只有真正需要Connection的時候,才會去建立物理的連線。

            // 判斷是否進行非同步初始化
            if (createScheduler != null && asyncInit) {
                //  如果非同步初始化,呼叫通過submitCreateTask進行
                for (int i = 0; i < initialSize; ++i) {
                    submitCreateTask(true);
                }
            } else if (!asyncInit) {
                //  如果poolingCount < initialSize,則建立物理連線。但是在預設的的初始化過程中,如果不通過其他配置引數指定,這個條件不會被觸發,這可以看做是DruidDataSource的懶載入,只有真正需要Connection的時候,才會去建立物理的連線。
                // init connections
                while (poolingCount < initialSize) {
                    try {
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount++] = holder;
                    } catch (SQLException ex) {
                        LOG.error("init datasource error, url: " + this.getUrl(), ex);
                        if (initExceptionThrow) {
                            connectError = ex;
                            break;
                        } else {
                            Thread.sleep(3000);
                        }
                    }
                }

                if (poolingCount > 0) {
                    poolingPeak = poolingCount;
                    poolingPeakTime = System.currentTimeMillis();
                }
            }

七、建立執行緒

  執行緒池初始化完成後,呼叫 createAndLogThread() 建立一個日誌執行緒,但是這個執行緒的條件timeBetweenLogStatsMillis大於0,如果這個引數沒有配置,日誌執行緒不會建立。

  呼叫 createAndStartCreatorThread() 方法建立一個建立連線的執行緒,並將建立的執行緒賦值給變數createConnectionThread

  建立 DestroyTask物件,同時建立DestroyConnectionThread執行緒並start,將建立的執行緒賦值給變數 destroyConnectionThread。

  如果keepAlive為true,還需呼叫submitCreateTask方法,將連線填充到minIdle,確保空閒的連線可用。

            // 建立日誌執行緒  但是這個執行緒的條件timeBetweenLogStatsMillis大於0,如果這個引數沒有配置,日誌執行緒不會建立。
            createAndLogThread();
            // 建立一個CreateConnectionThread物件,並啟動。初始化變數createConnectionThread。
            createAndStartCreatorThread();
            // 建立 DestroyTask物件。同時建立DestroyConnectionThread執行緒,並start,初始化destroyConnectionThread。
            createAndStartDestroyThread();

            // 確保上述兩個方法都執行完畢
            initedLatch.await();
            init = true;

            initedTime = new Date();

            // 註冊registerMbean
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }

            //  如果keepAlive為true,還需呼叫submitCreateTask方法,將連線填充到minIdle。確保空閒的連線可用。
            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        submitCreateTask(true);
                    }
                } else {
                    this.emptySignal();
                }
            }

八、初始化完成

  所有的邏輯處理完成後,在 finally 中,會將初始化完成標誌 inited 設定為 true,同時釋放鎖,最後判斷 init和日誌的INFO狀態,列印一條init完成的日誌。

finally {
            // 修改inited為true,並解鎖。
            inited = true;
            lock.unlock();

            // 判斷init和日誌的INFO狀態,列印一條init完成的日誌。
            if (init && LOG.isInfoEnabled()) {
                String msg = "{dataSource-" + this.getID();

                if (this.name != null && !this.name.isEmpty()) {
                    msg += ",";
                    msg += this.name;
                }

                msg += "} inited";

                LOG.info(msg);
            }
        }

九、init流程總結

  init過程,對DruidDataSource進行了初始化操作,為了防止多執行緒併發場景下進行init操作,採用了Double Check的方式,配合ReentrentLock兩次判斷來實現。 詳細流程如下圖: