commons-pool2
轉載請註明源出處:http://www.cnblogs.com/lighten/p/7375611.html
1.前言
本章介紹一下常用基礎Jar包commons-pools2,最近使用到了thrift作為rpc服務通訊,但是沒有找到其提供的連接池。百度了一下官方貌似沒有提供,需要自己實現,所以根據網上的實現方法通過使用commons-pool2包來構建自己的thrift連接池服務。完成後,順便研究了一下commons-pool2的實現(怕使用不當有坑),也就有了這篇文章。
2.commons-pool2
既然名字中有個2也就意味著是第二個版本了,重新命名也就意味著和原版本並不兼容。這是apache提供的一個開源包,在很多地方都會使用,2015年7月之後就不再進行更新了,目前最新版本是2.4.2(最近突然又更新了目前在2.5.0版本)。其準確來說應該是一個對象池,不過是常用在於連接上而已。
commons-pool2的類並不多,全部42個類如下:
包的結構簡單,類不多,而且還有大量的接口和內部類,所以實際上需要關註的類沒幾個,commons-pool2使用起來也就方便了。
2.1 常見配置
上面也說了,該類是一個基礎包,被很多其它jar包使用,常用於對象池的管理,所以其一些配置,在寫代碼的時候也會經常接觸到類似的,可能是封裝過的,也可能是版本1的,不過大同小異,主要是思路。了解了配置的具體作用,再結合代碼就能夠了解這類jar包是如何寫的了。
通用的配置都在GenericObjectPoolConfig類中:
maxTotal:對象池中最多允許的對象數,默認8(可能超過,不過超過後使用完了就會銷毀,後面源碼會介紹相關機制)
maxIdle:對象池中最多允許存在的空閑對象,默認8
minIdle:池中最少要保留的對象數,默認0
lifo:是否使用FIFO先進先出的模式獲取對象(空閑對象都在一個隊列中),默認為true使用先進先出,false是先進後出
fairness:是否使用公平鎖,默認false(公平鎖是線程安全中的概念,true的含義是誰先等待獲取鎖,隨先在鎖釋放的時候獲取鎖,如非必要,一般不使用公平鎖,會影響性能)
maxWaitMillis:從池中獲取一個對象最長的等待時間,默認-1,含義是無限等,超過這個時間還未獲取空閑對象,就會拋出異常。
minEvictableIdleTimeMillis:最小的驅逐時間,單位毫秒,默認30分鐘。這個用於驅逐線程,對象空閑時間超過這個時間,意味著此時系統不忙碌,會減少對象數量。
evictorShutdownTimeoutMillis:驅逐線程關閉的超時時間,默認10秒。
softMinEvictableIdleTimeMillis:也是最小的驅逐時間,但是會和另一個指標minIdle一同使用,滿足空閑時間超過這個設置,且當前空閑數量比設置的minIdle要大,會銷毀該對象。所以,通常該值設置的比minEvictableIdleTimeMillis要小。
numTestsPerEvictionRun:驅逐線程運行每次測試的對象數量,默認3個。驅逐線程就是用來檢查對象空閑狀態,通過設置的對象數量等參數,保持對象的活躍度和數量,其是一個定時任務,每次不是檢查所有的對象,而是抽查幾個,這個就是用於抽查。
evictionPolicyClassName:驅逐線程使用的策略類名,之前的minEvictableIdleTimeMillis和softMinEvictableIdleTimeMillis就是默認策略DefaultEvictionPolicy的實現,可以自己實現策略。
testOnCreate:在創建對象的時候是否檢測對象,默認false。後續會結合代碼說明是如何檢測的。
testOnBorrow:在獲取空閑對象的時候是否檢測對象是否有效,默認false。這個通常會設置成true,一般希望獲取一個可用有效的對象吧。
testOnReturn:在使用完對象放回池中時是否檢測對象是否仍有效,默認false。
testWhileIdle:在空閑的時候是否檢測對象是否有效,這個發生在驅逐線程執行時。
timeBetweenEvictionRunsMillis:驅逐線程的執行周期,上面說過該線程是個定時任務。默認-1,即不開啟驅逐線程,所以與之相關的參數是沒有作用的。
blockWhenExhausted:在對象池耗盡時是否阻塞,默認true。false的話超時就沒有作用了。
jmxEnabled:是否允許jmx的方式創建一個配置實例,默認true。
jmxNamePrefix:jmx默認的前綴名,默認為pool
jmxNameBase:jmx默認的base name,默認為null,意味著池提供一個名稱。
2.2 基本實現
上面的配置已經說明了一些內容了,此節介紹對象池的一個基礎實現思路。
首先作為一個對象池,我們需要從池中借對象,借完了要還,還要能創建對象存入池中,校驗對象是否還能使用。這個就是一個對象池的基本定義了:
commons-pool2還提供了一種控制細粒度更高的對象池KeyedObjectPool<K,V>。其根據關鍵字來維護不同的池,在某些場景十分有用,這裏不對其做詳細介紹,弄明白了一般的線程池,對這種池擴展也就有了思路。從接口到抽象類到具體實現類池經歷了下面幾個類:ObjectPool->BaseObjectPool->SoftReferenceObjectPool,這種用的比較少,看名次也知道是軟引用的池,另一種是BaseGenericObjectPool->GenericObjectPool。通常我們會繼承GenericObjectPool來設計自己的對象池。
有了池之後,我們需要一個創建池對象的類,這個就是工廠類:PooledObjectFactory。其要提供一個對象的生命周期的各個操作,包括創建、銷毀、校驗有效性、激活和鈍化對象。
同樣,其提供了一個抽象類BasePooledObjectFactory,這個就沒有具體的實現類了,因為涉及到不同對象,不同對象的管理方法不同,不好抽象。我們需要做的就是繼承BasePooledObjectFactory對象,實現其未實現或者空實現的方法了,即上述截圖的方法。
有了池來管理對象使用,有了工廠來管理對象的生命周期,一般而言也就夠了。但是還有一個重要的環節就是將池與工廠連接起來的對象的定義,所以要進行抽象。這就是PooledObject的作用了,其定義的一系列方法,將我們的對象和池以及工廠,通過這個接口關聯了起來。實現類是DefaultPooledObject,通常使用這個就夠了,不需要擴展。如果業務上對對象有更細致的控制,可以繼承或者直接自己實現PooledObject。
到此也就剩下一個驅逐線程沒介紹了,其維持著池的健康,或者說是活力。以上就是一個對象池的基本內容:池本身,創建對象的工廠,清潔池的驅逐線程,關聯池和工廠和自己創建的對象的池對象規範。這樣也就沒幾個類沒介紹過了,剩下的就是UsageTracking,看名稱也能知道是幹啥的了,後面介紹詳細流程的時候會順帶一提,用的不多。
2.3 具體實現
下面結合具體代碼構建一個對象池,來說明該池的是怎麽工作的。首先該對象的基本定義如下:需要一個對象類型Student,需要一個連接池管理對象CommonObjectPool,需要一個制造對象的工廠StudentFactory。其大體代碼如下:
public class Student { private String name; private int age; public Student(String name, int age) { this.name = name; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "Student{" + "name=‘" + name + ‘\‘‘ + ", age=" + age + ‘}‘; } } public class StudentFactory extends BasePooledObjectFactory<Student> { private Random random = new Random(); public Student create() throws Exception { int age = random.nextInt(100); Student student = new Student("commons", age); System.out.println("創建對象:" + student); return student; } public PooledObject<Student> wrap(Student obj) { return new DefaultPooledObject<Student>(obj); } @Override public void destroyObject(PooledObject<Student> p) throws Exception { System.out.println("銷毀對象:" + p.getObject()); super.destroyObject(p); } @Override public boolean validateObject(PooledObject<Student> p) { System.out.println("校驗對象是否可用:" + p.getObject()); return super.validateObject(p); } @Override public void activateObject(PooledObject<Student> p) throws Exception { System.out.println("激活鈍化的對象系列操作:" + p.getObject()); super.activateObject(p); } @Override public void passivateObject(PooledObject<Student> p) throws Exception { System.out.println("鈍化未使用的對象:" + p.getObject()); super.passivateObject(p); } } public class CommonObjectPool extends GenericObjectPool<Student> { public CommonObjectPool(PooledObjectFactory<Student> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) { super(factory, config, abandonedConfig); } }
使用方法如下:
public class Test { public static void main(String[] args) { StudentFactory studentFactory = new StudentFactory(); GenericObjectPoolConfig config = new GenericObjectPoolConfig(); AbandonedConfig abandonedConfig = new AbandonedConfig(); CommonObjectPool pool = new CommonObjectPool(studentFactory, config, abandonedConfig); Student student = null; try { student = pool.borrowObject(); System.out.println(student); } catch (Exception e) { e.printStackTrace(); } finally { if(student != null) pool.returnObject(student); } } }
下面結合源碼和例子講解執行過程,先是通過對象工廠類和配置初始化了一個pool,pool的初始化操作代碼如下:
public GenericObjectPool(final PooledObjectFactory<T> factory, final GenericObjectPoolConfig config) { super(config, ONAME_BASE, config.getJmxNamePrefix()); if (factory == null) { jmxUnregister(); // tidy up throw new IllegalArgumentException("factory may not be null"); } this.factory = factory; idleObjects = new LinkedBlockingDeque<>(config.getFairness()); setConfig(config); startEvictor(getTimeBetweenEvictionRunsMillis()); }
主要的工作就是設置工廠類,配置,開啟驅逐線程。下面先介紹驅逐線程的工作機制:
final void startEvictor(final long delay) { synchronized (evictionLock) { if (null != evictor) { EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS); evictor = null; evictionIterator = null; } if (delay > 0) { evictor = new Evictor(); EvictionTimer.schedule(evictor, delay, delay); } } }
如果設置過了就會關閉,不然要delay大於0才會開啟該線程,該值就是config中的getTimeBetweenEvictionRunsMillis。開啟方式就是通過EvictionTimer的周期任務,這實際上就是一個Timer定時器。該定時器做的工作如下:
public void run() { final ClassLoader savedClassLoader = Thread.currentThread().getContextClassLoader(); try { if (factoryClassLoader != null) { // Set the class loader for the factory final ClassLoader cl = factoryClassLoader.get(); if (cl == null) { // The pool has been dereferenced and the class loader // GC‘d. Cancel this timer so the pool can be GC‘d as // well. cancel(); return; } Thread.currentThread().setContextClassLoader(cl); } // Evict from the pool try { evict(); } catch(final Exception e) { swallowException(e); } catch(final OutOfMemoryError oome) { // Log problem but give evictor thread a chance to continue // in case error is recoverable oome.printStackTrace(System.err); } // Re-create idle instances. try { ensureMinIdle(); } catch (final Exception e) { swallowException(e); } } finally { // Restore the previous CCL Thread.currentThread().setContextClassLoader(savedClassLoader); } }
可以看出,先進行了驅逐,再判斷是否小於minIdle的設置,小於就會再次創建對象。
private void ensureIdle(final int idleCount, final boolean always) throws Exception { if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) { return; } while (idleObjects.size() < idleCount) { final PooledObject<T> p = create(); if (p == null) { // Can‘t create objects, no reason to think another call to // create will work. Give up. break; } if (getLifo()) { idleObjects.addFirst(p); } else { idleObjects.addLast(p); } } if (isClosed()) { // Pool closed while object was being added to idle objects. // Make sure the returned object is destroyed rather than left // in the idle object pool (which would effectively be a leak) clear(); } }
就是調用create方法創建,根據lifo的參數決定是先入先出還是後入先出。evict方法主要做了如下操作:
public void evict() throws Exception { assertOpen(); if (idleObjects.size() > 0) { PooledObject<T> underTest = null; final EvictionPolicy<T> evictionPolicy = getEvictionPolicy(); synchronized (evictionLock) { final EvictionConfig evictionConfig = new EvictionConfig( getMinEvictableIdleTimeMillis(), getSoftMinEvictableIdleTimeMillis(), getMinIdle()); final boolean testWhileIdle = getTestWhileIdle(); for (int i = 0, m = getNumTests(); i < m; i++) { if (evictionIterator == null || !evictionIterator.hasNext()) { evictionIterator = new EvictionIterator(idleObjects); } if (!evictionIterator.hasNext()) { // Pool exhausted, nothing to do here return; } try { underTest = evictionIterator.next(); } catch (final NoSuchElementException nsee) { // Object was borrowed in another thread // Don‘t count this as an eviction test so reduce i; i--; evictionIterator = null; continue; } if (!underTest.startEvictionTest()) { // Object was borrowed in another thread // Don‘t count this as an eviction test so reduce i; i--; continue; } // User provided eviction policy could throw all sorts of // crazy exceptions. Protect against such an exception // killing the eviction thread. boolean evict; try { evict = evictionPolicy.evict(evictionConfig, underTest, idleObjects.size()); } catch (final Throwable t) { // Slightly convoluted as SwallowedExceptionListener // uses Exception rather than Throwable PoolUtils.checkRethrow(t); swallowException(new Exception(t)); // Don‘t evict on error conditions evict = false; } if (evict) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } else { if (testWhileIdle) { boolean active = false; try { factory.activateObject(underTest); active = true; } catch (final Exception e) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } if (active) { if (!factory.validateObject(underTest)) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } else { try { factory.passivateObject(underTest); } catch (final Exception e) { destroy(underTest); destroyedByEvictorCount.incrementAndGet(); } } } } if (!underTest.endEvictionTest(idleObjects)) { // TODO - May need to add code here once additional // states are used } } } } } final AbandonedConfig ac = this.abandonedConfig; if (ac != null && ac.getRemoveAbandonedOnMaintenance()) { removeAbandoned(ac); } }
先是判斷池是開啟狀態,且空閑對象要大於0,不然不需要驅逐。然後循環了設置的numTests的次數,一次驅逐就檢查這麽多個對象。後面一段是並發被幹擾的一些操作,主要是保證被幹擾後仍檢查這麽些對象。最後就是根據驅逐策略來驅逐對象。上面配置項說過是怎麽回事,具體見DefaultEvictionPolicy。如果判斷是驅逐,就調用destory方法銷毀對象。否則,判斷testWhileIdle配置項,決定是否校驗對象是否仍可用,先激活對象activateObject,有異常直接銷毀。否則開始校驗對象的可用性,validateObject。失敗銷毀,成功就鈍化變成原樣子。鈍化失敗也直接銷毀。最後是一個遺棄對象的設置,就是說有些對象借出去了由於種種原因,比如寫法上的問題,導致對象很久沒有還回來,這個設置就是用於清理這類對象的。這類對象不再被池借出,但又暫用了資源。一般而言該配置很少用到,因為寫方式通常都將return操作放在finally模塊,不會出現此類情況。
最後我們看下借對象和還對象都做了哪些操作吧。
public T borrowObject(final long borrowMaxWaitMillis) throws Exception { assertOpen(); final AbandonedConfig ac = this.abandonedConfig; if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) { removeAbandoned(ac); } PooledObject<T> p = null; // Get local copy of current config so it is consistent for entire // method execution final boolean blockWhenExhausted = getBlockWhenExhausted(); boolean create; final long waitTime = System.currentTimeMillis(); while (p == null) { create = false; p = idleObjects.pollFirst(); if (p == null) { p = create(); if (p != null) { create = true; } } if (blockWhenExhausted) { if (p == null) { if (borrowMaxWaitMillis < 0) { p = idleObjects.takeFirst(); } else { p = idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS); } } if (p == null) { throw new NoSuchElementException( "Timeout waiting for idle object"); } } else { if (p == null) { throw new NoSuchElementException("Pool exhausted"); } } if (!p.allocate()) { p = null; } if (p != null) { try { factory.activateObject(p); } catch (final Exception e) { try { destroy(p); } catch (final Exception e1) { // Ignore - activation failure is more important } p = null; if (create) { final NoSuchElementException nsee = new NoSuchElementException( "Unable to activate object"); nsee.initCause(e); throw nsee; } } if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) { boolean validate = false; Throwable validationThrowable = null; try { validate = factory.validateObject(p); } catch (final Throwable t) { PoolUtils.checkRethrow(t); validationThrowable = t; } if (!validate) { try { destroy(p); destroyedByBorrowValidationCount.incrementAndGet(); } catch (final Exception e) { // Ignore - validation failure is more important } p = null; if (create) { final NoSuchElementException nsee = new NoSuchElementException( "Unable to validate object"); nsee.initCause(validationThrowable); throw nsee; } } } } } updateStatsBorrow(p, System.currentTimeMillis() - waitTime); return p.getObject(); }
借的操作步驟如下:先確定池是否開啟,再根據條件決定是否移除遺棄的對象。開始獲取對象:1.從idle中獲取一個,沒獲取到就創建一個,創建的邏輯涉及參數maxTotal,超過這個值不會創建對象,返回null,maxTotal為-1意為創建的數量為無限(整數最大)。2.創建失敗,線程阻塞,等待時間為-1就一直等待,不為-1等到指定時間還沒等到,就拋出異常。3.不等待直接會在沒獲取對象的時候直接拋出異常。4.對象狀態不對,沒有鎖定,置為null。5.上述都沒問題,獲取對象後開始激活對象,失敗銷毀對象。成功後判斷是否borrow和create的時候要校驗對象可用性,需要進行校驗,校驗失敗銷毀。上訴是一個while(p==null)的循環,所以borrow的結果只有2種,1是借不到對象超時,2是借到對象。其他就是等待獲取空閑對象。
還對象的邏輯也不難:
public void returnObject(final T obj) { final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj)); if (p == null) { if (!isAbandonedConfig()) { throw new IllegalStateException( "Returned object not currently part of this pool"); } return; // Object was abandoned and removed } synchronized(p) { final PooledObjectState state = p.getState(); if (state != PooledObjectState.ALLOCATED) { throw new IllegalStateException( "Object has already been returned to this pool or is invalid"); } p.markReturning(); // Keep from being marked abandoned } final long activeTime = p.getActiveTimeMillis(); if (getTestOnReturn()) { if (!factory.validateObject(p)) { try { destroy(p); } catch (final Exception e) { swallowException(e); } try { ensureIdle(1, false); } catch (final Exception e) { swallowException(e); } updateStatsReturn(activeTime); return; } } try { factory.passivateObject(p); } catch (final Exception e1) { swallowException(e1); try { destroy(p); } catch (final Exception e) { swallowException(e); } try { ensureIdle(1, false); } catch (final Exception e) { swallowException(e); } updateStatsReturn(activeTime); return; } if (!p.deallocate()) { throw new IllegalStateException( "Object has already been returned to this pool or is invalid"); } final int maxIdleSave = getMaxIdle(); if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) { try { destroy(p); } catch (final Exception e) { swallowException(e); } } else { if (getLifo()) { idleObjects.addFirst(p); } else { idleObjects.addLast(p); } if (isClosed()) { // Pool closed while object was being added to idle objects. // Make sure the returned object is destroyed rather than left // in the idle object pool (which would effectively be a leak) clear(); } } updateStatsReturn(activeTime); }
更新對象狀態,判斷還的時候是否要校驗對象可用性,不可用銷毀。之後鈍化對象,鈍化失敗銷毀,超過maxIdle也直接銷毀。最後根據lifo來確定放回方式。因為涉及銷毀對象,所以都要進行確定minidle來決定是否補充對象。
3.結束語
commons-pool2的主要邏輯就是上述內容了,代碼例子也給了一個。這裏總結一下對象的一個生命周期:create->activate->invalidate->borrow->invalidate->return->destory。其中validate階段發生在各個環節,主要通過TestOnXXX進行配置決定。
commons-pool2