commons-pool物件池實現原理及使用(一)
我們知道頻繁的建立物件是一個相對較重的過程,jvm要載入類,初始化物件,為物件分配記憶體涉及到多個系統呼叫,這樣對一個高負載的系統來說是比較消耗資源的。這時我們可以將所需物件進行池化,apache開源的的commons-pool是一個比較經典的實現物件池化元件,筆者所在的公司在多個框架中都使用過這個元件,還有很多業界知名的中介軟體如Redis客戶端Jedis等都使用到了這個池化元件。我們雖然不必重複造輪子,但是我們也得了解這個輪子是怎麼造的。
接下來我們就從commons-pool的類層次設計、實現原理、使用這三個方面來研究一下commons-pool,我們這裡以commons-pool-1.6原始碼進行分析,原始碼和jar包可以在這個連結中下載
首先我們先解釋如下幾個概念:
物件池:就是我們要介紹的commons-pool的核心概念,類似於容器,用於存放我們需要池化的物件
物件池工廠:就是操作物件池的工程(類似於設計模式中的工程模式),這個工廠可以執行借出、歸還、銷燬等管理池物件的操作
池物件:就是我們需要池化的物件
池物件工廠:就是操作池物件的工廠,比如生成、驗證、啟用、鈍化、銷燬池物件
(1)commons-pool類的層次設計
這個是Commons-pool的原始碼包,只有兩個包org.apache.commons.pool這個包下大部分都是介面和抽象類,org.apache.commons.pool.impl這個是使用者可以直接使用的介面或抽象類的具體實現。
public class GenericObjectPool<T> extends BaseObjectPool<T> implements ObjectPool<T>
ObjectPool是一個頂層介面,裡面定義了物件池的一些基本操作,BaseObjectPool是一個抽象類對上層介面提供了預設實現,作為一個使用者層的預設緩衝實現,這樣使用者就不必全部實現頂層介面的所有方法,這種預設模式在很多地方都可以使用,GenericObjectPool是一個具體的物件池的實現。
public interface ObjectPool {
//從物件池中借出物件
public abstract Object borrowObject() throws Exception,NoSuchElementException, IllegalStateException;
//將物件歸還到物件池中
public abstract void returnObject(Object obj) throws Exception;
//使一個物件無效
public abstract void invalidateObject(Object obj) throws Exception;
//向物件池中新增物件
public abstract void addObject() throws Exception, IllegalStateException,UnsupportedOperationException;
//得到當前物件池中空閒的物件
public abstract int getNumIdle() throws UnsupportedOperationException;
//得到當前物件池已經借出的物件,這裡的啟用其實就是物件池借出物件的概念
public abstract int getNumActive() throws UnsupportedOperationException;
//清空物件池,並銷燬物件池中的所有物件
public abstract void clear() throws Exception,UnsupportedOperationException;
// 關閉物件池
public abstract void close() throws Exception;
/**
* @deprecated Method setFactory is deprecated
*這個方法已經棄用了,池物件工程需要在實現類的建構函式中傳入
*/
public abstract void setFactory(PoolableObjectFactory poolableobjectfactory)
throws IllegalStateException, UnsupportedOperationException;
}
public abstract class BasePoolableObjectFactory<T> implements PoolableObjectFactory<T>
這個是池物件類的設計繼承結構,PoolableObjectFactory<T>這是一個池物件的頂層介面,BasePoolableObjectFactory<T>這是一個抽象類提供了一些預設實現,我們在使用的時候即可以繼承BasePoolableObjectFactory<T>這個抽象類,也可以實現頂層介面PoolableObjectFactory<T>。public interface PoolableObjectFactory {
/**
* 物件池呼叫該方法 建立一個物件
*/
public abstract Object makeObject() throws Exception;
/**
* 歸還物件時 如果呼叫validateObject 驗證物件失敗,或者連線池被關閉,
* 又或者歸還物件時連線池中空閒的物件數量大於等於MaxIdel
* 若符合上述三種情況之一 都會呼叫這個方法銷燬物件
* 在驅逐執行緒啟動進行檢查是符合驅逐策略也會呼叫這個方法銷燬物件
*/
public abstract void destroyObject(Object obj) throws Exception;
/**
* testOnBorrow設定為true時,建立物件時呼叫該方法驗證物件的有效性,
* 如果無效直接丟擲異常throw new Exception("ValidateObject failed");
* testOnReturn設定為true時,歸還物件時 驗證物件是否還有效 比如連線是否還在,
* 如果無效了則不往物件池中放物件。
*/
public abstract boolean validateObject(Object obj);
/**
* 呼叫該方法啟用物件
*
*/
public abstract void activateObject(Object obj) throws Exception;
/**
* 歸還物件時 鈍化物件 比如某些物件用完之後需要休眠一段時間
*/
public abstract void passivateObject(Object obj) throws Exception;
}
(二)Commons-pool實現原理
commons-pool物件池暴露出來很多引數供使用者配置使用,下面就是物件池暴露的引數以及預設值。
public static class Config {
/**
* 物件池中最多可以空閒的數量,如果設定的太小,當負載很重時執行緒借對象的時候的時候會頻繁建立物件
* 這個值其實可以理解為 在物件池中快取多少個物件 當需要的物件個數大於maxIdle時再建立新物件
*/
public int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE;
/**
* 物件池中保留物件的最小的個數,設定為0 代表物件都可以外借
*/
public int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE;
/**
* 這個值就是物件池的最大可以外借的數量
* 負數不限制 物件池可以外接的物件數量
*/
public int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE;
/**
* 當物件池外借物件數量大約maxActive時 再來借對象的執行緒將會阻塞的時間
*/
public long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT;
/**
* 當物件池外借物件數量大約maxActive時 再來借對象的執行緒時的行為
* 0 失敗;1阻塞;2繼續建立新物件
*/
public byte whenExhaustedAction = GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION;
/**
* true 借對象時呼叫池物件工廠的驗證方法 驗證物件的有效性
*/
public boolean testOnBorrow = GenericObjectPool.DEFAULT_TEST_ON_BORROW;
/**
* true 返還物件時呼叫池物件工廠的驗證方法 驗證物件的有效性
*/
public boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN;
/**
* true 驅逐執行緒在對池中物件進行 檢測時會對池物件進行validateObject 驗證
* 如果驗證返回false,則將會銷燬該物件
*/
public boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE;
/**
* 驅逐執行緒執行的時間間隔
*/
public long timeBetweenEvictionRunsMillis = GenericObjectPool.DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
/**
* 驅逐執行緒 每次執行驅逐的物件個數
*/
public int numTestsPerEvictionRun = GenericObjectPool.DEFAULT_NUM_TESTS_PER_EVICTION_RUN;
/**
* 池物件在物件池中的時間如果小於這個正數值 則不進行驅逐 ;大於這個值則進行驅逐檢測操作
* 如果這個值小於0 則不進行驅逐
*/
public long minEvictableIdleTimeMillis = GenericObjectPool.DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
/**
* 池物件在物件池中的時間如果小於這個正數值 則不進行驅逐;大於這個值則進行驅逐檢測操作,但是至少要保留minIdel物件在池中
* 如果這個值小於0 則不進行驅逐
*/
public long softMinEvictableIdleTimeMillis = GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS;
/**
* true 物件池外借物件時按照物件入池是lifo的順序外借
* false 物件池外借物件時按照物件入池是fifo的順序外借
*/
public boolean lifo = GenericObjectPool.DEFAULT_LIFO;
}
這裡以GenericObjectPool<T>這個具體的實現來分析,據我分析程式碼來看這個類中最重要的三個成員域就是如下三個:
/** My pool.
*這個就是物件池 是一個基於遊標的雙向連結串列,空閒的池物件都會放到這裡面,
*如果設定了驅逐策略,驅逐執行緒會定期檢測這裡面空閒物件是否可以驅逐
*/
private CursorableLinkedList<ObjectTimestampPair<T>> _pool = null;
/** Eviction cursor - keeps track of idle object evictor position
*這個是驅逐執行緒使用,保證驅逐物件的順序不變
*/
private CursorableLinkedList<ObjectTimestampPair<T>>.Cursor _evictionCursor = null;
/**
* Used to track the order in which threads call {@link #borrowObject()} so
* that objects can be allocated in the order in which the threads requested
* them.
* 這是一個雙向連結串列,用於儲存使用者執行緒的借對象的請求,主要是保證對外借物件的公平性,
* 如果多個執行緒阻塞,當物件池內有物件可用時,解除阻塞執行緒的順序就是這個_allocationQueue域
* 保證的順序
*/
private final LinkedList<Latch<T>> _allocationQueue = new LinkedList<Latch<T>>();
接下來我們分析物件池中核心的三個方法,由於物件池是針對多執行緒使用的,大量方法都有加鎖保持資料同步。GenericObjectPool<T> 類內部有一個儲存外部執行緒請求的資料結構 private static final class Latch<T>,這是一個私用的靜態內部類。
/**
* 為_allocationQueue佇列中的請求分配物件,如果物件池中有物件則優先分配物件池中的物件,
* 如果物件池中沒有物件,在滿足當前可接物件小於最大可啟用物件的情況下設定可建立物件標記,
* 並解除一個阻塞的執行緒
*/
private synchronized void allocate() {
if (isClosed()) return;
// 如果物件池中有物件,優先分配物件池中的物件
//_numInternalProcessing這個表示當前正在處理的物件,這個物件的建立成功之後會計入啟用物件的數量
for (;;) {
if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {
Latch<T> latch = _allocationQueue.removeFirst();
latch.setPair( _pool.removeFirst());
_numInternalProcessing++;
synchronized (latch) {
latch.notify();
}
} else {
break;
}
}
// 如果物件池中沒有可以獲得物件,那就從請求佇列中獲取一個請求,解除響應的執行緒阻塞,建立一個新物件
// 當然條件就是滿足_numActive + _numInternalProcessing<_maxActive
for(;;) {
if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {
Latch<T> latch = _allocationQueue.removeFirst();
latch.setMayCreate(true);
_numInternalProcessing++;
synchronized (latch) {
latch.notify();
}
} else {
break;
}
}
}
/**
* 從物件池中接一個物件,這個方法程式碼看起來很亂,但是邏輯是很清楚的
* (1)物件池中沒有物件,一種情況是物件都外借了此時達到了maxActive限制的數量,物件池處於耗盡的狀態,
* 這時執行_whenExhaustedAction設定的行為,等待歸還物件。
* 另一種情況是當前外借物件小於maxActive,且沒有閒置的物件,這時設定可建立標誌,物件池建立物件,並呼叫池物件工程啟用和驗證物件。
* (2)物件池中有物件,呼叫池物件工程方法啟用並驗證物件直接返回
*/
@Override
public T borrowObject() throws Exception {
long starttime = System.currentTimeMillis();
Latch<T> latch = new Latch<T>();
byte whenExhaustedAction;
long maxWait;
synchronized (this) {
//分配本地變數副本
whenExhaustedAction = _whenExhaustedAction;
maxWait = _maxWait;
// 將借對象請求新增到請求佇列
_allocationQueue.add(latch);
}
//執行分配物件方法,主要就是根據當前物件池的具體情況,設定請求latch的_pair和_mayCreate的狀態
allocate();
for (;;) {
synchronized (this) {
assertOpen();
}
// 物件池中沒有物件可以分配
if (latch.getPair() == null) {
// 當前外借物件小於maxActive時,可以建立新物件
if (latch.mayCreate()) {
// allow new object to be created
} else {
// 如果沒有分配到物件,且當前不能建立新物件,表名當前物件池處於耗盡狀態
switch (whenExhaustedAction) {
case WHEN_EXHAUSTED_GROW:
// 允許物件增長
synchronized (this) {
//將請求移出請求佇列 之後在後面建立物件
if (latch.getPair() == null && !latch.mayCreate()) {
_allocationQueue.remove(latch);
_numInternalProcessing++;
}
}
break;
case WHEN_EXHAUSTED_FAIL:
synchronized (this) {
//如果期間已經分配了物件,則跳出呼叫池物件工場啟用 驗證物件
//如果沒有移出請求 丟擲異常
if (latch.getPair() != null || latch.mayCreate()) {
break;
}
_allocationQueue.remove(latch);
}
throw new NoSuchElementException("Pool exhausted");
case WHEN_EXHAUSTED_BLOCK:
try {
synchronized (latch) {
//阻塞執行緒
if (latch.getPair() == null
&& !latch.mayCreate()) {
if (maxWait <= 0) {
latch.wait();
} else {
// this code may be executed again after
// a notify then continue cycle
// so, need to calculate the amount of
// time to wait
final long elapsed = (System
.currentTimeMillis() - starttime);
final long waitTime = maxWait - elapsed;
if (waitTime > 0) {
latch.wait(waitTime);
}
}
} else {
break;
}
}
// see if we were awakened by a closing pool
if (isClosed() == true) {
throw new IllegalStateException("Pool closed");
}
} catch (InterruptedException e) {
boolean doAllocate = false;
synchronized (this) {
// Need to handle the all three possibilities
if (latch.getPair() == null
&& !latch.mayCreate()) {
// Case 1: latch still in allocation queue
// Remove latch from the allocation queue
_allocationQueue.remove(latch);
} else if (latch.getPair() == null
&& latch.mayCreate()) {
// Case 2: latch has been given permission
// to create
// a new object
_numInternalProcessing--;
doAllocate = true;
} else {
// Case 3: An object has been allocated
_numInternalProcessing--;
_numActive++;
returnObject(latch.getPair().getValue());
}
}
if (doAllocate) {
allocate();
}
Thread.currentThread().interrupt();
throw e;
}
if (maxWait > 0
&& ((System.currentTimeMillis() - starttime) >= maxWait)) {
synchronized (this) {
// Make sure allocate hasn't already assigned an
// object
// in a different thread or permitted a new
// object to be created
if (latch.getPair() == null
&& !latch.mayCreate()) {
// Remove latch from the allocation queue
_allocationQueue.remove(latch);
} else {
break;
}
}
throw new NoSuchElementException(
"Timeout waiting for idle object");
} else {
continue; // keep looping
}
default:
throw new IllegalArgumentException(
"WhenExhaustedAction property "
+ whenExhaustedAction
+ " not recognized.");
}
}
}
//如果沒有分配物件 則呼叫池物件工廠方法 建立 物件
boolean newlyCreated = false;
if (null == latch.getPair()) {
try {
T obj = _factory.makeObject();
latch.setPair(new ObjectTimestampPair<T>(obj));
newlyCreated = true;
} finally {
if (!newlyCreated) {
// object cannot be created
synchronized (this) {
_numInternalProcessing--;
// No need to reset latch - about to throw exception
}
allocate();
}
}
}
// 則呼叫池物件工廠方法 啟用 驗證物件
try {
_factory.activateObject(latch.getPair().value);
if (_testOnBorrow
&& !_factory.validateObject(latch.getPair().value)) {
throw new Exception("ValidateObject failed");
}
synchronized (this) {
_numInternalProcessing--;
_numActive++;
}
return latch.getPair().value;
} catch (Throwable e) {
PoolUtils.checkRethrow(e);
// object cannot be activated or is invalid
try {
_factory.destroyObject(latch.getPair().value);
} catch (Throwable e2) {
PoolUtils.checkRethrow(e2);
// cannot destroy broken object
}
synchronized (this) {
_numInternalProcessing--;
if (!newlyCreated) {
latch.reset();
_allocationQueue.add(0, latch);
}
}
allocate();
if (newlyCreated) {
throw new NoSuchElementException(
"Could not create a validated object, cause: "
+ e.getMessage());
} else {
continue; // keep looping
}
}
}
}