Java 資料持久化系列之池化技術
在上一篇文章《Java 資料持久化系列之JDBC》中,我們瞭解到使用 JDBC 建立 Connection 可以執行對應的SQL,但是建立 Connection 會消耗很多資源,所以 Java 持久化框架中往往不直接使用 JDBC,而是在其上建立資料庫連線池層。
今天我們就先來了解一下池化技術的必要性、原理;然後使用 Apache-common-Pool2實現一個簡單的資料庫連線池;接著通過實驗,對比簡單連線池、HikariCP、Druid 等資料庫連線池的效能資料,分析實現高效能資料庫連線池的關鍵;最後分析 Pool2 的具體原始碼實現。
物件不是你想要,想要就能要
你我單身狗們經常調侃可以隨便 New 出一個物件,用完就丟。但是有些物件建立的代價比較大,比如執行緒、tcp連線、資料庫連線等物件。對於這些建立耗時較長,或者資源佔用較大(佔據作業系統資源,比如說執行緒,網路連線等)的物件,往往會引入池化來管理,減少頻繁建立物件的次數,避免建立物件時的耗時,提高效能。
我們就以資料庫連線 Connection 物件為例,詳細說明一下建立該物件花費的時間和資源。下面是MySQL Driver 建立 Connection 物件的方法,在呼叫 connect 方法建立 Connection 時,會與 MySQL 進行網路通訊,建立 TCP 連線,這是極其消耗時間的。
connection = driver.connect(URL, props);
使用 Apache-Common-Pool2實現簡易資料庫連線池
下面,我們以 Apache-Common-Pool2為例來看一下池化技術相關的抽象結構。
首先了解一下Pool2中三元一體的 ObjectPool,PooledObject 和 PooledObjectFactory,對他們的解釋如下:
- ObjectPool 就是物件池,提供了
borrowObject
和returnObject
等一系列函式。 - PooledObject 是池化物件的封裝類,負責記錄額外資訊,比如說物件狀態,物件建立時間,物件空閒時間,物件上次使用時間等。
- PooledObjectFactory 是負責管理池化物件生命週期的工廠類,提供
makeObject
,destroyObject
,activateObject
和validateObject
等一系列函式。
上述三者都有其基礎的實現類,分別是 GenericObjectPool,DefaultPooledObject 和 BasePooledObjectFactory。上一節實驗中的 SimpleDatasource 就是使用上述類實現的。
首先,你要實現一個繼承 BasePooledObjectFactory 的工廠類,提供管理池化物件生命週期的具體方法:
- makeObject:建立池化物件例項,並且使用 PooledObject 將其封裝。
- validateObject:驗證物件例項是否安全或者可用,比如說 Connection 是否還儲存連線狀態。
- activateObject:將池返回的物件例項進行重新初始化,比如說設定 Connection是否預設AutoCommit等。
- passivateObject:將返回給池的物件例項進行反初始化,比如說 Connection 中未提交的事務進行 Rollback等。
- destroyObject:銷燬不再被池需要的物件例項,比如說 Connection不再被需要時,呼叫其 close 方法。
具體的實現原始碼如下所示,每個方法都有詳細的註釋。
public class SimpleJdbcConnectionFactory extends BasePooledObjectFactory<Connection> {
....
@Override
public Connection create() throws Exception {
// 用於建立池化物件
Properties props = new Properties();
props.put("user", USER_NAME);
props.put("password", PASSWORD);
Connection connection = driver.connect(URL, props);
return connection;
}
@Override
public PooledObject<Connection> wrap(Connection connection) {
// 將池化物件進行封裝,返回DefaultPooledObject,這裡也可以返回自己實現的PooledObject
return new DefaultPooledObject<>(connection);
}
@Override
public PooledObject<Connection> makeObject() throws Exception {
return super.makeObject();
}
@Override
public void destroyObject(PooledObject<Connection> p) throws Exception {
p.getObject().close();
}
@Override
public boolean validateObject(PooledObject<Connection> p) {
// 使用 SELECT 1 或者其他sql語句驗證Connection是否可用,ConnUtils程式碼詳見Github中的專案
try {
ConnUtils.validateConnection(p.getObject(), this.validationQuery);
} catch (Exception e) {
return false;
}
return true;
}
@Override
public void activateObject(PooledObject<Connection> p) throws Exception {
Connection conn = p.getObject();
// 物件被借出,需要進行初始化,將其 autoCommit進行設定
if (conn.getAutoCommit() != defaultAutoCommit) {
conn.setAutoCommit(defaultAutoCommit);
}
}
@Override
public void passivateObject(PooledObject<Connection> p) throws Exception {
// 物件被歸還,進行回收或者處理,比如將未提交的事務進行回滾
Connection conn = p.getObject();
if(!conn.getAutoCommit() && !conn.isReadOnly()) {
conn.rollback();
}
conn.clearWarnings();
if(!conn.getAutoCommit()) {
conn.setAutoCommit(true);
}
}
}
接著,你就可以使用 BasePool 來從池中獲取物件,使用後歸還給池。
Connection connection = pool.borrowObject(); // 從池中獲取連線物件例項
Statement statement = connection.createStatement();
statement.executeQuery("SELECT * FROM activity");
statement.close();
pool.returnObject(connection); // 使用後歸還連線物件例項到池中
如上,我們就使用 Apache Common Pool2 實現了一個簡易的資料庫連線池。下面,我們先來使用 benchmark 驗證一下這個簡易資料庫連線池的效能,再分析 Pool2 的具體原始碼實現,
效能試驗
至此,我們分析完了 Pool2的相關原理和實現,下面就修改 Hikari-benchmark 對我們編寫的建議資料庫連線池進行效能測試。修改後的 benchmark 的地址為 https://github.com/ztelur/HikariCP-benchmark。
可以看到 hikari 和 Druid 兩個資料庫連線池的效能是最優的,而我們的簡易資料庫連線池效能排在末尾。在後續系列文章中會對比我們的簡易資料庫分析 Hikari 和 Druid 高效能的原因。下面我們先來看一下簡易資料庫連線池的具體實現。
Apache Common Pool2 原始碼分析
我們來簡要分析 Pool2 的原始碼( 2.8.0版本 )實現,瞭解池化技術的基本原理,為後續瞭解和分析 HikariCP 和 Druid 打下基礎,三者在設計思路具有互通之處。
通過前邊的例項,我們知道通過 borrowObject
和 returnObject
從物件池中接取或者歸還物件,進行這些操作時,封裝例項 PooledObject 的狀態也會發生變化,下面就沿著 PooledObject 狀態機的狀態變化路線來講解相關的程式碼實現。
上圖是 PooledObject 的狀態機示意圖,藍色元素代表狀態,紅色代表 ObjectPool的相關方法。PooledObject 的狀態有 IDLE、ALLOCATED、RETURNING、ABANDONED、INVALID、EVICTION 和 EVICTION_RETURN_TO_HEAD(所有狀態定義在 PooledObjectState 類中,有些狀態暫時未被使用,這裡不進行說明)。
主要涉及三部分的狀態變化,分別是 1、2、3的借出歸還狀態變化,4,5的標記拋棄刪除狀態變化以及6,7,8的檢測驅除狀態變化。後續會分小節詳細介紹這三部分的狀態變化。
在這些狀態變化過程中,不僅涉及 ObjectPool 的方法,也會呼叫 PooledObjectFactory 的方法進行相關操作。
上圖表明瞭在 PooledObject 狀態變化過程中涉及的 PooledObjectFactory 的方法。按照前文對 PooledObjectFactory 方法的描述,可以很容易的對應起來。比如說,在編號 1 的物件被借出過程中,先呼叫 invalidateObject 判斷物件可用性,然後呼叫 activeObject 將物件預設配置初始化。
借出歸還狀態變化
我們從 GenericObjectPool 的 borrowObject 方法開始瞭解。該方法可以傳入最大等待時間為引數,如果不傳則使用配置的預設最大等待時間,borrowObject 的原始碼如下所示(為了可讀性,對程式碼進行刪減)。
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
// 1 根據 abandonedConfig 和其他檢測判斷是否要呼叫 removeAbandoned 方法進行標記刪除操作
....
PooledObject<T> p = null;
// 當暫時無法獲取物件時是否阻塞
final boolean blockWhenExhausted = getBlockWhenExhausted();
while (p == null) {
create = false;
// 2 先從 idleObjects 佇列中獲取, pollFisrt 是非阻塞的
p = idleObjects.pollFirst();
// 3 沒有則呼叫 create 方法建立一個新的物件
if (p == null) {
p = create();
}
// 4 blockWhenExhausted 為true,則根據 borrowMaxWaitMillis 進行阻塞操作
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
p = idleObjects.takeFirst(); // 阻塞到獲取物件為止
} else {
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS); // 阻塞到最大等待時間或者獲取到物件
}
}
}
// 5 呼叫 allocate 進行狀態變化
if (!p.allocate()) {
p = null;
}
if (p != null) {
// 6 呼叫 activateObject 進行物件預設初始化,如果出現問題則呼叫 destroy
factory.activateObject(p);
// 7 如果配置了 TestOnBorrow,則呼叫 validateObject 進行可用性校驗,如果不通過則呼叫 destroy
if (getTestOnBorrow()) {
validate = factory.validateObject(p);
}
}
}
return p.getObject();
}
borrowObject 方法主要做了五步操作:
- 第一步是根據配置判斷是否要呼叫 removeAbandoned 方法進行標記刪除操作,這個後續小節再細講。
- 第二步是嘗試獲取或建立物件,由原始碼中2,3,4 步驟組成。
- 第三步是呼叫 allocate 進行狀態變更,轉換為 ALLOCATED 狀態,如原始碼中的 5 步驟。
- 第四步是呼叫 factory 的 activateObject 進行物件的初始化,如果出錯則呼叫 destroy 方法銷燬物件,如原始碼中的 6 步驟。
- 第五步是根據 TestOnBorrow 配置呼叫 factory 的 validateObject 進行物件可用性分析,如果不可用,則呼叫 destroy 方法銷燬物件,如原始碼中的 7 步驟。
我們對第二步進行一下細緻的分析。idleObjects 是儲存著所有 IDLE狀態 (也有可能是 EVICTION 狀態) PooledObject 的 LinkedBlockingDeque 物件。第二步中先呼叫其 pollFirst 方法從佇列頭獲取 PooledObject,如果未獲取到則呼叫 create 方法建立一個新的。
create 也可能未建立成功,則當 blockWhenExhausted 為 true 時,未獲取到物件需要一直阻塞,所以根據最大等待時間 borrowMaxWaitMillis 來呼叫 takeFirst 或者 pollFirst(time) 方法進行阻塞式獲取;當 blockWhenExhausted 為 false 時,則直接丟擲異常返回。
create 方法會判斷當前狀況下是否應該建立新的物件,主要是要防止建立的物件數量超過最大池物件數量。如果可以建立新物件,則呼叫 PooledObjectFactory 的 makeObject 方法進行新物件建立,然後根據 testOnCreate 配置來判斷是否呼叫 validateObject 方法進行校驗,原始碼如下所示。
private PooledObject<T> create() throws Exception {
int localMaxTotal = getMaxTotal(); // 獲取池物件最大數量
final long localStartTimeMillis = System.currentTimeMillis();
final long localMaxWaitTimeMillis = Math.max(getMaxWaitMillis(), 0); // 獲取最大等待時間
Boolean create = null;
// 一直等待到 create 被賦值,true代表要建立新物件,false代表不能建立
while (create == null) {
synchronized (makeObjectCountLock) {
final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
// pool已經滿或者正在建立的足夠達到最大數量的物件。
createCount.decrementAndGet();
if (makeObjectCount == 0) {
// 目前沒有其他的 makeObject 方法被呼叫,直接返回false
create = Boolean.FALSE;
} else {
// 目前有其他的 makeObject 方法被呼叫,但是可能失敗,所以等待一段時間再試試
makeObjectCountLock.wait(localMaxWaitTimeMillis);
}
} else {
// pool未滿 可以建立物件。
makeObjectCount++;
create = Boolean.TRUE;
}
}
// 執行超過 maxWaitTimeMillis 則返回false
if (create == null &&
(localMaxWaitTimeMillis > 0 &&
System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
create = Boolean.FALSE;
}
}
// 如果 create 為false,返回 NULL
if (!create.booleanValue()) {
return null;
}
final PooledObject<T> p;
try {
// 呼叫 factory 的 makeObject 進行物件建立,並且按照 testOnCreate 配置呼叫 validateObject 方法
p = factory.makeObject();
if (getTestOnCreate() && !factory.validateObject(p)) {
// 這裡程式碼有問題,校驗不通過的物件沒有進行銷燬?
createCount.decrementAndGet();
return null;
}
} catch (final Throwable e) {
createCount.decrementAndGet();
throw e;
} finally {
synchronized (makeObjectCountLock) {
// 減少 makeObjectCount
makeObjectCount--;
makeObjectCountLock.notifyAll();
}
}
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
return p;
}
需要注意的是 create 方法建立的物件並沒有第一時間加入到 idleObjects 佇列中,該物件將會在後續使用完畢呼叫 returnObject 方法時才會加入到佇列中。
接下來,我們看一下 returnObject 方法的實現。該方法主要做了六步操作:
- 第一步是呼叫 markReturningState 方法將狀態變更為 RETURNING。
- 第二步是根據 testOnReturn 配置呼叫 PooledObjectFactory 的 validateObject 方法進行可用性校驗。如果未通過校驗,則呼叫 destroy 消耗該物件,然後呼叫 ensureIdle 確保池中有 IDLE 狀態物件可用,如果沒有會呼叫 create 方法建立新的物件。
- 第三步是呼叫 PooledObjectFactory 的 passivateObject 方法進行反初始化操作。
- 第四步是呼叫 deallocate 將狀態變更為 IDLE。
- 第五步是檢測是否超過了最大 IDLE 物件數量,如果超過了則銷燬當前物件。
- 第六步是根據 LIFO (last in, first out) 配置將物件放置到佇列的首部或者尾部。
public void returnObject(final T obj) {
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
// 1 將狀態轉換為 RETURNING
markReturningState(p);
final long activeTime = p.getActiveTimeMillis();
// 2 根據配置,對例項進行可用性校驗
if (getTestOnReturn() && !factory.validateObject(p)) {
destroy(p);
// 因為刪除了一個物件,需要確保池內還有物件,如果沒有改方法會建立新物件
ensureIdle(1, false);
updateStatsReturn(activeTime);
return;
}
// 3 呼叫 passivateObject 將物件反初始化。
try {
factory.passivateObject(p);
} catch (final Exception e1) {
.... // 和上邊 validateObject 校驗失敗相同操作。
}
// 4 將狀態變更為 IDLE
if (!p.deallocate()) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
final int maxIdleSave = getMaxIdle();
// 5 如果超過最大 IDLE 數量,則進行銷燬
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
.... // 同上邊 validateObject 校驗失敗相同操作。
} else {
// 6 根據 LIFO 配置,將歸還的物件放置在佇列首部或者尾部。 這邊原始碼拼錯了。
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
}
updateStatsReturn(activeTime);
}
下圖介紹了第六步兩種入佇列的場景,LIFO 為 true 時防止在佇列頭部;LIFO 為 false 時,防止在佇列尾部。要根據不同的池化物件選擇不同的場景。但是放置在尾部可以避免併發熱點,因為借對象和還物件都需要操作佇列頭,需要進行併發控制。
標記刪除狀態變化
標記刪除狀態變化操作主要通過 removeAbandoned 實現,它主要是檢查已經借出的物件是否需要刪除,防止物件被借出長時間未使用或者歸還所導致的池物件被耗盡的情況。
removeAbandoned 根據 AbandonedConfig 可能會在 borrowObject 或者 檢測驅除物件的 evict 方法執行時被呼叫。
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
final AbandonedConfig ac = this.abandonedConfig;
// 當配置了 removeAbandonedOnBorrow 並且 當前 idle 物件數量少於2,活躍物件數量只比最大物件數量少3.
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
removeAbandoned(ac);
}
....
}
public void evict() throws Exception {
....
final AbandonedConfig ac = this.abandonedConfig;
// 設定了 removeAbandonedOnMaintenance
if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
removeAbandoned(ac);
}
}
removeAbandoned 使用典型的標記刪除策略:標記階段是先對所有的物件進行遍歷,如果該物件是 ALLOCATED 並且上次使用時間已經超過超時時間,則將其狀態變更為 ABANDONED 狀態,並加入到刪除佇列中;刪除階段則遍歷刪除佇列,依次呼叫 invalidateObject 方法刪除並銷燬物件。
private void removeAbandoned(final AbandonedConfig ac) {
// 收集需要 abandoned 的物件
final long now = System.currentTimeMillis();
// 1 根據配置的時間計算超時時間
final long timeout =
now - (ac.getRemoveAbandonedTimeout() * 1000L);
final ArrayList<PooledObject<T>> remove = new ArrayList<>();
final Iterator<PooledObject<T>> it = allObjects.values().iterator();
while (it.hasNext()) {
final PooledObject<T> pooledObject = it.next();
// 2 遍歷所有的物件,如果它是已經分配狀態,並且該物件的最近一次使用時間小於超時時間
synchronized (pooledObject) {
if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
pooledObject.getLastUsedTime() <= timeout) {
// 3 將物件狀態更改為 ABANDONED,並加入到刪除佇列
pooledObject.markAbandoned();
remove.add(pooledObject);
}
}
}
// 4 遍歷刪除佇列
final Iterator<PooledObject<T>> itr = remove.iterator();
while (itr.hasNext()) {
final PooledObject<T> pooledObject = itr.next();
// 5 呼叫 invalidateObject 方法刪除物件
invalidateObject(pooledObject.getObject());
}
}
invalidateObject 方法直接呼叫了 destroy 方法,destroy 方法在上邊的原始碼分析中也反覆出現,它主要進行了四步操作:
- 1 將物件狀態變更為 INVALID。
- 2 將物件從佇列和集合中刪除。
- 3 呼叫 PooledObjectFactory 的 destroyObject 方法銷燬物件。
- 4 更新統計資料
private void destroy(final PooledObject<T> toDestroy) throws Exception {
// 1 將狀態變更為 INVALID
toDestroy.invalidate();
// 2 從佇列和池中刪除
idleObjects.remove(toDestroy);
allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
// 3 呼叫 destroyObject 回收物件
try {
factory.destroyObject(toDestroy);
} finally {
// 4 更新統計資料
destroyedCount.incrementAndGet();
createCount.decrementAndGet();
}
}
檢測驅除狀態變化
檢測驅除狀態變化主要由 evict 方法操作,在後臺執行緒中獨立完成,主要檢測池中的 IDLE 狀態的空閒物件是否需要驅除,超時時間通過 EvictionConfig 進行配置。
驅逐者 Evictor,在 BaseGenericObjectPool 中定義,本質是由 java.util.TimerTask 定義的定時任務。
final void startEvictor(final long delay) {
synchronized (evictionLock) {
if (delay > 0) {
// 定時執行 evictor 執行緒
evictor = new Evictor();
EvictionTimer.schedule(evictor, delay, delay);
}
}
}
在 Evictor 執行緒中會呼叫 evict 方法,該方法主要是遍歷所有的 IDLE 物件,然後對每個物件執行檢測驅除操作,具體原始碼如下所示:
- 呼叫 startEvictionTest 方法將狀態更改為 EVICTED。
- 根據驅除策略和物件超時時間判斷是否要驅除。
- 如果需要被驅除則呼叫 destroy 方法銷燬物件。
- 如果設定了 testWhileIdle 則呼叫 PooledObject 的 validateObject 進行可用性校驗。
- 呼叫 endEvictionTest 將狀態更改為 IDLE。
public void evict() throws Exception {
if (idleObjects.size() > 0) {
....
final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
synchronized (evictionLock) {
for (int i = 0, m = getNumTests(); i < m; i++) {
// 1 遍歷所有 idle 的物件
try {
underTest = evictionIterator.next();
} catch (final NoSuchElementException nsee) {
}
// 2 呼叫 startEvictionTest 將狀態變更為 EVICTED
if (!underTest.startEvictionTest()) {
continue;
}
// 3 根據驅除策略判斷是否要驅除
boolean evict = evictionPolicy.evict(evictionConfig, underTest,
idleObjects.size());
if (evict) {
// 4 進行驅除
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
// 5 如果需要檢測,則進行可用性檢測
if (testWhileIdle) {
factory.activateObject(underTest);
factory.validateObject(underTest));
factory.passivateObject(underTest);
}
// 5 變更狀態為 IDLE
if (!underTest.endEvictionTest(idleObjects)) {
}
}
}
}
}
.... // abandoned 相關的操作
}
後記
後續會分析 Hikari 和 Druid 資料庫連線池的實現,請大家多多關注。
個人部落格,歡迎來玩
參考
- https://zhuanlan.zhihu.com/p/32204303
- https://juejin.im/post/5af026a06fb9a07ac47ff282
- 高效能連線池的技術細節 https://yq.aliyun.com/articles/59497
- apache common的通用池 http://www.victorchu.info/2019/01/05/%E4%BB%8Eapache-common-pool%E7%9C%8B%E5%A6%82%E4%BD%95%E5%86%99%E4%B8%80%E4%B8%AA%E9%80%9A%E7%94%A8%E6%B1%A0/
- 如何設計一個連線池:commons-pool2原始碼分 https://throwsnew.com/2017/06/12/commons-pool/
- https://zhuanlan.zhihu.com/p/32204303
- https://yq.aliyun.com/articles/59497](https://yq.aliyun.com/articles/59497