資料庫連線池之Hikari原始碼解析
Hikari連線池目前公認是效能最高的資料庫連線池,同時也是SpringBoot2.0以後預設使用的資料庫連線池。
一、Hikari的使用
1.1、Hikari相關配置
由於Springboot2.0預設就是使用的Hikari連線池,所以無需額外新增Hikari相關的maven依賴。只需要在application.yml新增對應的配置即可,如下:
spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNullspring.datasource.username=admin spring.datasource.password=admin spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=5 spring.datasource.hikari.maximum-pool-size=15 spring.datasource.hikari.auto-commit=true spring.datasource.hikari.idle-timeout=30000 spring.datasource.hikari.pool-name=DatebookHikariCP spring.datasource.hikari.max-lifetime=1800000 spring.datasource.hikari.connection-timeout=30000 spring.datasource.hikari.connection-test-query=SELECT 1
1.2、Hikari配置詳解
配置項 | 案例值 | 描述 |
autoCommit | true | 是否自動提交 |
connectionTimeout | 30000 | 客戶端建立連線等待超時時間,如果30秒內沒有獲取連線則拋異常,不再繼續等待 |
idleTimeout | 60000 | 連線允許最長空閒時間,如果連線空閒時間超過1分鐘,則會被關閉 |
maxLifetime | 1800000 | 連線最長生命週期,當連線存活時間達到30分鐘之後會被關閉作退休處理 |
minimumIdle | 1 | 連線池中最小空閒連線數 |
maximumPoolSize | 10 | 連線池中最大連線數 |
readOnly | false | 從池中獲取的連線是否是隻讀模式 |
validationTimeout | 5000 | 測試連線是否空閒的間隔 |
leadDetectionThreshold | 60000 | 連線被佔用的超時時間,超過1分鐘客戶端沒有釋放連線則強制回收該連線,防止連線洩漏 |
二、Hikari原始碼解析
2.1、獲取連線
1、Hikari中的核心類為HikariDataSource,表示Hikari連線池中的資料來源,實現了DataSource介面的getConnection方法,getConnection方法原始碼如下:
1 /** 連線池物件 2 * fastPathPool 會在初始化時建立 3 * pool 是在獲取連線數建立 4 * volatile修飾pool導致每次讀pool都要從主存載入,每次寫也要寫回主存,效能不如沒volatile修飾的fastPathPool 5 * */ 6 private final HikariPool fastPathPool; 7 private volatile HikariPool pool; 8 9 /** 獲取連線*/ 10 public Connection getConnection() throws SQLException 11 { 12 if (isClosed()) { 13 throw new SQLException("HikariDataSource " + this + " has been closed."); 14 } 15 /** 如果fastPathPool存在則直接獲取連線 */ 16 if (fastPathPool != null) { 17 return fastPathPool.getConnection(); 18 } 19 /** 如果沒有fastPathPool 則建立HikariPool物件 */ 20 HikariPool result = pool; 21 if (result == null) { 22 synchronized (this) { 23 result = pool; 24 if (result == null) { 25 validate(); 26 LOGGER.info("{} - Starting...", getPoolName()); 27 try { 28 /** 初始化建立HikariPool物件*/ 29 pool = result = new HikariPool(this); 30 this.seal(); 31 } 32 catch (PoolInitializationException pie) { 33 // 34 } 35 } 36 } 37 } 38 /** 呼叫pool的getConnection()方法獲取連線*/ 39 return result.getConnection(); 40 }
1 public HikariDataSource(HikariConfig configuration) 2 { 3 configuration.validate(); 4 configuration.copyStateTo(this); 5 6 LOGGER.info("{} - Starting...", configuration.getPoolName()); 7 pool = fastPathPool = new HikariPool(this); 8 LOGGER.info("{} - Start completed.", configuration.getPoolName()); 9 10 this.seal(); 11 }
getConnection方法邏輯不多,主要是呼叫了HikariPool的getConnection()方法,而HikariDataSource中有兩個HikariPool物件,一個是fastPathPool是在HikariPool有參建構函式中建立, 如果沒有建立fastPathPool,那麼就會在getConnection方法時建立pool物件。
很顯然pool物件是由volatile關鍵字修飾的,而fastPathPool是final型別的,所以fastPathPool的效率會比pool要高,所以推薦使用HikariDataSource有參建構函式進行初始化。
2、由上可知獲取連線的邏輯是在HikariPool的getConnection方法中,繼續分析HikariPool的getConnection方法,原始碼如下:
1 /** 獲取連線*/ 2 public Connection getConnection(final long hardTimeout) throws SQLException 3 { 4 /** 獲取鎖*/ 5 suspendResumeLock.acquire(); 6 final long startTime = currentTime(); 7 8 try { 9 long timeout = hardTimeout; 10 do { 11 /** 從ConcurrentBag中借出一個PoolEntry物件 */ 12 PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS); 13 if (poolEntry == null) { 14 break; // We timed out... break and throw exception 15 } 16 17 final long now = currentTime(); 18 /** 判斷連線是否被標記為拋棄 或者 空閒時間過長, 是的話就關閉連線*/ 19 if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) { 20 closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE); 21 timeout = hardTimeout - elapsedMillis(startTime); 22 } 23 else { 24 metricsTracker.recordBorrowStats(poolEntry, startTime); 25 /** 通過Javassist建立代理連線*/ 26 return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now); 27 } 28 } while (timeout > 0L); 29 metricsTracker.recordBorrowTimeoutStats(startTime); 30 throw createTimeoutException(startTime); 31 } 32 catch (InterruptedException e) { 33 Thread.currentThread().interrupt(); 34 throw new SQLException(poolName + " - Interrupted during connection acquisition", e); 35 } 36 finally { 37 /** 釋放鎖*/ 38 suspendResumeLock.release(); 39 } 40 }
核心步驟只有兩步,一個是呼叫ConcurrentBag的borrow方法借用一個PoolEntry物件,第二步呼叫呼叫PoolEntry的createProxyConnection方法動態生成代理connection物件。
這裡涉及到了兩個核心的類,分別是ConcurrentBag和PoolEntry
3、PoolEntry
PoolEntry顧名思義是連線池的節點,實際也可以看作是一個Connection物件的封裝,連線池中儲存的連線就是以PoolEntry的方式進行儲存。
PoolEntry內部屬性如下:
屬性 | 型別 | 描述 |
connection | Connection | 資料庫連線 |
lastAccessed | long | 上一次訪問時間 |
lastBorrowed | long | 上一次借出時間 |
state | volatile int | 當前狀態 |
evict | volatile boolean | 是否該丟棄 |
openStatements | FastList | 開啟的statement集合 |
hikariPool | HikariPool | 關聯的HikariPool物件 |
isReadOnly | boolean | 是否只讀 |
isAutoCommit | boolean | 是否自動提交 |
4、ConcurrentBag
ConcurrentBag直意就是併發包,本質就是連線池的主體,儲存連線的封裝物件PoolEntry,另外做了併發控制來解決連線池的併發問題。
ConcurrentBag的內部屬性如下:
屬性 | 型別 | 描述 |
sharedList | CopyOnWriteArrayList | 存放著狀態為未使用、使用中和保留中三種狀態的PoolEntry物件 |
weakThreadLocals | boolean | 是否使用弱引用 |
threadList | ThreadLocal<List<Object>> | 存放當前執行緒的PoolEntry物件,如果當前執行緒再次借用則優先會從該列表中獲取,但是也可能會被其他執行緒借走 |
listener | IBagStateListener | 新增元素的監聽器 |
waiters | AtomicInteger | 當前等待的執行緒數 |
closed | volatile boolean | 是否關閉標識 |
handOffQueue | SynchronousQueue | 無容量阻塞佇列,插入操作需要等待刪除操作,刪除操作無需等待插入操作 |
5、從ConcurrentBag借出一個元素
ConcurrentBag實現了borrow方法,意思是從併發集合中借出一個元素,對於連線池而言實際就是從連線池中獲取一個連線,原始碼如下:
1 /** 借出一個物件 */ 2 public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException 3 { 4 /** 1.從ThreadLocal中獲取當前執行緒繫結的物件集合 */ 5 final List<Object> list = threadList.get(); 6 /** 1.1.如果當前執行緒變數中存在就直接從list中返回一個*/ 7 for (int i = list.size() - 1; i >= 0; i--) { 8 final Object entry = list.remove(i); 9 @SuppressWarnings("unchecked") 10 final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry; 11 if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { 12 return bagEntry; 13 } 14 } 15 16 /** 2.當前等待物件數量自增1 */ 17 final int waiting = waiters.incrementAndGet(); 18 try { 19 /** 3.遍歷當前快取的sharedList, 如果當前狀態為未使用,則通過CAS修改為已使用*/ 20 for (T bagEntry : sharedList) { 21 if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { 22 /** 4.如果當前等待執行緒不止1個,則給監聽中新增一個任務 */ 23 if (waiting > 1) { 24 listener.addBagItem(waiting - 1); 25 } 26 return bagEntry; 27 } 28 } 29 30 /** 4.如果當前快取的sharedList為空或者都在使用中,那麼給listener新增一個任務*/ 31 listener.addBagItem(waiting); 32 33 timeout = timeUnit.toNanos(timeout); 34 do { 35 final long start = currentTime(); 36 /** 5.從阻塞佇列中等待超時獲取元素 */ 37 final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS); 38 /** 6.如果獲取元素失敗或者獲取元素且使用成功則均返回 */ 39 if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { 40 return bagEntry; 41 } 42 43 timeout -= elapsedNanos(start); 44 } while (timeout > 10_000); 45 46 return null; 47 } 48 finally { 49 /** 6.等待執行緒數自減1 */ 50 waiters.decrementAndGet(); 51 } 52 }
從原始碼中可以發現方法中共有三個地方出現了return bagEntry,所以可以看出ConcurrentBag借出元素的地方是有三個來源的
第一步:從ThreadLocal中獲取
每個執行緒從ConcurrentBag中借出連線時都會建立一個ThreadLocal物件,值為一個List,預設大小為16,每次客戶端從ConcurrentBag中獲取連線時都會優先從ThreadLocal中嘗試獲取連線,獲取失敗才會走下一步獲取。
當客戶端將連線歸還給ConcurrentBag時,首先判斷當前是否有其他客戶端等待連線,如果有其他客戶端等待那麼就將連線給其他客戶端,如果沒有客戶端等待那麼將連線存入ThreadLocal中,每個ThreadLocal最多會儲存50個連線
Tips:使用ThreadLocal可能會存在記憶體洩露的風險,所以ConcurrentBag內部有一個屬性為booleal weakThreadLocals,當值為true時則ThreadLocal中的引用均是弱引用,在記憶體不足時GC的時候會被回收,避免了出現記憶體洩露的問題。
第二步:從sharedList中獲取
從ThreadLocal中獲取連線失敗之後,會再次嘗試從sharedList中獲取,sharedList集合存在初始化的PoolEntry。在ConcurrentBag初始化的,會初始化指定數量的PoolEntry物件存入sharedList,原始碼如下:
ConcurrentBag建構函式如下:
1 /** ConcurrentHand 2 * IBagStateListener bag狀態監聽器,HikariPool實現了IBagStateListener介面 3 * 所以構造器傳入的listener實際就是HikariPool物件 4 * */ 5 public ConcurrentBag(final IBagStateListener listener) 6 { 7 this.listener = listener; 8 //是否使用弱引用 9 this.weakThreadLocals = useWeakThreadLocals(); 10 //初始化阻塞佇列 11 this.handoffQueue = new SynchronousQueue<>(true); 12 //初始化等待連線數 13 this.waiters = new AtomicInteger(); 14 //初始化sharedList 15 this.sharedList = new CopyOnWriteArrayList<>(); 16 if (weakThreadLocals) { 17 this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16)); 18 } 19 else { 20 this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16)); 21 } 22 }
HikariPool內部屬性包含了ConcurrentBag物件,在HikariPool初始化時會建立ConcurrentBag物件,所以ConcurrentBag的建構函式是在HikariPool初始化時呼叫,HikariPool建構函式如下:
1 public HikariPool(final HikariConfig config) 2 { 3 super(config); 4 5 //初始化ConcurrentBag物件 6 this.connectionBag = new ConcurrentBag<>(this); 7 //建立SuspendResumeLock物件 8 this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK; 9 /** 初始化執行緒池,houseKeeping可以理解為保持空間充足的意思,空間也就是連線池,該執行緒池的作用就是保持連線池中合適的連線數的作用 */ 10 this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); 11 12 /** 設定屬性*/ 13 checkFailFast(); 14 15 if (config.getMetricsTrackerFactory() != null) { 16 setMetricsTrackerFactory(config.getMetricsTrackerFactory()); 17 } 18 else { 19 setMetricRegistry(config.getMetricRegistry()); 20 } 21 22 setHealthCheckRegistry(config.getHealthCheckRegistry()); 23 24 handleMBeans(this, true); 25 26 ThreadFactory threadFactory = config.getThreadFactory(); 27 /** 根據配置的最大連線數,建立連結串列型別阻塞佇列 */ 28 LinkedBlockingQueue<Runnable> addQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize()); 29 this.addConnectionQueue = unmodifiableCollection(addQueue); 30 /** 初始化建立連線執行緒池*/ 31 this.addConnectionExecutor = createThreadPoolExecutor(addQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy()); 32 /** 初始化關閉連線執行緒池*/ 33 this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); 34 35 this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); 36 /** 建立保持連線池連線數量的任務*/ 37 this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS); 38 39 if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { 40 addConnectionExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); 41 addConnectionExecutor.setMaximumPoolSize(Runtime.getRuntime().availableProcessors()); 42 43 final long startTime = currentTime(); 44 while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) { 45 quietlySleep(MILLISECONDS.toMillis(100)); 46 } 47 48 addConnectionExecutor.setCorePoolSize(1); 49 addConnectionExecutor.setMaximumPoolSize(1); 50 } 51 }
這裡有一個定時任務houseKeeperTask,該定時任務的作用是定時檢測連線池中連線的數量,執行的內容就是HouseKeep的run方法,邏輯如下:
1 private final class HouseKeeper implements Runnable 2 { 3 private volatile long previous = plusMillis(currentTime(), -housekeepingPeriodMs); 4 5 @Override 6 public void run() 7 { 8 try { 9 /** 讀取連線池配置 */ 10 connectionTimeout = config.getConnectionTimeout(); 11 validationTimeout = config.getValidationTimeout(); 12 leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); 13 catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog; 14 15 final long idleTimeout = config.getIdleTimeout(); 16 final long now = currentTime(); 17 18 // Detect retrograde time, allowing +128ms as per NTP spec. 19 if (plusMillis(now, 128) < plusMillis(previous, housekeepingPeriodMs)) { 20 logger.warn("{} - Retrograde clock change detected (housekeeper delta={}), soft-evicting connections from pool.", 21 poolName, elapsedDisplayString(previous, now)); 22 previous = now; 23 /** 關閉連線池中需要被丟棄的連線 */ 24 softEvictConnections(); 25 return; 26 } 27 else if (now > plusMillis(previous, (3 * housekeepingPeriodMs) / 2)) { 28 // No point evicting for forward clock motion, this merely accelerates connection retirement anyway 29 logger.warn("{} - Thread starvation or clock leap detected (housekeeper delta={}).", poolName, elapsedDisplayString(previous, now)); 30 } 31 32 previous = now; 33 34 String afterPrefix = "Pool "; 35 if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) { 36 logPoolState("Before cleanup "); 37 afterPrefix = "After cleanup "; 38 39 /** 獲取當前連線池中已經不是使用中的連線集合 */ 40 final List<PoolEntry> notInUse = connectionBag.values(STATE_NOT_IN_USE); 41 int toRemove = notInUse.size() - config.getMinimumIdle(); 42 for (PoolEntry entry : notInUse) { 43 /** 當前空閒的連線如果超過最大空閒時間idleTimeout則關閉空閒連線 */ 44 if (toRemove > 0 && elapsedMillis(entry.lastAccessed, now) > idleTimeout && connectionBag.reserve(entry)) { 45 closeConnection(entry, "(connection has passed idleTimeout)"); 46 toRemove--; 47 } 48 } 49 } 50 51 logPoolState(afterPrefix); 52 /** 填充連線池,保持連線池數量至少保持minimum個連線數量 */ 53 fillPool(); // Try to maintain minimum connections 54 } 55 catch (Exception e) { 56 logger.error("Unexpected exception in housekeeping task", e); 57 } 58 } 59 }
該定時任務主要是為了維護連線池中連線的數量,首先需要將被標記為需要丟棄的連線進行關閉,然後將空閒超時的連線進行關閉,最後當連線池中的連線少於最小值時就需要對連線池進行補充連線的操作。所以在初始化連線池時,初始化連線的操作就是在fillPool方法中實現的。fillPool方法原始碼如下:
1 /** 填充連線池 */ 2 private synchronized void fillPool() 3 { 4 /** 5 * 計算需要新增的連線數量 6 * config.getMaximumPoolSize - getTotalConnections() 表示連線池最大值-當前連線的數量=最多還可以建立的連線數 7 * config.getMinimumIdle() - getIdleConnections() 表示連線池最小值 - 當前空閒的連線數= 當前可以連線數 8 * Math.min計算得到最少需要的連線數 - addConnectionQueue.size() = 還需要建立連線的任務數量 9 * */ 10 final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) 11 - addConnectionQueue.size(); 12 for (int i = 0; i < connectionsToAdd; i++) { 13 /** 向建立連線執行緒池中提交建立連線的任務 */ 14 addConnectionExecutor.submit((i < connectionsToAdd - 1) ? poolEntryCreator : postFillPoolEntryCreator); 15 } 16 }
先計算需要建立的連線數量,向建立連線的執行緒池中提交任務 poolEntryCreator,建立最後一個任務時建立的是postFillPoolEntryCreator, 兩者沒有本質的區別,只是列印的日誌不一樣而已.
PoolEntryCreator建立PoolEntry物件的邏輯如下:
1 /** 建立PoolEntry物件執行緒 */ 2 private final class PoolEntryCreator implements Callable<Boolean> { 3 /** 4 * 日誌字首 5 */ 6 private final String loggingPrefix; 7 8 PoolEntryCreator(String loggingPrefix) { 9 this.loggingPrefix = loggingPrefix; 10 } 11 12 @Override 13 public Boolean call() { 14 long sleepBackoff = 250L; 15 /** 1.當前連線池狀態正常並且需求建立連線時 */ 16 while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) { 17 /** 2.建立PoolEntry物件 */ 18 final PoolEntry poolEntry = createPoolEntry(); 19 if (poolEntry != null) { 20 /** 3.將PoolEntry物件新增到ConcurrentBag物件中的sharedList中 */ 21 connectionBag.add(poolEntry); 22 logger.debug("{} - Added connection {}", poolName, poolEntry.connection); 23 if (loggingPrefix != null) { 24 logPoolState(loggingPrefix); 25 } 26 return Boolean.TRUE; 27 } 28 /** 睡眠指定時間*/ 29 quietlySleep(sleepBackoff); 30 sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5))); 31 } 32 // Pool is suspended or shutdown or at max size 33 return Boolean.FALSE; 34 } 35 }
createPoolEntry方法邏輯如下:
1 /** 建立PoolEntry物件 */ 2 private PoolEntry createPoolEntry() 3 { 4 try { 5 /** 1.初始化PoolEntry物件,會先建立Connection物件傳入PoolEntry的建構函式中 */ 6 final PoolEntry poolEntry = newPoolEntry(); 7 /** 2.獲取連線最大生命週期時長 */ 8 final long maxLifetime = config.getMaxLifetime(); 9 if (maxLifetime > 0) { 10 /** 3.獲取一個隨機值,防止PoolEntry同時建立同時被銷燬,新增隨機值錯開時間差 */ 11 final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; 12 final long lifetime = maxLifetime - variance; 13 /** 4.給PoolEntry新增定時任務,當PoolEntry物件達到最大生命週期時間後觸發定時任務將連線標記為被拋棄 */ 14 poolEntry.setFutureEol(houseKeepingExecutorService.schedule( 15 () -> { 16 /** 5.達到最大生命週期,拋棄連線 */ 17 if (softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */)) { 18 /** 6.丟棄一個連線之後,呼叫addBagItem補充新的PoolEntry物件 */ 19 addBagItem(connectionBag.getWaitingThreadCount()); 20 } 21 }, 22 lifetime, MILLISECONDS)); 23 } 24 25 return poolEntry; 26 } 27 /** 異常捕獲*/ 28 catch (ConnectionSetupException e) { 29 if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently 30 logger.error("{} - Error thrown while acquiring connection from data source", poolName, e.getCause()); 31 lastConnectionFailure.set(e); 32 } 33 return null; 34 } 35 catch (SQLException e) { 36 if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently 37 logger.debug("{} - Cannot acquire connection from data source", poolName, e); 38 lastConnectionFailure.set(new ConnectionSetupException(e)); 39 } 40 return null; 41 } 42 catch (Exception e) { 43 if (poolState == POOL_NORMAL) { // we check POOL_NORMAL to avoid a flood of messages if shutdown() is running concurrently 44 logger.error("{} - Error thrown while acquiring connection from data source", poolName, e); 45 lastConnectionFailure.set(new ConnectionSetupException(e)); 46 } 47 return null; 48 } 49 }
首先建立一個新的PoolEntry物件,PoolEntry構造時會建立Connection物件,另外如果連線設定了最大生命週期時長,那麼需要給每個PoolEntry新增定時任務,為了防止多個PoolEntry同時建立同時被關閉,所以每個PoolEntry的最大生命週期時間都不一樣。當PoolEntry達到最大生命週期後會觸發softEvictConnection方法,將PoolEntry標記為需要被丟棄,另外由於拋棄了PoolEntry物件,所以需要重新呼叫addBagItem方法對PoolEntry物件進行補充。
第三步:通過IBagStateListener建立新的元素
由於第二步可知,IBagStateListener主要有一個addBagItem方法,HikariPool實現了addBagItem方法,方法原始碼如下:
1 public void addBagItem(final int waiting) 2 { 3 /** 判斷是否需要建立連線 */ 4 final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional. 5 if (shouldAdd) { 6 /** 向建立連線執行緒池中提交建立連線的任務 */ 7 addConnectionExecutor.submit(poolEntryCreator); 8 } 9 }
總結:
從ConcurrentBag中獲取連線一共分成三步,首先從當前執行緒的ThreadLocal中獲取,如果有直接返回一個連線,如果ThreadLocal中沒有則從sharedList中獲取,sharedList可以理解為ConcurrentBag快取的連線池,每當建立了一個PoolEntry物件之後都會新增到sharedList中去,如果sharedList中的連線狀態都不是可用狀態,此時就需要通過IBagStateListener提交一個建立連線的任務,交給建立連線的執行緒池去執行,建立新的連線。
新的連線建立成功之後會將PoolEntry物件新增到無容量的阻塞佇列handoffQueue中,等待連線的執行緒不斷嘗試從handoffQueue佇列中獲取連線直到成功獲取或者超時返回。
2.2、釋放連線
當客戶端釋放連線時會呼叫collection的close方法,Hikari中的Connection使用的是代理連線ProxyConnection物件,呼叫close方法時會呼叫關聯的PoolEntry物件的回收方法recycle方法,PoolEntry的recycle方法原始碼如下:
1 void recycle(final long lastAccessed) 2 { 3 if (connection != null) { 4 this.lastAccessed = lastAccessed; 5 /** 呼叫HikariPool的recycle方法,回收當前PoolEntry物件 */ 6 hikariPool.recycle(this); 7 } 8 }
1 void recycle(final PoolEntry poolEntry) 2 { 3 metricsTracker.recordConnectionUsage(poolEntry); 4 /** 呼叫ConcurrentBag的回收方法 */ 5 connectionBag.requite(poolEntry); 6 }
1 /** 回收元素方法 */ 2 public void requite(final T bagEntry) 3 { 4 /** 1.設定狀態為未使用 */ 5 bagEntry.setState(STATE_NOT_IN_USE); 6 7 /** 2.如果當前存在等待執行緒,則優先將元素給等待執行緒 */ 8 for (int i = 0; waiters.get() > 0; i++) { 9 /** 2.1.將元素新增到無界阻塞佇列中,等待其他執行緒獲取 */ 10 if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) { 11 return; 12 } 13 else if ((i & 0xff) == 0xff) { 14 parkNanos(MICROSECONDS.toNanos(10)); 15 } 16 else { 17 /** 當前執行緒不再繼續執行 */ 18 yield(); 19 } 20 } 21 /** 3.如果當前連線沒有被其他執行緒使用,則新增到當前執行緒的ThreadLocal中 */ 22 final List<Object> threadLocalList = threadList.get(); 23 if (threadLocalList.size() < 50) { 24 threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry); 25 } 26 }
回收連線最終會呼叫ConcurrentBag的requite方法,方法邏輯不復雜,首先將PoolEntry元素狀態設定為未使用,然後判斷當前是否存在等待連線的執行緒,如果存在則將連線加入到無界阻塞佇列中去,由等待連線的執行緒從阻塞佇列中去獲取;
如果當前沒有等待連線的執行緒,則將連線新增到本地執行緒變數ThreadLocal中,等待當前執行緒下次獲取連線時直接從ThreadLocal中獲取。
三、Hikari連線池高效能的原因?
1、採用自定義的FastList替代了ArrayList,FastList的get方法去除了範圍檢查rangeCheck邏輯,並且remove方法是從尾部開始掃描的,而並不是從頭部開始掃描的。因為Connection的開啟和關閉順序通常是相反的
2、初始化時建立了兩個HikariPool物件,一個採用final型別定義,避免在獲取連線時才初始化,因為獲取連線時才初始化就需要做同步處理
3、Hikari建立連線是通過javassist動態位元組碼生成技術建立的,效能更好
4、從連線池中獲取連線時對於同一個執行緒在threadLocal中添加了快取,同一執行緒獲取連線時沒有併發操作
5、Hikari最大的特點是在高併發的情況下儘量的減少鎖競爭