資料庫連線池之Druid原始碼解析
一、Druid的使用
1.1、Springboot專案整合druid
1.1.1、配置maven
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.15</version </dependency>
1.1.2、新增資料來源相關配置
1 spring: 2 datasource:
druid: 3 url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&zeroDataTimeBehavior=convertToNull&useSSL=false 4 username: root 5 password: root 6 type: com.alibaba.druid.pool.DruidDataSource 7 initialSize: 5 8 maxInactive: 10 9 minIdle: 5 10 timeBetweenEvictionRunsMillis: 5000 11 minEvictableIdleTimeMillis: 10000 12 filters: stat,wall 13 testOnBorrow: false
1.1.3、定義DruidConfig配置檔案
1 package com.lucky.test.config; 2 3 import com.alibaba.druid.pool.DruidDataSource; 4 import org.springframework.boot.context.properties.ConfigurationProperties; 5 import org.springframework.context.annotation.Configuration; 6 7 import javax.sql.DataSource; 8 9 /** 10 * @Auther: Lucky 11 * @Date: 2020/12/14 下午8:16 12 * @Desc: 13 */ 14 @Configuration 15 public class DruidConfig { 16 17 @ConfigurationProperties(prefix = "spring.datasource") 18 public DataSource druidDataSource(){ 19 return new DruidDataSource(); 20 } 21 }
定義了DruidDataSource資料來源的bean之後,專案中使用的就是資料來源就是DruidDataSource了
1.2、Druid資料來源的配置
配置項 | 案例值 | 描述 |
url |
jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&zeroDataTimeBehavior=convertToNull&useSSL=false |
資料庫連線地址 |
username
|
root | 資料庫連線使用者名稱 |
password | 123456 | 資料庫連線使用者密碼 |
initialSize | 10 | 連線池初始化連線數 |
minIdle | 1 | 連線池最小連線數 |
maxActive | 20 | 連線池最大活躍連線數 |
maxWait | 60000 | 客戶端獲取連線等待超時時間,單位為毫秒,此處的超時時間和建立連線超時時間是不一樣的,客戶端獲取連線超時有可能是建立連線超時,也有可能是當前連線數達到最大值並且其他客戶端正在使用,客戶端一直排隊等待可用連線超時了,所以儘量避免慢SQL,否則一旦可用連線被佔用了且都在執行慢SQL,就會導致其他客戶端長時間無法獲取連線而超時 |
timeBetweenEvictionRunsMillis | 60000 | 連線空閒檢測間隔時長,單位為毫秒,當連線長時間空閒時,有定時任務會間隔間隔一段時間檢測一次,如果發現連線空閒時間足夠長,則關閉連線 |
minEvictableIdleTimeMillis | 60000 | 連線最小生成時間,雖然空閒連線會被關閉,但是並非所有空閒的連線都會關閉,而是要看連線空閒了多長時間,比如配置了60000毫秒,那麼當連線空閒超過1分鐘時才會被關閉,否則可以繼續空閒等待客戶端 |
validationQuery | select 'X' | 檢測SQL |
testwhileIdle | false | 空閒的時候檢測執行validtionQuery嚴重連線是否有效,開啟會消耗效能 |
testOnReturn | false | 歸還連線時執行validationQuery驗證連線是否有效,開啟會消耗效能 |
poolPreparedStatements | true | 是否開啟Prepared快取,開啟會提高重複查詢的效率,但是會消耗一定的記憶體 |
maxOpenPreparedStatements | 20 | 每個Connection的prepared快取語句數量 |
二、Druid原始碼解析
連線池的主要作用是提供連線給應用程式,所以需要實現資料來源DataSource介面,Druid提供的資料來源為DruidDataSource實現了DataSource介面,核心邏輯實際就是實現了DataSource介面的getConnection方法,在分析getConnecction方法實現邏輯之前,首先需要了解DruidDataSource的主要屬性,分別如下:
1. /** 初始化連線數,預設為0 */ 2 protected volatile int initialSize = DEFAULT_INITIAL_SIZE; 3 /** 最大連線數,預設是8 */ 4 protected volatile int maxActive = DEFAULT_MAX_ACTIVE_SIZE; 5 /** 最小空閒連線數,預設是0 */ 6 protected volatile int minIdle = DEFAULT_MIN_IDLE; 7 /** 最大空閒連線數,預設數8 */ 8 protected volatile int maxIdle = DEFAULT_MAX_IDLE; 9 /** 最大等待超時時間, 預設為-1,表示不會超時 */ 10 protected volatile long maxWait = DEFAULT_MAX_WAIT;
DruidDataSource的getConnection方法邏輯如下:
1 /** 獲取資料庫連線*/ 2 public DruidPooledConnection getConnection() throws SQLException { 3 return getConnection(maxWait); 4 } 5 6 public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { 7 /** 初始化*/ 8 init(); 9 /** 初始化過濾器*/ 10 if (filters.size() > 0) { 11 FilterChainImpl filterChain = new FilterChainImpl(this); 12 return filterChain.dataSource_connect(this, maxWaitMillis); 13 } else { 14 /** 直接獲取連線*/ 15 return getConnectionDirect(maxWaitMillis); 16 } 17 }
獲取資料庫連線時首先需要對連線池進行初始化,然後才能從連線池中獲取連線,分別對應了方法init方法和getConnectionDirect方法,兩個方法邏輯分別如下
2.1、連線池的初始化
init方法邏輯如下:
1 /** 連線池初始化 */ 2 public void init() throws SQLException { 3 /** 如果已經初始化直接返回 */ 4 if (inited) { 5 return; 6 } 7 8 final ReentrantLock lock = this.lock; 9 try { 10 /*** 加鎖處理 */ 11 lock.lockInterruptibly(); 12 } catch (InterruptedException e) { 13 throw new SQLException("interrupt", e); 14 } 15 try { 16 /** 1.建立資料來源ID */ 17 this.id = DruidDriver.createDataSourceId(); 18 19 /** 2.初始化過濾器 */ 20 for (Filter filter : filters) { 21 filter.init(this); 22 } 23 24 /** 25 * 3.maxActive、maxActive、minIdle、initialSize等引數校驗以及JDBC等物件初始化 26 * */ 27 28 /** 4.初始化連線陣列,陣列大小為最大連線數*/ 29 connections = new DruidConnectionHolder[maxActive]; 30 31 SQLException connectError = null; 32 33 /** 5.根據初始化大小,初始化資料庫連線*/ 34 for (int i = 0, size = getInitialSize(); i < size; ++i) { 35 //1.建立連線 36 Connection conn = createPhysicalConnection(); 37 //2.將連線封裝成DruidConnectionHolder物件 38 DruidConnectionHolder holder = new DruidConnectionHolder(this, conn); 39 //3.將連線新增到連線陣列中 40 connections[poolingCount] = holder; 41 incrementPoolingCount();//連線池中連線數自增+1 42 } 43 44 /** 建立並開啟日誌執行緒 */ 45 createAndLogThread(); 46 /** 建立並開啟建立連線執行緒*/ 47 createAndStartCreatorThread(); 48 /** 建立並開啟銷燬連線執行緒*/ 49 createAndStartDestroyThread(); 50 /** 等待 建立連線執行緒 和 銷燬連線執行緒 全部開啟才算初始化完成 */ 51 initedLatch.await(); 52 53 }finally { 54 /** 標記已經初始化*/ 55 inited = true; 56 /** 釋放鎖*/ 57 lock.unlock(); 58 } 59 }
連線池初始化的邏輯主要如下:
1、判斷是否已經初始化,如果已經初始化直接跳出;如果沒有初始化則繼續初始化
2、防止併發初始化需要加鎖處理
3、初始化過濾器並進行初始化引數校驗
4、初始化連線陣列,並根據配置的初始化大小建立指定數量的連線存入陣列中,初始化的連線數就是傳入的引數值initialSIze的值
5、建立並開啟建立連線和銷燬連線的執行緒
6、標記初始化完成並釋放鎖
連線池初始化時會建立執指定數量的連線,並存入陣列中。但是通常情況下連線池中的連線數量不是固定不變的,通常需要隨著併發量提高要增加,隨著併發量小而減少。所以在初始化的時候分別建立了建立連線的執行緒和銷燬連線的執行緒,用於動態的建立連線和銷燬連線,從而達到連線池動態的增刪連線的效果。
2.1.1、建立連線的執行緒createAndStartCreatorThread方法原始碼解析
1 protected void createAndStartCreatorThread() { 2 if (createScheduler == null) { 3 String threadName = "Druid-ConnectionPool-Create-" + System.identityHashCode(this); 4 createConnectionThread = new CreateConnectionThread(threadName); 5 createConnectionThread.start(); 6 return; 7 } 8 9 initedLatch.countDown(); 10 }
該方法的主要作用就是建立了一個建立連線的執行緒CreateConnectionThread物件,並且啟動了執行緒,所以核心邏輯就是需要分析該執行緒主要的流程,邏輯如下:
1 /** 建立連線執行緒*/ 2 public class CreateConnectionThread extends Thread { 3 4 public CreateConnectionThread(String name){ 5 super(name); 6 this.setDaemon(true); 7 } 8 9 public void run() { 10 initedLatch.countDown(); 11 for (;;) { 12 try { 13 lock.lockInterruptibly(); 14 } catch (InterruptedException e2) { 15 break; 16 } 17 18 try { 19 /** 20 * poolingCount:連線池中的空閒連線數量 21 * notEmptyWaitThreadCount:等待連線的執行緒數量 22 * 當連線足夠時,睡眠執行緒 23 * */ 24 // 必須存線上程等待,才建立連線 25 if (poolingCount >= notEmptyWaitThreadCount) { 26 empty.await(); 27 } 28 29 // 防止建立超過maxActive數量的連線 30 /** 31 * activeCount: 活躍的連線數 32 * maxActive: 最大執行緒數 33 * 當活躍連線數 + 空閒連線數 >= 最大連線數時 睡眠執行緒 34 * */ 35 if (activeCount + poolingCount >= maxActive) { 36 empty.await(); 37 continue; 38 } 39 40 } catch (InterruptedException e) { 41 break; 42 } finally { 43 lock.unlock(); 44 } 45 46 /** 47 * 當等待連線的執行緒超過空閒執行緒;並且總連線數沒有超過最大連線數時,建立新連線 48 * */ 49 Connection connection = null; 50 51 try { 52 /** 1.建立新連線 */ 53 connection = createPhysicalConnection(); 54 } catch (SQLException e) { 55 LOG.error("create connection error", e); 56 break; 57 } 58 59 if (connection == null) { 60 continue; 61 } 62 /** 2.將連線放入連線池中 */ 63 put(connection); 64 } 65 } 66 }
邏輯並不複雜,就是在一個死迴圈中不斷判斷當前連線數是否夠用,並且是否超過最大上限,如果滿足條件就建立新的連線,並且將連線新增到連線池中。
在這裡有一個Condition物件empty,該物件主要用於監視當前的連線池是否需要建立連線了,如果不需要建立連線則呼叫await進行等待,等待連線不足時進行喚醒。
當連線建立之後,會呼叫put方法將連線放到連線陣列中,邏輯如下:
1 protected void put(Connection connection) { 2 DruidConnectionHolder holder = null; 3 try { 4 /** 1.將連線封裝成功DruidConnectionHolder物件 */ 5 holder = new DruidConnectionHolder(DruidDataSource.this, connection); 6 } catch (SQLException ex) { 7 lock.lock(); 8 try { 9 /** 10 * createScheduler 是建立連線的執行緒池 11 * createTaskCount 是當前需要建立連線的任務個數 12 * 當執行緒池不為空時,任務個數減1 13 * */ 14 if (createScheduler != null) { 15 createTaskCount--; 16 } 17 } finally { 18 lock.unlock(); 19 } 20 LOG.error("create connection holder error", ex); 21 return; 22 } 23 24 lock.lock(); 25 try { 26 /** 2.存入連線池陣列中 */ 27 connections[poolingCount] = holder; 28 /** 3.空閒連線數poolingCount自增*/ 29 incrementPoolingCount(); 30 if (poolingCount > poolingPeak) { 31 /** 4.超過峰值則記錄連線數量的峰值 */ 32 poolingPeak = poolingCount; 33 poolingPeakTime = System.currentTimeMillis(); 34 } 35 /** 3.喚醒notEmpty,因為該連線是非初始化建立而是動態額外新增的,所以需要喚醒銷燬執行緒準備銷燬該連線 */ 36 notEmpty.signal(); 37 notEmptySignalCount++; 38 39 if (createScheduler != null) { 40 createTaskCount--; 41 42 if (poolingCount + createTaskCount < notEmptyWaitThreadCount // 43 && activeCount + poolingCount + createTaskCount < maxActive) { 44 /** 如果連線數還是不足,則繼續喚醒empty */ 45 emptySignal(); 46 } 47 } 48 } finally { 49 lock.unlock(); 50 } 51 } 52 53 /** 喚醒empty */ 54 private void emptySignal() { 55 if (createScheduler == null) { 56 empty.signal(); 57 return; 58 } 59 60 if (createTaskCount >= maxCreateTaskCount) { 61 return; 62 } 63 64 if (activeCount + poolingCount + createTaskCount >= maxActive) { 65 return; 66 } 67 68 createTaskCount++; 69 CreateConnectionTask task = new CreateConnectionTask();//建立連線的Task邏輯和CreateConneactionThread執行緒的邏輯完全一致 70 createScheduler.submit(task); 71 }
2.2.1.2、銷燬連線的執行緒createAndStartDestroyThread方法原始碼解析
銷燬連線的執行緒方法邏輯基本和建立連線的邏輯相反,主要邏輯如下:
1 /** 銷燬連線執行緒 */ 2 public class DestroyConnectionThread extends Thread { 3 4 public DestroyConnectionThread(String name){ 5 super(name); 6 this.setDaemon(true); 7 } 8 9 public void run() { 10 initedLatch.countDown(); 11 12 for (;;) { 13 // 從前面開始刪除 14 try { 15 if (closed) { 16 break; 17 } 18 /** 如果設定了檢查間隔,則睡眠執行緒指定時長,否則就預設睡眠1秒*/ 19 if (timeBetweenEvictionRunsMillis > 0) { 20 Thread.sleep(timeBetweenEvictionRunsMillis); 21 } else { 22 Thread.sleep(1000); // 23 } 24 25 if (Thread.interrupted()) { 26 break; 27 } 28 /** 執行銷燬連線的任務 */ 29 destoryTask.run(); 30 } catch (InterruptedException e) { 31 break; 32 } 33 } 34 } 35 36 }
銷燬連線的任務交給了DestroyTask來實現,邏輯如下:
1 /** 銷燬連線任務*/ 2 public class DestroyTask implements Runnable { 3 4 @Override 5 public void run() { 6 /** 1.銷燬超過最大空閒時間的連線 */ 7 shrink(true); 8 9 /** 2.強制回收超過超時時間的連線*/ 10 if (isRemoveAbandoned()) { 11 removeAbandoned(); 12 } 13 } 14 }
銷燬連線的任務主要有兩個核心邏輯:
1、銷燬空閒連線
當一個連線長時間沒有被使用,如果不及時清理就會造成資源浪費,所以需要定時檢查空閒時間過長的連線進行斷開連線銷燬
2、回收超時連線
當一個連線被一個執行緒長時間佔有沒有被歸還,有可能是程式出故障了或是有漏洞導致吃吃沒有歸還連線,這樣就可能會導致連線池中的連線不夠用,所以需要定時檢查霸佔連線時間過長的執行緒,如果超過規定時間沒有歸還連線,則強制回收該連線。
銷燬空閒連線邏輯如下:
1 /** 連線空閒時間,預設為30分鐘 */ 2 public static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 1000L * 60L * 30L; 3 4 /** 銷燬空閒連線 */ 5 public void shrink(boolean checkTime) { 6 /** 1.需要從連線池中去除的連線列表 */ 7 final List<DruidConnectionHolder> evictList = new ArrayList<DruidConnectionHolder>(); 8 try { 9 lock.lockInterruptibly(); 10 } catch (InterruptedException e) { 11 return; 12 } 13 14 try { 15 /** 2.獲取需要去除的個數 */ 16 final int checkCount = poolingCount - minIdle; 17 final long currentTimeMillis = System.currentTimeMillis(); 18 for (int i = 0; i < checkCount; ++i) { 19 DruidConnectionHolder connection = connections[i]; 20 /** 是否校驗連線的空閒時間*/ 21 if (checkTime) { 22 long idleMillis = currentTimeMillis - connection.getLastActiveTimeMillis(); 23 /** 3.1.如果連線空閒時間超過設定的值,則去除*/ 24 if (idleMillis >= minEvictableIdleTimeMillis) { 25 evictList.add(connection); 26 } else { 27 break; 28 } 29 } else { 30 /** 3.2.如果不校驗時間,則按順序去除*/ 31 evictList.add(connection); 32 } 33 } 34 35 int removeCount = evictList.size(); 36 /** 4.從陣列中將多餘的連線移除*/ 37 if (removeCount > 0) { 38 System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); 39 Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); 40 poolingCount -= removeCount; 41 } 42 } finally { 43 lock.unlock(); 44 } 45 46 /** 5.依次斷開被移除的連線 */ 47 for (DruidConnectionHolder item : evictList) { 48 Connection connection = item.getConnection(); 49 JdbcUtils.close(connection); 50 destroyCount.incrementAndGet(); 51 } 52 }
回收超時連線邏輯如下:
1 /** 強制回收連線 */ 2 public int removeAbandoned() { 3 int removeCount = 0; 4 5 long currrentNanos = System.nanoTime(); 6 7 /** 1.定義需要回收的連線列表*/ 8 List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>(); 9 10 synchronized (activeConnections) { 11 Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator(); 12 13 for (; iter.hasNext();) { 14 DruidPooledConnection pooledConnection = iter.next(); 15 if (pooledConnection.isRunning()) { 16 continue; 17 } 18 long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000); 19 /** 2.遍歷判斷超時未回收的連線,並加入列表中 */ 20 if (timeMillis >= removeAbandonedTimeoutMillis) { 21 iter.remove(); 22 pooledConnection.setTraceEnable(false); 23 abandonedList.add(pooledConnection); 24 } 25 } 26 } 27 28 if (abandonedList.size() > 0) { 29 /** 3.遍歷回收連線列表,進行連接回收*/ 30 for (DruidPooledConnection pooledConnection : abandonedList) { 31 synchronized (pooledConnection) { 32 if (pooledConnection.isDisable()) { 33 continue; 34 } 35 } 36 /** 3.1.強制斷開連線*/ 37 JdbcUtils.close(pooledConnection); 38 pooledConnection.abandond(); 39 removeAbandonedCount++; 40 removeCount++; 41 42 /** 3.2.日誌列印*/ 43 if (isLogAbandoned()) { 44 StringBuilder buf = new StringBuilder(); 45 buf.append("abandon connection, owner thread: "); 46 buf.append(pooledConnection.getOwnerThread().getName()); 47 buf.append(", connected time nano: "); 48 buf.append(pooledConnection.getConnectedTimeNano()); 49 buf.append(", open stackTrace\n"); 50 51 StackTraceElement[] trace = pooledConnection.getConnectStackTrace(); 52 for (int i = 0; i < trace.length; i++) { 53 buf.append("\tat "); 54 buf.append(trace[i].toString()); 55 buf.append("\n"); 56 } 57 58 LOG.error(buf.toString()); 59 } 60 } 61 } 62 return removeCount; 63 }
2.2、連線池中獲取連線
1 /** 從連線池中獲取連線 */ 2 public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { 3 int notFullTimeoutRetryCnt = 0; 4 for (;;) { 5 // handle notFullTimeoutRetry 6 DruidPooledConnection poolableConnection; 7 try { 8 /** 1.獲取連線 */ 9 poolableConnection = getConnectionInternal(maxWaitMillis); 10 } catch (GetConnectionTimeoutException ex) { 11 if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { 12 notFullTimeoutRetryCnt++; 13 if (LOG.isWarnEnabled()) { 14 LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt); 15 } 16 continue; 17 } 18 throw ex; 19 } 20 21 /** 2.判斷獲取的連線是否有效 */ 22 if (isTestOnBorrow()) { 23 boolean validate = testConnectionInternal(poolableConnection.getConnection()); 24 if (!validate) { 25 if (LOG.isDebugEnabled()) { 26 LOG.debug("skip not validate connection."); 27 } 28 /** 2.1 連線無效則拋棄連線 */ 29 Connection realConnection = poolableConnection.getConnection(); 30 discardConnection(realConnection); 31 continue; 32 } 33 } else { 34 Connection realConnection = poolableConnection.getConnection(); 35 if (realConnection.isClosed()) { 36 discardConnection(null); // 傳入null,避免重複關閉 37 continue; 38 } 39 /** 3.如果沒有判斷連線有效性,則判斷該連線是否空閒*/ 40 if (isTestWhileIdle()) { 41 final long currentTimeMillis = System.currentTimeMillis(); 42 final long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis(); 43 final long idleMillis = currentTimeMillis - lastActiveTimeMillis; 44 long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis(); 45 if (timeBetweenEvictionRunsMillis <= 0) { 46 timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; 47 } 48 /** 4.如連線空閒時間過長,則強制校驗連線的有效性 */ 49 if (idleMillis >= timeBetweenEvictionRunsMillis) { 50 boolean validate = testConnectionInternal(poolableConnection.getConnection()); 51 if (!validate) { 52 if (LOG.isDebugEnabled()) { 53 LOG.debug("skip not validate connection."); 54 } 55 discardConnection(realConnection); 56 continue; 57 } 58 } 59 } 60 } 61 /** 4.給連線新增監聽,超過時間未歸還,則強制回收該連線*/ 62 if (isRemoveAbandoned()) { 63 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 64 poolableConnection.setConnectStackTrace(stackTrace); 65 poolableConnection.setConnectedTimeNano(); 66 poolableConnection.setTraceEnable(true); 67 68 synchronized (activeConnections) { 69 activeConnections.put(poolableConnection, PRESENT); 70 } 71 } 72 /** 5.設定是否自動提交 */ 73 if (!this.isDefaultAutoCommit()) { 74 poolableConnection.setAutoCommit(false); 75 } 76 return poolableConnection; 77 } 78 }
獲取連線的邏輯步驟不多,首先是從連線池中獲取連線,獲取到連線之後根據配置項判斷是否需要對連線進行有效性檢測,防止獲取到了一個無效的連線。
獲取連線的方法getConnectionInternal方法邏輯如下:
1 /** 獲取連線*/ 2 private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException { 3 /** 1.連線池狀態判斷*/ 4 if (closed) { 5 connectErrorCount.incrementAndGet(); 6 throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis)); 7 } 8 9 if (!enable) { 10 connectErrorCount.incrementAndGet(); 11 throw new DataSourceDisableException(); 12 } 13 14 final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); 15 final int maxWaitThreadCount = getMaxWaitThreadCount(); 16 17 DruidConnectionHolder holder; 18 try { 19 lock.lockInterruptibly(); 20 } catch (InterruptedException e) { 21 connectErrorCount.incrementAndGet(); 22 throw new SQLException("interrupt", e); 23 } 24 25 try { 26 /** 2.判斷等待獲取連線的執行緒是否超過最大值 */ 27 if (maxWaitThreadCount > 0) { 28 if (notEmptyWaitThreadCount >= maxWaitThreadCount) { 29 connectErrorCount.incrementAndGet(); 30 throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " 31 + lock.getQueueLength()); 32 } 33 } 34 35 connectCount++; 36 if (maxWait > 0) { 37 /** 3.1 如果設定超時時間,則堵塞指定時長獲取連線*/ 38 holder = pollLast(nanos); 39 } else { 40 /** 3.2 如果沒有設定超時時間,則堵塞獲取連線*/ 41 holder = takeLast(); 42 } 43 44 if (holder != null) { 45 activeCount++; 46 if (activeCount > activePeak) { 47 activePeak = activeCount; 48 activePeakTime = System.currentTimeMillis(); 49 } 50 } 51 } catch (InterruptedException e) { 52 connectErrorCount.incrementAndGet(); 53 throw new SQLException(e.getMessage(), e); 54 } catch (SQLException e) { 55 connectErrorCount.incrementAndGet(); 56 throw e; 57 } finally { 58 lock.unlock(); 59 } 60 61 /** 4.如果獲取不到連線,則列印錯誤日誌並拋異常 */ 62 if (holder == null) { 63 long waitNanos = waitNanosLocal.get(); 64 65 StringBuilder buf = new StringBuilder(); 66 buf.append("wait millis ")// 67 .append(waitNanos / (1000 * 1000))// 68 .append(", active " + activeCount)// 69 .append(", maxActive " + maxActive)// 70 ; 71 72 List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList(); 73 for (int i = 0; i < sqlList.size(); ++i) { 74 if (i != 0) { 75 buf.append('\n'); 76 } else { 77 buf.append(", "); 78 } 79 JdbcSqlStatValue sql = sqlList.get(i); 80 buf.append("runningSqlCount "); 81 buf.append(sql.getRunningCount()); 82 buf.append(" : "); 83 buf.append(sql.getSql()); 84 } 85 86 String errorMessage = buf.toString(); 87 88 if (this.createError != null) { 89 throw new GetConnectionTimeoutException(errorMessage, createError); 90 } else { 91 throw new GetConnectionTimeoutException(errorMessage); 92 } 93 } 94 95 holder.incrementUseCount(); 96 /** 5.構造連線物件並返回 */ 97 DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); 98 return poolalbeConnection; 99 }
獲取連線主要看有沒有設定超時時間,如果設定了超時時間則呼叫pollLast方法進行嘗試獲取連線,超時沒有獲取則返回空;takeLast方法是一直堵塞當前執行緒直到獲取連線成功才會返回。
1 /** 一直堵塞獲取連線*/ 2 DruidConnectionHolder takeLast() throws InterruptedException, SQLException { 3 try { 4 /** 1.如果當前空閒連線數為0 */ 5 while (poolingCount == 0) { 6 /** 2.傳送訊號等待建立連線*/ 7 emptySignal(); // send signal to CreateThread create connection 8 notEmptyWaitThreadCount++; 9 if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { 10 notEmptyWaitThreadPeak = notEmptyWaitThreadCount; 11 } 12 try { 13 /** 等待訊號*/ 14 notEmpty.await(); // signal by recycle or creator 15 } finally { 16 notEmptyWaitThreadCount--; 17 } 18 notEmptyWaitCount++; 19 20 if (!enable) { 21 connectErrorCount.incrementAndGet(); 22 throw new DataSourceDisableException(); 23 } 24 } 25 } catch (InterruptedException ie) { 26 notEmpty.signal(); // propagate to non-interrupted thread 27 notEmptySignalCount++; 28 throw ie; 29 } 30 31 decrementPoolingCount(); 32 /** 獲取連線池最後一位的連線*/ 33 DruidConnectionHolder last = connections[poolingCount]; 34 /** 將陣列對應位置置為空*/ 35 connections[poolingCount] = null; 36 return last; 37 }
1 private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { 2 long estimate = nanos; 3 4 for (;;) { 5 if (poolingCount == 0) { 6 emptySignal(); // send signal to CreateThread create connection 7 8 if (estimate <= 0) { 9 waitNanosLocal.set(nanos - estimate); 10 return null; 11 } 12 13 notEmptyWaitThreadCount++; 14 if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { 15 notEmptyWaitThreadPeak = notEmptyWaitThreadCount; 16 } 17 18 try { 19 long startEstimate = estimate; 20 /** 等待訊號指定時長*/ 21 estimate = notEmpty.awaitNanos(estimate); // signal by 22 // recycle or 23 // creator 24 notEmptyWaitCount++; 25 notEmptyWaitNanos += (startEstimate - estimate); 26 27 if (!enable) { 28 connectErrorCount.incrementAndGet(); 29 throw new DataSourceDisableException(); 30 } 31 } catch (InterruptedException ie) { 32 notEmpty.signal(); // propagate to non-interrupted thread 33 notEmptySignalCount++; 34 throw ie; 35 } finally { 36 notEmptyWaitThreadCount--; 37 } 38 39 if (poolingCount == 0) { 40 if (estimate > 0) { 41 continue; 42 } 43 44 waitNanosLocal.set(nanos - estimate); 45 return null; 46 } 47 } 48 49 decrementPoolingCount(); 50 DruidConnectionHolder last = connections[poolingCount]; 51 connections[poolingCount] = null; 52 53 return last; 54 } 55 }
takeLast和pollLast的邏輯基本上一直,主要是看等待連線時是一直等待還是超時等待,一般都會設定超時時間,防止程式一直堵塞著。這裡又使用到了emptySignal和notEmptySignal, 後面仔細分析。
2.3、連線歸還到連線池
當程式使用完連線需要將連線歸還到執行緒池,通過會執行connection.close方法進行關閉,Druid中的連線物件為DruidPooledConnection,close方法中執行了回收的方法recycle(),該方法會將連接回收到連線池中,邏輯如下:
1 public void recycle() throws SQLException { 2 if (this.disable) { 3 return; 4 } 5 6 DruidConnectionHolder holder = this.holder; 7 if (holder == null) { 8 if (dupCloseLogEnable) { 9 LOG.error("dup close"); 10 } 11 return; 12 } 13 14 if (!this.abandoned) { 15 /** 獲取資料來源頭像 */ 16 DruidAbstractDataSource dataSource = holder.getDataSource(); 17 /** 執行資料來源的recycle方法進行連接回收*/ 18 dataSource.recycle(this); 19 } 20 21 this.holder = null; 22 conn = null; 23 transactionInfo = null; 24 closed = true; 25 }
主要步驟為先獲取該連線所屬的資料來源物件,然後直接執行DataSource物件的recycle方法進行連線的回收,DruidDataSource中的recycle方法邏輯如下:
1 /** 2 * 回收連線 3 */ 4 protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { 5 final DruidConnectionHolder holder = pooledConnection.getConnectionHolder(); 6 7 if (holder == null) { 8 LOG.warn("connectionHolder is null"); 9 return; 10 } 11 12 final Connection physicalConnection = holder.getConnection(); 13 14 if (pooledConnection.isTraceEnable()) { 15 synchronized (activeConnections) { 16 if (pooledConnection.isTraceEnable()) { 17 Object oldInfo = activeConnections.remove(pooledConnection); 18 if (oldInfo == null) { 19 if (LOG.isWarnEnabled()) { 20 LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size()); 21 } 22 } 23 pooledConnection.setTraceEnable(false); 24 } 25 } 26 } 27 28 final boolean isAutoCommit = holder.isUnderlyingAutoCommit(); 29 final boolean isReadOnly = holder.isUnderlyingReadOnly(); 30 final boolean testOnReturn = this.isTestOnReturn(); 31 32 try { 33 // check need to rollback? 34 if ((!isAutoCommit) && (!isReadOnly)) { 35 pooledConnection.rollback(); 36 } 37 38 //校驗回收執行緒和獲取執行緒是否一致,並對連線持有物件進行重置 39 boolean isSameThread = pooledConnection.getOwnerThread() == Thread.currentThread(); 40 if (!isSameThread) { 41 synchronized (pooledConnection) { 42 holder.reset(); 43 } 44 } else { 45 holder.reset(); 46 } 47 48 if (holder.isDiscard()) { 49 return; 50 } 51 52 /** 校驗連線*/ 53 if (testOnReturn) { 54 boolean validate = testConnectionInternal(physicalConnection); 55 if (!validate) { 56 JdbcUtils.close(physicalConnection); 57 58 destroyCount.incrementAndGet(); 59 60 lock.lock(); 61 try { 62 activeCount--; 63 closeCount++; 64 } finally { 65 lock.unlock(); 66 } 67 return; 68 } 69 } 70 71 if (!enable) { 72 /** 如果連線池不可用則丟棄連線*/ 73 discardConnection(holder.getConnection()); 74 return; 75 } 76 77 final long lastActiveTimeMillis = System.currentTimeMillis(); 78 lock.lockInterruptibly(); 79 try { 80 /** 統計資料修改*/ 81 activeCount--; 82 closeCount++; 83 /** 將連線新增到連線池陣列的尾部 */ 84 putLast(holder, lastActiveTimeMillis); 85 recycleCount++; 86 } finally { 87 lock.unlock(); 88 } 89 } catch (Throwable e) { 90 holder.clearStatementCache(); 91 92 if (!holder.isDiscard()) { 93 this.discardConnection(physicalConnection); 94 holder.setDiscard(true); 95 } 96 97 LOG.error("recyle error", e); 98 recycleErrorCount.incrementAndGet(); 99 } 100 }
1 void putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { 2 e.setLastActiveTimeMillis(lastActiveTimeMillis); 3 /** 將連線放到連線池陣列的尾部 */ 4 connections[poolingCount] = e; 5 incrementPoolingCount(); 6 7 if (poolingCount > poolingPeak) { 8 poolingPeak = poolingCount; 9 poolingPeakTime = lastActiveTimeMillis; 10 } 11 /** 喚醒notEmpty */ 12 notEmpty.signal(); 13 notEmptySignalCount++; 14 }
三、Druid的實現細節
3.1、核心類
3.1.1、DruidDataSource類
DruidDataSource是Druid提供的資料類,實現了DataSource介面,實現了獲取連線的getConnection方法,用於應用程式使用的資料物件,內部持有連線的陣列DruidConnectionHolder[] connections表示連線池
3.1.2、DruidPooledConnection類
DruidPooledConnection表示資料庫連線物件,應用程式獲取DruidPooledConnection執行SQL,使用完畢呼叫close方法釋放連線
3.1.3、DruidConnectionHolder型別
DruidConnectionHolder是DruidPooledConnection類的封裝,表示連線池中持有的連線物件,連線池新增連線時實際是建立DruidConnectionHolder物件放入陣列中,獲取連線就是從陣列尾部獲取DruidConnectionHolder物件
3.2、ReentrantLock和Condition的使用
DruidDataSource內部有一個ReentrantLock lock物件和兩個Condition物件,分別為empty和notEmpty,主要用於連線的建立和銷燬的等待和通知。
資料庫連線池初始化的時候會初始化固定數量的連線,但是隨著應用程式的執行,資料庫連線的需求往往是動態變化的,比如初始化時建立了10個連線,但是高峰期的時候需要15個連線才可以滿足需求,此時連線池就需要動態的對連線池進行擴容,而等到高峰期過了之後,資料庫連線池還需要將多餘建立的5個連線進行釋放,不然在空閒時間也會佔據著連線造成資源的浪費。連線池中連線的動態增刪就是依靠了empty和notEmpty這兩個Condition物件。
empty用於通知建立連線,notEmpty用於通知應用獲取應用
初始化時啟動建立連線的執行緒,判斷當前是否需要建立連線,如果不需要建立則呼叫empty.await()方法進行等待,等待empty被喚醒之後進行建立連線,一旦empty被喚醒就會建立連線,建立完成之後通知notEmpty,讓使用者不再阻塞,拿到連線物件。
客戶端呼叫getConnection方法獲取連線時,如果當前沒有可用連線,則呼叫empty.signal()方法喚醒empty,並呼叫notEmpty.await()睡眠等待連線建立完成
3.3、Druid工作流程圖
1、 getConnection方法流程如下:
2、建立連線執行緒和銷燬連線執行緒流程如下: