commons-pool2原始碼走讀(四) 物件池實現GenericObjectPool
commons-pool2原始碼走讀(四) 物件池實現GenericObjectPool<T>
GenericObjectPool <T> 是一個可配置的ObjectPool實現。
當與適當的PooledObjectFactory組合使用時,GenericObjectPool為任意物件提供健壯的池功能。
您可以選擇性的配置池來檢查和可能回收池中的空閒物件,並確保有最少數量的空閒物件可用。這是由一個“空閒物件回收”執行緒(即BaseGenericObjectPool <T> 的Evictor)執行的,執行緒是非同步執行的。在配置這個可選特性時,應該謹慎使用。驅逐執行與客戶端執行緒爭用池中的物件,因此如果它們執行得太頻繁,可能會導致效能問題。
還可以配置池來檢測和刪除被洩漏的物件,比如一個從池中借出的物件,在超過removeAbandonedTimeout超時之前既不使用也不返回。移除洩漏的連線,可能發生在物件被借用時物件池已接近飽和,也可能是被回收執行緒檢查出,或者兩者都執行時。如果池物件實現了TrackedUse介面,那麼其最後一次使用時間使取決於getLastUsed方法;否則,是由物件從池中借出的時間決定。
實現注意:為了防止可能的死鎖,已經採取了謹慎措施,以確保在同步塊中不會發生對工廠方法的呼叫。這個類執行緒安全。
1、介面繼承、實現關係
GenericObjectPool <T> 實現了ObjectPool<T> 具備物件池的功能,同時 繼承了BaseGenericObjectPool<T> 的對於物件狀態管理和回收等功能。
2、建構函式
建構函式通過GenericObjectPoolConfig 和PooledObjectFactory來進行引數的初始化和物件工廠類的引入。
public GenericObjectPool(final PooledObjectFactory<T> factory,
final GenericObjectPoolConfig config) {
//父類BaseGenericObjectPool構造方法
super(config, ONAME_BASE, config.getJmxNamePrefix());
if (factory == null) {
jmxUnregister(); // tidy up
throw new IllegalArgumentException("factory may not be null");
}
this.factory = factory;
//空閒物件佇列,此佇列非JDK而是自行實現的一個佇列
idleObjects = new LinkedBlockingDeque<>(config.getFairness());
//覆蓋BaseGenericObjectPool裡面的配置引數
setConfig(config);
//初始化回收執行緒
startEvictor(getTimeBetweenEvictionRunsMillis());
}
3、相關屬性
// --- 可配置的屬性 -------------------------------------------------
//最大空閒數量
private volatile int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
//最小空閒數量
private volatile int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
//物件工廠
private final PooledObjectFactory<T> factory;
// --- 內部屬性 -------------------------------------------------
//池中所有的物件,只能是<=maxActive
private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
new ConcurrentHashMap<>();
//已建立物件總數(不包含已銷燬的)
private final AtomicLong createCount = new AtomicLong(0);
//呼叫建立方法匯流排程數
private long makeObjectCount = 0;
//makeObjectCount 增長時併發鎖
private final Object makeObjectCountLock = new Object();
//空閒物件佇列
private final LinkedBlockingDeque<PooledObject<T>> idleObjects;
// JMX specific attributes
private static final String ONAME_BASE =
"org.apache.commons.pool2:type=GenericObjectPool,name=";
//洩漏物件回收配置引數
private volatile AbandonedConfig abandonedConfig = null;
4、 物件池方法實現
- 借用物件
整個流程為,檢查池是否關閉 –> 是否回收洩漏物件 –> 是否阻塞建立物件 –> 建立物件 –> 分配物件 –> 啟用物件 –> 校驗物件 –> 更改借用資訊 –> 返回物件
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
//判斷物件池是否關閉:BaseGenericObjectPool.closed==true
assertOpen();
//如果回收洩漏的引數配置不為空,並且removeAbandonedOnBorrow引數配置為true
//並且Idle數量<2,Active數量>總數Total-3
//在借用時進行回收洩漏連線(會影響效能)
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
(getNumIdle() < 2) &&
(getNumActive() > getMaxTotal() - 3) ) {
//回收洩漏物件
removeAbandoned(ac);
}
PooledObject<T> p = null;
//copy blockWhenExhausted 防止其它執行緒更改getBlockWhenExhausted值造成併發問題
//借用物件時如果沒有是否阻塞直到有物件產生
final boolean blockWhenExhausted = getBlockWhenExhausted();
//建立成功標識
boolean create;
//記錄當前時間,用作記錄借用操作總共花費的時間
final long waitTime = System.currentTimeMillis();
//當物件為空時一直獲取
while (p == null) {
create = false;
//從雙端佇列彈出第一個隊首物件,為空返回null
p = idleObjects.pollFirst();
//如果為空則重新建立一個物件
if (p == null) {
//建立物件
p = create();
//p==null可能物件池達到上限不能繼續建立!
if (p != null) {
create = true;
}
}
//如果物件p還是為空則阻塞等待
if (blockWhenExhausted) {
if (p == null) {
if (borrowMaxWaitMillis < 0) {
//沒有超時時間則阻塞等待到有物件為止
p = idleObjects.takeFirst();
} else {
//有超時時間
p = idleObjects.pollFirst(borrowMaxWaitMillis,
TimeUnit.MILLISECONDS);
}
}
//達到超時時間,還未取到物件,則丟擲異常
if (p == null) {
throw new NoSuchElementException(
"Timeout waiting for idle object");
}
} else {
//未取到物件,則丟擲異常
if (p == null) {
throw new NoSuchElementException("Pool exhausted");
}
}
//呼叫PooledObject.allocate()方法分配物件
//[具體實現請看](https://blog.csdn.net/qq447995687/article/details/80413227)
if (!p.allocate()) {
p = null;
}
//分配成功
if (p != null) {
try {
//啟用物件,具體請看factory實現,物件重借出到歸還整個流程經歷的過程圖
factory.activateObject(p);
} catch (final Exception e) {
try {
destroy(p);
} catch (final Exception e1) {
// Ignore - activation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to activate object");
nsee.initCause(e);
throw nsee;
}
}
//物件建立成功,是否進行測試
if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
boolean validate = false;
Throwable validationThrowable = null;
try {
//校驗物件,具體請看factory實現,物件重借出到歸還整個流程經歷的過程圖
validate = factory.validateObject(p);
} catch (final Throwable t) {
PoolUtils.checkRethrow(t);
validationThrowable = t;
}
//校驗不通過則銷燬物件
if (!validate) {
try {
destroy(p);
destroyedByBorrowValidationCount.incrementAndGet();
} catch (final Exception e) {
// Ignore - validation failure is more important
}
p = null;
if (create) {
final NoSuchElementException nsee = new NoSuchElementException(
"Unable to validate object");
nsee.initCause(validationThrowable);
throw nsee;
}
}
}
}
}
//更新物件借用狀態
updateStatsBorrow(p, System.currentTimeMillis() - waitTime);
return p.getObject();
}
建立物件
當借用時,空閒物件為空,並且未達到池最大數量,則會呼叫該方法重新建立一個空閒物件
private PooledObject<T> create() throws Exception {
int localMaxTotal = getMaxTotal();
// 如果最大數量為負數則設定為Integer的最大值
if (localMaxTotal < 0) {
localMaxTotal = Integer.MAX_VALUE;
}
// 是否建立成功的一個flag:
// - TRUE: 呼叫工廠類成功建立一個物件
// - FALSE: 返回空
// - null: 重複建立
Boolean create = null;
while (create == null) {
synchronized (makeObjectCountLock) {
//加上本次操作,總共建立個數
final long newCreateCount = createCount.incrementAndGet();
if (newCreateCount > localMaxTotal) {
//連線池容量已滿,不能繼續增長。在對最後一個物件的建立上,
//加入了設計較為精妙,需細細揣摩
createCount.decrementAndGet();
//呼叫建立物件方法執行緒數=0
if (makeObjectCount == 0) {
//容量已滿並且沒有執行緒呼叫makeObject()方法,
//表明沒有任何可能性再繼續建立物件,
//返回並等待歸還的空閒物件
create = Boolean.FALSE;
} else {
//其它執行緒呼叫makeObject()方法在建立物件了。
//如果繼續建立則可能超過物件池容量,不返回false,因為其它執行緒也在建立,
//但是是否能夠建立成功是未知的,如果其它執行緒沒能建立成功,
//則此執行緒可能會搶奪到繼續建立的權利。
//釋放鎖,等待其它執行緒建立結束並喚醒該執行緒
makeObjectCountLock.wait();
}
} else {
// 物件池未滿,從新建立一個物件
makeObjectCount++;
create = Boolean.TRUE;
}
}
}
//物件池容量達到上限,返回null重新等待其它執行緒歸還物件
if (!create.booleanValue()) {
return null;
}
final PooledObject<T> p;
try {
//建立一個新物件
p = factory.makeObject();
} catch (final Exception e) {
createCount.decrementAndGet();
throw e;
} finally {
//與上面wait()方法相呼應,
//如果上面丟擲了異常,喚醒其它執行緒爭奪繼續建立最後一個資源的權利
synchronized (makeObjectCountLock) {
makeObjectCount--;
makeObjectCountLock.notifyAll();
}
}
//設定洩漏引數,並加入呼叫堆疊
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getLogAbandoned()) {
p.setLogAbandoned(true);
// TODO: in 3.0, this can use the method defined on PooledObject
if (p instanceof DefaultPooledObject<?>) {
((DefaultPooledObject<T>) p).setRequireFullStackTrace(ac.getRequireFullStackTrace());
}
}
//將建立總數增加,並將物件放入 allObjects
createdCount.incrementAndGet();
allObjects.put(new IdentityWrapper<>(p.getObject()), p);
return p;
}
回收洩漏物件
private void removeAbandoned(final AbandonedConfig ac) {
// Generate a list of abandoned objects to remove
final long now = System.currentTimeMillis();
//超時時間=當前時間-配置的超時時間,如果一個物件的上次借用時間在此時間之前,
//說明上次借用後經過了removeAbandonedTimeout時間限制還未被歸還過即可能是洩漏的物件
final long timeout =
now - (ac.getRemoveAbandonedTimeout() * 1000L);
//洩漏的,需要移除的物件列表
final ArrayList<PooledObject<T>> remove = new ArrayList<>();
final Iterator<PooledObject<T>> it = allObjects.values().iterator();
//遍歷池中物件依次判斷是否需要移除
while (it.hasNext()) {
final PooledObject<T> pooledObject = it.next();
synchronized (pooledObject) {
//如果物件的狀態為已分配ALLOCATED ,並且已經超過洩漏定義時間則新增到需要移除佇列進行統一移除
if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
pooledObject.getLastUsedTime() <= timeout) {
pooledObject.markAbandoned();
remove.add(pooledObject);
}
}
}
// 移除洩漏連線,如果配置了列印堆疊,則列印呼叫堆疊資訊
final Iterator<PooledObject<T>> itr = remove.iterator();
while (itr.hasNext()) {
final PooledObject<T> pooledObject = itr.next();
if (ac.getLogAbandoned()) {
pooledObject.printStackTrace(ac.getLogWriter());
}
try {
//銷燬物件
invalidateObject(pooledObject.getObject());
} catch (final Exception e) {
e.printStackTrace();
}
}
}
- invalidate物件
public void invalidateObject(final T obj) throws Exception {
//從所有物件中取出該物件,如果不存在則丟擲異常
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
if (p == null) {
if (isAbandonedConfig()) {
return;
}
throw new IllegalStateException(
"Invalidated object not currently part of this pool");
}
//如果物件不是無效狀態PooledObjectState.INVALID,則銷燬此物件
synchronized (p) {
if (p.getState() != PooledObjectState.INVALID) {
destroy(p);
}
}
//
ensureIdle(1, false);
}
銷燬物件
銷燬物件,並從池中移除,更新物件池已建立數量和總銷燬數量
private void destroy(final PooledObject<T> toDestroy) throws Exception {
toDestroy.invalidate();
idleObjects.remove(toDestroy);
allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
try {
factory.destroyObject(toDestroy);
} finally {
destroyedCount.incrementAndGet();
createCount.decrementAndGet();
}
}
確保最小空閒數量
private void ensureIdle(final int idleCount, final boolean always) throws Exception {
//!idleObjects.hasTakeWaiters()如果idleObjects佇列還有執行緒等待獲取物件則由最後一個
//等待者確保最小空閒數量
if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
return;
}
//一直建立空閒物件知道空閒物件數量>總空閒數量閾值
while (idleObjects.size() < idleCount) {
final PooledObject<T> p = create();
if (p == null) {
// Can't create objects, no reason to think another call to
// create will work. Give up.
break;
}
//根據先進先出引數,新增物件到隊首或者隊尾
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
}
//在此過程中如果連線池關閉則clear所有物件
if (isClosed()) {
clear();
}
}
- 歸還物件
歸還物件方法將適用完的物件從新放置回物件池中重複利用。其整個流程為:檢查是否存在 –> 檢查狀態是否正確 –> 是否在歸還時測試物件 –> 校驗物件 –> 鈍化(解除安裝)物件 –> 結束分配 –> 銷燬/歸還該物件 –> 更新連線池歸還資訊
public void returnObject(final T obj) {
final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
if (p == null) {
//如果物件為空,並且沒有配置洩漏引數則丟擲異常,表明該物件不是連線池中的物件
if (!isAbandonedConfig()) {
throw new IllegalStateException(
"Returned object not currently part of this pool");
}
//如果物件為空,表明該物件是abandoned並且已被銷燬
return;
}
synchronized(p) {
final PooledObjectState state = p.getState();
//如果被歸還的物件不是已分配狀態,丟擲異常
if (state != PooledObjectState.ALLOCATED) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
//更改狀態為returning,避免在此過程中被標記為被遺棄。
p.markReturning();
}
final long activeTime = p.getActiveTimeMillis();
//是否在歸還時測試該物件
if (getTestOnReturn()) {
//校驗物件
if (!factory.validateObject(p)) {
try {
//校驗不通過則destroy物件
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
//確保最小空閒數量
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
//更新連線池歸還資訊BaseGenericObjectPool#returnedCount,activeTimes
updateStatsReturn(activeTime);
return;
}
}
//校驗通過
try {
//鈍化(解除安裝)物件
factory.passivateObject(p);
} catch (final Exception e1) {
swallowException(e1);
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
try {
ensureIdle(1, false);
} catch (final Exception e) {
swallowException(e);
}
updateStatsReturn(activeTime);
return;
}
//結束分配,如果物件為ALLOCATED或者RETURNING更改物件為空閒IDLE狀態
//具體看org.apache.commons.pool2.impl.DefaultPooledObject#deallocate方法
if (!p.deallocate()) {
throw new IllegalStateException(
"Object has already been returned to this pool or is invalid");
}
final int maxIdleSave = getMaxIdle();
//如果物件池已經關閉或者空閒數量達到上限,則銷燬該物件
if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
} else {
//否則將歸還的物件新增到空閒佇列,連線池的最終目的:重用一個連線
if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}
if (isClosed()) {
// Pool closed while object was being added to idle objects.
// Make sure the returned object is destroyed rather than left
// in the idle object pool (which would effectively be a leak)
clear();
}
}
updateStatsReturn(activeTime);
}
- clear連線池
依次銷燬每個連結
public void clear() {
PooledObject<T> p = idleObjects.poll();
while (p != null) {
try {
destroy(p);
} catch (final Exception e) {
swallowException(e);
}
p = idleObjects.poll();
}
}
- 回收物件
此方法實現了org.apache.commons.pool2.impl.BaseGenericObjectPool#evict 方法,用於回收執行緒回收空閒物件。
回收的整個流程為:判斷池是否關閉及是否有空閒物件 –> 根據策略獲得回收的條數 –> 判斷物件狀態開始進行回收 –> 根據回收策略EvictionPolicy判斷是否能夠回收 –> 如能回收則銷燬物件 –> 不能回收則判斷是否校驗物件 –> 啟用物件 –> 校驗物件 –> 鈍化物件 –> 結束回收更改物件狀態 –> 回收洩漏連線
public void evict() throws Exception {
assertOpen();
if (idleObjects.size() > 0) {
PooledObject<T> underTest = null;
final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
synchronized (evictionLock) {
//回收引數
final EvictionConfig evictionConfig = new EvictionConfig(
getMinEvictableIdleTimeMillis(),
getSoftMinEvictableIdleTimeMillis(),
getMinIdle());
//是否在回收時測試物件
final boolean testWhileIdle = getTestWhileIdle();
//根據getNumTests()對部分物件進行回收測試
for (int i = 0, m = getNumTests(); i < m; i++) {
//evictionIterator是空閒物件的一個迭代器,可以想象為idleObjects.iterator()
if (evictionIterator == null || !evictionIterator.hasNext()) {
evictionIterator = new EvictionIterator(idleObjects);
}
if (!evictionIterator.hasNext()) {
// Pool exhausted, nothing to do here
return;
}
//多執行緒併發時,有可能上面檢測到有物件,而另一個物件隨後將其借出
try {
underTest = evictionIterator.next();
} catch (final NoSuchElementException nsee) {
// 物件被其它執行緒借出
i--;
evictionIterator = null;
continue;
}
//根據狀態判斷是否能夠開始回收測試,並更改狀態,詳細實現請看原始碼走讀(一)
if (!underTest.startEvictionTest()) {
// Object was borrowed in another thread
// Don't count this as an eviction test so reduce i;
i--;
continue;
}
//根據回收策略判斷物件是否能夠被回收,單獨分析
boolean evict;
try {
//根據回收策略判斷物件是否能夠被回收
evict = evictionPolicy.evict(evictionConfig, underTest,
idleObjects.size());
} catch (final Throwable t) {
// Slightly convoluted as SwallowedExceptionListener
// uses Exception rather than Throwable
PoolUtils.checkRethrow(t);
swallowException(new Exception(t));
// Don't evict on error conditions
evict = false;
}
//如果能被回收則銷燬物件
if (evict) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
//不能被回收,則是否進行校驗,與借出流程相同,只不過該處只是校驗而沒有借出實際使用
if (testWhileIdle) {
boolean active = false;
try {
//物件已經被借出,直接啟用
factory.activateObject(underTest);
active = true;
} catch (final Exception e) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
if (active) {
//啟用成功進行校驗
if (!factory.validateObject(underTest)) {
//校驗不通過則銷燬物件
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
} else {
try {
//校驗通過則重新將物件鈍化(解除安裝)
factory.passivateObject(underTest);
} catch (final Exception e) {
destroy(underTest);
destroyedByEvictorCount.incrementAndGet();
}
}
}
}
//結束回收測試,更改物件狀態或者新增到空閒佇列,
//如果在此途中被借出,還需重新新增到idleObjects,具體實現請看原始碼走讀(一)
if (!underTest.endEvictionTest(idleObjects)) {
// TODO - May need to add code here once additional
// states are used
}
}
}
}
}
//配置了回收,則進行回收洩漏連線
final AbandonedConfig ac = this.abandonedConfig;
if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
removeAbandoned(ac);
}
}
返回有多少物件需要進行回收測試
private int getNumTests() {
final int numTestsPerEvictionRun = getNumTestsPerEvictionRun();
if (numTestsPerEvictionRun >= 0) {
return Math.min(numTestsPerEvictionRun, idleObjects.size());
}
return (int) (Math.ceil(idleObjects.size() /
Math.abs((double) numTestsPerEvictionRun)));
}