1. 程式人生 > >commons-pool2原始碼走讀(四) 物件池實現GenericObjectPool

commons-pool2原始碼走讀(四) 物件池實現GenericObjectPool

commons-pool2原始碼走讀(四) 物件池實現GenericObjectPool<T>

GenericObjectPool <T> 是一個可配置的ObjectPool實現。
當與適當的PooledObjectFactory組合使用時,GenericObjectPool為任意物件提供健壯的池功能。

您可以選擇性的配置池來檢查和可能回收池中的空閒物件,並確保有最少數量的空閒物件可用。這是由一個“空閒物件回收”執行緒(即BaseGenericObjectPool <T> 的Evictor)執行的,執行緒是非同步執行的。在配置這個可選特性時,應該謹慎使用。驅逐執行與客戶端執行緒爭用池中的物件,因此如果它們執行得太頻繁,可能會導致效能問題。

還可以配置池來檢測和刪除被洩漏的物件,比如一個從池中借出的物件,在超過removeAbandonedTimeout超時之前既不使用也不返回。移除洩漏的連線,可能發生在物件被借用時物件池已接近飽和,也可能是被回收執行緒檢查出,或者兩者都執行時。如果池物件實現了TrackedUse介面,那麼其最後一次使用時間使取決於getLastUsed方法;否則,是由物件從池中借出的時間決定。

實現注意:為了防止可能的死鎖,已經採取了謹慎措施,以確保在同步塊中不會發生對工廠方法的呼叫。這個類執行緒安全。

1、介面繼承、實現關係

GenericObjectPool <T> 實現了ObjectPool<T> 具備物件池的功能,同時 繼承了BaseGenericObjectPool<T> 的對於物件狀態管理和回收等功能。
這裡寫圖片描述

2、建構函式

建構函式通過GenericObjectPoolConfig 和PooledObjectFactory來進行引數的初始化和物件工廠類的引入。

    public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig config) {
        //父類BaseGenericObjectPool構造方法
        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if
(factory == null) { jmxUnregister(); // tidy up throw new IllegalArgumentException("factory may not be null"); } this.factory = factory; //空閒物件佇列,此佇列非JDK而是自行實現的一個佇列 idleObjects = new LinkedBlockingDeque<>(config.getFairness()); //覆蓋BaseGenericObjectPool裡面的配置引數 setConfig(config); //初始化回收執行緒 startEvictor(getTimeBetweenEvictionRunsMillis()); }

3、相關屬性

    // --- 可配置的屬性 -------------------------------------------------
    //最大空閒數量
    private volatile int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
    //最小空閒數量
    private volatile int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
    //物件工廠
    private final PooledObjectFactory<T> factory;

    // --- 內部屬性 -------------------------------------------------

    //池中所有的物件,只能是<=maxActive
    private final Map<IdentityWrapper<T>, PooledObject<T>> allObjects =
        new ConcurrentHashMap<>();
    //已建立物件總數(不包含已銷燬的)
    private final AtomicLong createCount = new AtomicLong(0);
    //呼叫建立方法匯流排程數
    private long makeObjectCount = 0;
    //makeObjectCount 增長時併發鎖
    private final Object makeObjectCountLock = new Object();
    //空閒物件佇列
    private final LinkedBlockingDeque<PooledObject<T>> idleObjects;

    // JMX specific attributes
    private static final String ONAME_BASE =
        "org.apache.commons.pool2:type=GenericObjectPool,name=";

    //洩漏物件回收配置引數
    private volatile AbandonedConfig abandonedConfig = null;

4、 物件池方法實現

  • 借用物件
    整個流程為,檢查池是否關閉 –> 是否回收洩漏物件 –> 是否阻塞建立物件 –> 建立物件 –> 分配物件 –> 啟用物件 –> 校驗物件 –> 更改借用資訊 –> 返回物件
   public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        //判斷物件池是否關閉:BaseGenericObjectPool.closed==true
        assertOpen();
        //如果回收洩漏的引數配置不為空,並且removeAbandonedOnBorrow引數配置為true
        //並且Idle數量<2,Active數量>總數Total-3
        //在借用時進行回收洩漏連線(會影響效能)
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            //回收洩漏物件
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        //copy blockWhenExhausted 防止其它執行緒更改getBlockWhenExhausted值造成併發問題
        //借用物件時如果沒有是否阻塞直到有物件產生
        final boolean blockWhenExhausted = getBlockWhenExhausted();
        //建立成功標識
        boolean create;
        //記錄當前時間,用作記錄借用操作總共花費的時間
        final long waitTime = System.currentTimeMillis();
        //當物件為空時一直獲取
        while (p == null) {
            create = false;
            //從雙端佇列彈出第一個隊首物件,為空返回null
            p = idleObjects.pollFirst();
            //如果為空則重新建立一個物件
            if (p == null) {
                //建立物件
                p = create();
                //p==null可能物件池達到上限不能繼續建立!
                if (p != null) {
                    create = true;
                }
            }
            //如果物件p還是為空則阻塞等待
            if (blockWhenExhausted) {
                if (p == null) {
                    if (borrowMaxWaitMillis < 0) {
                        //沒有超時時間則阻塞等待到有物件為止
                        p = idleObjects.takeFirst();
                    } else {
                        //有超時時間
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                //達到超時時間,還未取到物件,則丟擲異常
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
            } else {
                //未取到物件,則丟擲異常
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
            }
            //呼叫PooledObject.allocate()方法分配物件
            //[具體實現請看](https://blog.csdn.net/qq447995687/article/details/80413227)
            if (!p.allocate()) {
                p = null;
            }
            //分配成功
            if (p != null) {
                try {
                    //啟用物件,具體請看factory實現,物件重借出到歸還整個流程經歷的過程圖
                    factory.activateObject(p);
                } catch (final Exception e) {
                    try {
                        destroy(p);
                    } catch (final Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                //物件建立成功,是否進行測試
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        //校驗物件,具體請看factory實現,物件重借出到歸還整個流程經歷的過程圖
                        validate = factory.validateObject(p);
                    } catch (final Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    //校驗不通過則銷燬物件
                    if (!validate) {
                        try {
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (final Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }
        //更新物件借用狀態
        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }

建立物件
當借用時,空閒物件為空,並且未達到池最大數量,則會呼叫該方法重新建立一個空閒物件

   private PooledObject<T> create() throws Exception {
        int localMaxTotal = getMaxTotal();
        // 如果最大數量為負數則設定為Integer的最大值
        if (localMaxTotal < 0) {
            localMaxTotal = Integer.MAX_VALUE;
        }

        // 是否建立成功的一個flag:
        // - TRUE:  呼叫工廠類成功建立一個物件
        // - FALSE: 返回空
        // - null:  重複建立
        Boolean create = null;
        while (create == null) {
            synchronized (makeObjectCountLock) {
                //加上本次操作,總共建立個數
                final long newCreateCount = createCount.incrementAndGet();
                if (newCreateCount > localMaxTotal) {
                    //連線池容量已滿,不能繼續增長。在對最後一個物件的建立上,
                    //加入了設計較為精妙,需細細揣摩
                    createCount.decrementAndGet();
                    //呼叫建立物件方法執行緒數=0
                    if (makeObjectCount == 0) {
                        //容量已滿並且沒有執行緒呼叫makeObject()方法,
                        //表明沒有任何可能性再繼續建立物件,
                        //返回並等待歸還的空閒物件
                        create = Boolean.FALSE;
                    } else {
                        //其它執行緒呼叫makeObject()方法在建立物件了。
                        //如果繼續建立則可能超過物件池容量,不返回false,因為其它執行緒也在建立,
                        //但是是否能夠建立成功是未知的,如果其它執行緒沒能建立成功,
                        //則此執行緒可能會搶奪到繼續建立的權利。
                        //釋放鎖,等待其它執行緒建立結束並喚醒該執行緒
                        makeObjectCountLock.wait();
                    }
                } else {
                    // 物件池未滿,從新建立一個物件
                    makeObjectCount++;
                    create = Boolean.TRUE;
                }
            }
        }
        //物件池容量達到上限,返回null重新等待其它執行緒歸還物件
        if (!create.booleanValue()) {
            return null;
        }

        final PooledObject<T> p;
        try {
            //建立一個新物件
            p = factory.makeObject();
        } catch (final Exception e) {
            createCount.decrementAndGet();
            throw e;
        } finally {
            //與上面wait()方法相呼應,
            //如果上面丟擲了異常,喚醒其它執行緒爭奪繼續建立最後一個資源的權利
            synchronized (makeObjectCountLock) {
                makeObjectCount--;
                makeObjectCountLock.notifyAll();
            }
        }
        //設定洩漏引數,並加入呼叫堆疊
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getLogAbandoned()) {
            p.setLogAbandoned(true);
            // TODO: in 3.0, this can use the method defined on PooledObject
            if (p instanceof DefaultPooledObject<?>) {
                ((DefaultPooledObject<T>) p).setRequireFullStackTrace(ac.getRequireFullStackTrace());
            }
        }
        //將建立總數增加,並將物件放入    allObjects  
        createdCount.incrementAndGet();
        allObjects.put(new IdentityWrapper<>(p.getObject()), p);
        return p;
    }

回收洩漏物件

 private void removeAbandoned(final AbandonedConfig ac) {
        // Generate a list of abandoned objects to remove
        final long now = System.currentTimeMillis();
        //超時時間=當前時間-配置的超時時間,如果一個物件的上次借用時間在此時間之前,
        //說明上次借用後經過了removeAbandonedTimeout時間限制還未被歸還過即可能是洩漏的物件
        final long timeout =
                now - (ac.getRemoveAbandonedTimeout() * 1000L);
        //洩漏的,需要移除的物件列表
        final ArrayList<PooledObject<T>> remove = new ArrayList<>();
        final Iterator<PooledObject<T>> it = allObjects.values().iterator();
        //遍歷池中物件依次判斷是否需要移除
        while (it.hasNext()) {
            final PooledObject<T> pooledObject = it.next();
            synchronized (pooledObject) {
                //如果物件的狀態為已分配ALLOCATED ,並且已經超過洩漏定義時間則新增到需要移除佇列進行統一移除
                if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
                        pooledObject.getLastUsedTime() <= timeout) {
                    pooledObject.markAbandoned();
                    remove.add(pooledObject);
                }
            }
        }

        // 移除洩漏連線,如果配置了列印堆疊,則列印呼叫堆疊資訊
        final Iterator<PooledObject<T>> itr = remove.iterator();
        while (itr.hasNext()) {
            final PooledObject<T> pooledObject = itr.next();
            if (ac.getLogAbandoned()) {
                pooledObject.printStackTrace(ac.getLogWriter());
            }
            try {
                //銷燬物件
                invalidateObject(pooledObject.getObject());
            } catch (final Exception e) {
                e.printStackTrace();
            }
        }
    }
  • invalidate物件
    public void invalidateObject(final T obj) throws Exception {
        //從所有物件中取出該物件,如果不存在則丟擲異常
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
        if (p == null) {
            if (isAbandonedConfig()) {
                return;
            }
            throw new IllegalStateException(
                    "Invalidated object not currently part of this pool");
        }
        //如果物件不是無效狀態PooledObjectState.INVALID,則銷燬此物件
        synchronized (p) {
            if (p.getState() != PooledObjectState.INVALID) {
                destroy(p);
            }
        }
        //
        ensureIdle(1, false);
    }

銷燬物件
銷燬物件,並從池中移除,更新物件池已建立數量和總銷燬數量

    private void destroy(final PooledObject<T> toDestroy) throws Exception {
        toDestroy.invalidate();
        idleObjects.remove(toDestroy);
        allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
        try {
            factory.destroyObject(toDestroy);
        } finally {
            destroyedCount.incrementAndGet();
            createCount.decrementAndGet();
        }
    }

確保最小空閒數量

    private void ensureIdle(final int idleCount, final boolean always) throws Exception {
        //!idleObjects.hasTakeWaiters()如果idleObjects佇列還有執行緒等待獲取物件則由最後一個
        //等待者確保最小空閒數量
        if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
            return;
        }
        //一直建立空閒物件知道空閒物件數量>總空閒數量閾值
        while (idleObjects.size() < idleCount) {
            final PooledObject<T> p = create();
            if (p == null) {
                // Can't create objects, no reason to think another call to
                // create will work. Give up.
                break;
            }
            //根據先進先出引數,新增物件到隊首或者隊尾
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
        //在此過程中如果連線池關閉則clear所有物件
        if (isClosed()) {
            clear();
        }
    }
  • 歸還物件

歸還物件方法將適用完的物件從新放置回物件池中重複利用。其整個流程為:檢查是否存在 –> 檢查狀態是否正確 –> 是否在歸還時測試物件 –> 校驗物件 –> 鈍化(解除安裝)物件 –> 結束分配 –> 銷燬/歸還該物件 –> 更新連線池歸還資訊

    public void returnObject(final T obj) {
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));

        if (p == null) {
            //如果物件為空,並且沒有配置洩漏引數則丟擲異常,表明該物件不是連線池中的物件
            if (!isAbandonedConfig()) {
                throw new IllegalStateException(
                        "Returned object not currently part of this pool");
            }
            //如果物件為空,表明該物件是abandoned並且已被銷燬
            return; 
        }

        synchronized(p) {
            final PooledObjectState state = p.getState();
            //如果被歸還的物件不是已分配狀態,丟擲異常
            if (state != PooledObjectState.ALLOCATED) {
                throw new IllegalStateException(
                        "Object has already been returned to this pool or is invalid");
            }
            //更改狀態為returning,避免在此過程中被標記為被遺棄。
            p.markReturning();
        }

        final long activeTime = p.getActiveTimeMillis();
        //是否在歸還時測試該物件
        if (getTestOnReturn()) {
            //校驗物件
            if (!factory.validateObject(p)) {
                try {
                    //校驗不通過則destroy物件
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
                try {
                    //確保最小空閒數量
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
                //更新連線池歸還資訊BaseGenericObjectPool#returnedCount,activeTimes
                updateStatsReturn(activeTime);
                return;
            }
        }
        //校驗通過
        try {
            //鈍化(解除安裝)物件
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }
        //結束分配,如果物件為ALLOCATED或者RETURNING更改物件為空閒IDLE狀態
        //具體看org.apache.commons.pool2.impl.DefaultPooledObject#deallocate方法
        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        //如果物件池已經關閉或者空閒數量達到上限,則銷燬該物件
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            //否則將歸還的物件新增到空閒佇列,連線池的最終目的:重用一個連線
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }
  • clear連線池
    依次銷燬每個連結
    public void clear() {
        PooledObject<T> p = idleObjects.poll();

        while (p != null) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            p = idleObjects.poll();
        }
    }
  • 回收物件

此方法實現了org.apache.commons.pool2.impl.BaseGenericObjectPool#evict 方法,用於回收執行緒回收空閒物件。
回收的整個流程為:判斷池是否關閉及是否有空閒物件 –> 根據策略獲得回收的條數 –> 判斷物件狀態開始進行回收 –> 根據回收策略EvictionPolicy判斷是否能夠回收 –> 如能回收則銷燬物件 –> 不能回收則判斷是否校驗物件 –> 啟用物件 –> 校驗物件 –> 鈍化物件 –> 結束回收更改物件狀態 –> 回收洩漏連線

public void evict() throws Exception {
        assertOpen();

        if (idleObjects.size() > 0) {

            PooledObject<T> underTest = null;
            final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();

            synchronized (evictionLock) {
                //回收引數
                final EvictionConfig evictionConfig = new EvictionConfig(
                        getMinEvictableIdleTimeMillis(),
                        getSoftMinEvictableIdleTimeMillis(),
                        getMinIdle());
                //是否在回收時測試物件
                final boolean testWhileIdle = getTestWhileIdle();
                //根據getNumTests()對部分物件進行回收測試
                for (int i = 0, m = getNumTests(); i < m; i++) {
                    //evictionIterator是空閒物件的一個迭代器,可以想象為idleObjects.iterator()
                    if (evictionIterator == null || !evictionIterator.hasNext()) {
                        evictionIterator = new EvictionIterator(idleObjects);
                    }
                    if (!evictionIterator.hasNext()) {
                        // Pool exhausted, nothing to do here
                        return;
                    }
                    //多執行緒併發時,有可能上面檢測到有物件,而另一個物件隨後將其借出
                    try {
                        underTest = evictionIterator.next();
                    } catch (final NoSuchElementException nsee) {
                        // 物件被其它執行緒借出
                        i--;
                        evictionIterator = null;
                        continue;
                    }
                    //根據狀態判斷是否能夠開始回收測試,並更改狀態,詳細實現請看原始碼走讀(一) 
                    if (!underTest.startEvictionTest()) {
                        // Object was borrowed in another thread
                        // Don't count this as an eviction test so reduce i;
                        i--;
                        continue;
                    }

                    //根據回收策略判斷物件是否能夠被回收,單獨分析
                    boolean evict;
                    try {
                        //根據回收策略判斷物件是否能夠被回收
                        evict = evictionPolicy.evict(evictionConfig, underTest,
                                idleObjects.size());
                    } catch (final Throwable t) {
                        // Slightly convoluted as SwallowedExceptionListener
                        // uses Exception rather than Throwable
                        PoolUtils.checkRethrow(t);
                        swallowException(new Exception(t));
                        // Don't evict on error conditions
                        evict = false;
                    }
                    //如果能被回收則銷燬物件
                    if (evict) {
                        destroy(underTest);
                        destroyedByEvictorCount.incrementAndGet();
                    } else {
                        //不能被回收,則是否進行校驗,與借出流程相同,只不過該處只是校驗而沒有借出實際使用
                        if (testWhileIdle) {
                            boolean active = false;
                            try {
                                //物件已經被借出,直接啟用
                                factory.activateObject(underTest);
                                active = true;
                            } catch (final Exception e) {
                                destroy(underTest);
                                destroyedByEvictorCount.incrementAndGet();
                            }
                            if (active) {
                                //啟用成功進行校驗
                                if (!factory.validateObject(underTest)) {
                                    //校驗不通過則銷燬物件
                                    destroy(underTest);
                                    destroyedByEvictorCount.incrementAndGet();
                                } else {
                                    try {
                                        //校驗通過則重新將物件鈍化(解除安裝)
                                        factory.passivateObject(underTest);
                                    } catch (final Exception e) {
                                        destroy(underTest);
                                        destroyedByEvictorCount.incrementAndGet();
                                    }
                                }
                            }
                        }
                        //結束回收測試,更改物件狀態或者新增到空閒佇列,
                        //如果在此途中被借出,還需重新新增到idleObjects,具體實現請看原始碼走讀(一)
                        if (!underTest.endEvictionTest(idleObjects)) {
                            // TODO - May need to add code here once additional
                            // states are used
                        }
                    }
                }
            }
        }
        //配置了回收,則進行回收洩漏連線
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
            removeAbandoned(ac);
        }
    }

返回有多少物件需要進行回收測試

    private int getNumTests() {
        final int numTestsPerEvictionRun = getNumTestsPerEvictionRun();
        if (numTestsPerEvictionRun >= 0) {
            return Math.min(numTestsPerEvictionRun, idleObjects.size());
        }
        return (int) (Math.ceil(idleObjects.size() /
                Math.abs((double) numTestsPerEvictionRun)));
    }