1. 程式人生 > >commons-pool2

commons-pool2

名稱 技術分享 對象狀態 介紹 eth locate @override 工廠 2個

  轉載請註明源出處:http://www.cnblogs.com/lighten/p/7375611.html

1.前言

  本章介紹一下常用基礎Jar包commons-pools2,最近使用到了thrift作為rpc服務通訊,但是沒有找到其提供的連接池。百度了一下官方貌似沒有提供,需要自己實現,所以根據網上的實現方法通過使用commons-pool2包來構建自己的thrift連接池服務。完成後,順便研究了一下commons-pool2的實現(怕使用不當有坑),也就有了這篇文章。

2.commons-pool2

  既然名字中有個2也就意味著是第二個版本了,重新命名也就意味著和原版本並不兼容。這是apache提供的一個開源包,在很多地方都會使用,2015年7月之後就不再進行更新了,目前最新版本是2.4.2(最近突然又更新了目前在2.5.0版本)。其準確來說應該是一個對象池,不過是常用在於連接上而已。

  commons-pool2的類並不多,全部42個類如下:

    技術分享圖片技術分享圖片

  包的結構簡單,類不多,而且還有大量的接口和內部類,所以實際上需要關註的類沒幾個,commons-pool2使用起來也就方便了。

2.1 常見配置

  上面也說了,該類是一個基礎包,被很多其它jar包使用,常用於對象池的管理,所以其一些配置,在寫代碼的時候也會經常接觸到類似的,可能是封裝過的,也可能是版本1的,不過大同小異,主要是思路。了解了配置的具體作用,再結合代碼就能夠了解這類jar包是如何寫的了。

  通用的配置都在GenericObjectPoolConfig類中:

    maxTotal:對象池中最多允許的對象數,默認8(可能超過,不過超過後使用完了就會銷毀,後面源碼會介紹相關機制)

    maxIdle:對象池中最多允許存在的空閑對象,默認8

    minIdle:池中最少要保留的對象數,默認0

    lifo:是否使用FIFO先進先出的模式獲取對象(空閑對象都在一個隊列中),默認為true使用先進先出,false是先進後出

    fairness:是否使用公平鎖,默認false(公平鎖是線程安全中的概念,true的含義是誰先等待獲取鎖,隨先在鎖釋放的時候獲取鎖,如非必要,一般不使用公平鎖,會影響性能)

    maxWaitMillis:從池中獲取一個對象最長的等待時間,默認-1,含義是無限等,超過這個時間還未獲取空閑對象,就會拋出異常。

    minEvictableIdleTimeMillis:最小的驅逐時間,單位毫秒,默認30分鐘。這個用於驅逐線程,對象空閑時間超過這個時間,意味著此時系統不忙碌,會減少對象數量。

    evictorShutdownTimeoutMillis:驅逐線程關閉的超時時間,默認10秒。

    softMinEvictableIdleTimeMillis:也是最小的驅逐時間,但是會和另一個指標minIdle一同使用,滿足空閑時間超過這個設置,且當前空閑數量比設置的minIdle要大,會銷毀該對象。所以,通常該值設置的比minEvictableIdleTimeMillis要小。

    numTestsPerEvictionRun:驅逐線程運行每次測試的對象數量,默認3個。驅逐線程就是用來檢查對象空閑狀態,通過設置的對象數量等參數,保持對象的活躍度和數量,其是一個定時任務,每次不是檢查所有的對象,而是抽查幾個,這個就是用於抽查。

    evictionPolicyClassName:驅逐線程使用的策略類名,之前的minEvictableIdleTimeMillis和softMinEvictableIdleTimeMillis就是默認策略DefaultEvictionPolicy的實現,可以自己實現策略。

    testOnCreate:在創建對象的時候是否檢測對象,默認false。後續會結合代碼說明是如何檢測的。

    testOnBorrow:在獲取空閑對象的時候是否檢測對象是否有效,默認false。這個通常會設置成true,一般希望獲取一個可用有效的對象吧。

    testOnReturn:在使用完對象放回池中時是否檢測對象是否仍有效,默認false。

    testWhileIdle:在空閑的時候是否檢測對象是否有效,這個發生在驅逐線程執行時。

    timeBetweenEvictionRunsMillis:驅逐線程的執行周期,上面說過該線程是個定時任務。默認-1,即不開啟驅逐線程,所以與之相關的參數是沒有作用的。

    blockWhenExhausted:在對象池耗盡時是否阻塞,默認true。false的話超時就沒有作用了。

    jmxEnabled:是否允許jmx的方式創建一個配置實例,默認true。

    jmxNamePrefix:jmx默認的前綴名,默認為pool

    jmxNameBase:jmx默認的base name,默認為null,意味著池提供一個名稱。

2.2 基本實現

  上面的配置已經說明了一些內容了,此節介紹對象池的一個基礎實現思路。

  首先作為一個對象池,我們需要從池中借對象,借完了要還,還要能創建對象存入池中,校驗對象是否還能使用。這個就是一個對象池的基本定義了:

  技術分享圖片

  commons-pool2還提供了一種控制細粒度更高的對象池KeyedObjectPool<K,V>。其根據關鍵字來維護不同的池,在某些場景十分有用,這裏不對其做詳細介紹,弄明白了一般的線程池,對這種池擴展也就有了思路。從接口到抽象類到具體實現類池經歷了下面幾個類:ObjectPool->BaseObjectPool->SoftReferenceObjectPool,這種用的比較少,看名次也知道是軟引用的池,另一種是BaseGenericObjectPool->GenericObjectPool。通常我們會繼承GenericObjectPool來設計自己的對象池。

  有了池之後,我們需要一個創建池對象的類,這個就是工廠類:PooledObjectFactory。其要提供一個對象的生命周期的各個操作,包括創建、銷毀、校驗有效性、激活和鈍化對象。

技術分享圖片

  同樣,其提供了一個抽象類BasePooledObjectFactory,這個就沒有具體的實現類了,因為涉及到不同對象,不同對象的管理方法不同,不好抽象。我們需要做的就是繼承BasePooledObjectFactory對象,實現其未實現或者空實現的方法了,即上述截圖的方法。

  有了池來管理對象使用,有了工廠來管理對象的生命周期,一般而言也就夠了。但是還有一個重要的環節就是將池與工廠連接起來的對象的定義,所以要進行抽象。這就是PooledObject的作用了,其定義的一系列方法,將我們的對象和池以及工廠,通過這個接口關聯了起來。實現類是DefaultPooledObject,通常使用這個就夠了,不需要擴展。如果業務上對對象有更細致的控制,可以繼承或者直接自己實現PooledObject。

  到此也就剩下一個驅逐線程沒介紹了,其維持著池的健康,或者說是活力。以上就是一個對象池的基本內容:池本身,創建對象的工廠,清潔池的驅逐線程,關聯池和工廠和自己創建的對象的池對象規範。這樣也就沒幾個類沒介紹過了,剩下的就是UsageTracking,看名稱也能知道是幹啥的了,後面介紹詳細流程的時候會順帶一提,用的不多。

2.3 具體實現

  下面結合具體代碼構建一個對象池,來說明該池的是怎麽工作的。首先該對象的基本定義如下:需要一個對象類型Student,需要一個連接池管理對象CommonObjectPool,需要一個制造對象的工廠StudentFactory。其大體代碼如下:

  

public class Student {

    private String name;
    private int age;

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name=‘" + name + ‘\‘‘ +
                ", age=" + age +
                ‘}‘;
    }
}



public class StudentFactory extends BasePooledObjectFactory<Student> {

    private Random random = new Random();

    public Student create() throws Exception {

        int age = random.nextInt(100);
        Student student = new Student("commons", age);
        System.out.println("創建對象:" + student);
        return student;
    }

    public PooledObject<Student> wrap(Student obj) {
        return new DefaultPooledObject<Student>(obj);
    }

    @Override
    public void destroyObject(PooledObject<Student> p) throws Exception {
        System.out.println("銷毀對象:" + p.getObject());
        super.destroyObject(p);
    }

    @Override
    public boolean validateObject(PooledObject<Student> p) {
        System.out.println("校驗對象是否可用:" + p.getObject());
        return super.validateObject(p);
    }

    @Override
    public void activateObject(PooledObject<Student> p) throws Exception {
        System.out.println("激活鈍化的對象系列操作:" + p.getObject());
        super.activateObject(p);
    }

    @Override
    public void passivateObject(PooledObject<Student> p) throws Exception {
        System.out.println("鈍化未使用的對象:" + p.getObject());
        super.passivateObject(p);
    }
}


public class CommonObjectPool extends GenericObjectPool<Student> {

    public CommonObjectPool(PooledObjectFactory<Student> factory, GenericObjectPoolConfig config, AbandonedConfig abandonedConfig) {
        super(factory, config, abandonedConfig);
    }
}

  使用方法如下:

public class Test {

    public static void main(String[] args) {
        StudentFactory studentFactory = new StudentFactory();
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        AbandonedConfig abandonedConfig = new AbandonedConfig();
        CommonObjectPool pool = new CommonObjectPool(studentFactory, config, abandonedConfig);

        Student student = null;
        try {
            student = pool.borrowObject();
            System.out.println(student);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(student != null) pool.returnObject(student);
        }
    }
}

  下面結合源碼和例子講解執行過程,先是通過對象工廠類和配置初始化了一個pool,pool的初始化操作代碼如下:

public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;

        idleObjects = new LinkedBlockingDeque<>(config.getFairness());

        setConfig(config);

        startEvictor(getTimeBetweenEvictionRunsMillis());
 }

  主要的工作就是設置工廠類,配置,開啟驅逐線程。下面先介紹驅逐線程的工作機制:

final void startEvictor(final long delay) {
        synchronized (evictionLock) {
            if (null != evictor) {
                EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
                evictor = null;
                evictionIterator = null;
            }
            if (delay > 0) {
                evictor = new Evictor();
                EvictionTimer.schedule(evictor, delay, delay);
            }
        }
}

  如果設置過了就會關閉,不然要delay大於0才會開啟該線程,該值就是config中的getTimeBetweenEvictionRunsMillis。開啟方式就是通過EvictionTimer的周期任務,這實際上就是一個Timer定時器。該定時器做的工作如下:

  

    public void run() {
            final ClassLoader savedClassLoader =
                    Thread.currentThread().getContextClassLoader();
            try {
                if (factoryClassLoader != null) {
                    // Set the class loader for the factory
                    final ClassLoader cl = factoryClassLoader.get();
                    if (cl == null) {
                        // The pool has been dereferenced and the class loader
                        // GC‘d. Cancel this timer so the pool can be GC‘d as
                        // well.
                        cancel();
                        return;
                    }
                    Thread.currentThread().setContextClassLoader(cl);
                }

                // Evict from the pool
                try {
                    evict();
                } catch(final Exception e) {
                    swallowException(e);
                } catch(final OutOfMemoryError oome) {
                    // Log problem but give evictor thread a chance to continue
                    // in case error is recoverable
                    oome.printStackTrace(System.err);
                }
                // Re-create idle instances.
                try {
                    ensureMinIdle();
                } catch (final Exception e) {
                    swallowException(e);
                }
            } finally {
                // Restore the previous CCL
                Thread.currentThread().setContextClassLoader(savedClassLoader);
            }
        }

  可以看出,先進行了驅逐,再判斷是否小於minIdle的設置,小於就會再次創建對象。

  

private void ensureIdle(final int idleCount, final boolean always) throws Exception {
        if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
            return;
        }

        while (idleObjects.size() < idleCount) {
            final PooledObject<T> p = create();
            if (p == null) {
                // Can‘t create objects, no reason to think another call to
                // create will work. Give up.
                break;
            }
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            clear();
        }
}

  就是調用create方法創建,根據lifo的參數決定是先入先出還是後入先出。evict方法主要做了如下操作:

    public void evict() throws Exception {
        assertOpen();

        if (idleObjects.size() > 0) {

            PooledObject<T> underTest = null;
            final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();

            synchronized (evictionLock) {
                final EvictionConfig evictionConfig = new EvictionConfig(
                        getMinEvictableIdleTimeMillis(),
                        getSoftMinEvictableIdleTimeMillis(),
                        getMinIdle());

                final boolean testWhileIdle = getTestWhileIdle();

                for (int i = 0, m = getNumTests(); i < m; i++) {
                    if (evictionIterator == null || !evictionIterator.hasNext()) {
                        evictionIterator = new EvictionIterator(idleObjects);
                    }
                    if (!evictionIterator.hasNext()) {
                        // Pool exhausted, nothing to do here
                        return;
                    }

                    try {
                        underTest = evictionIterator.next();
                    } catch (final NoSuchElementException nsee) {
                        // Object was borrowed in another thread
                        // Don‘t count this as an eviction test so reduce i;
                        i--;
                        evictionIterator = null;
                        continue;
                    }

                    if (!underTest.startEvictionTest()) {
                        // Object was borrowed in another thread
                        // Don‘t count this as an eviction test so reduce i;
                        i--;
                        continue;
                    }

                    // User provided eviction policy could throw all sorts of
                    // crazy exceptions. Protect against such an exception
                    // killing the eviction thread.
                    boolean evict;
                    try {
                        evict = evictionPolicy.evict(evictionConfig, underTest,
                                idleObjects.size());
                    } catch (final Throwable t) {
                        // Slightly convoluted as SwallowedExceptionListener
                        // uses Exception rather than Throwable
                        PoolUtils.checkRethrow(t);
                        swallowException(new Exception(t));
                        // Don‘t evict on error conditions
                        evict = false;
                    }

                    if (evict) {
                        destroy(underTest);
                        destroyedByEvictorCount.incrementAndGet();
                    } else {
                        if (testWhileIdle) {
                            boolean active = false;
                            try {
                                factory.activateObject(underTest);
                                active = true;
                            } catch (final Exception e) {
                                destroy(underTest);
                                destroyedByEvictorCount.incrementAndGet();
                            }
                            if (active) {
                                if (!factory.validateObject(underTest)) {
                                    destroy(underTest);
                                    destroyedByEvictorCount.incrementAndGet();
                                } else {
                                    try {
                                        factory.passivateObject(underTest);
                                    } catch (final Exception e) {
                                        destroy(underTest);
                                        destroyedByEvictorCount.incrementAndGet();
                                    }
                                }
                            }
                        }
                        if (!underTest.endEvictionTest(idleObjects)) {
                            // TODO - May need to add code here once additional
                            // states are used
                        }
                    }
                }
            }
        }
        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
            removeAbandoned(ac);
        }
    }

  先是判斷池是開啟狀態,且空閑對象要大於0,不然不需要驅逐。然後循環了設置的numTests的次數,一次驅逐就檢查這麽多個對象。後面一段是並發被幹擾的一些操作,主要是保證被幹擾後仍檢查這麽些對象。最後就是根據驅逐策略來驅逐對象。上面配置項說過是怎麽回事,具體見DefaultEvictionPolicy。如果判斷是驅逐,就調用destory方法銷毀對象。否則,判斷testWhileIdle配置項,決定是否校驗對象是否仍可用,先激活對象activateObject,有異常直接銷毀。否則開始校驗對象的可用性,validateObject。失敗銷毀,成功就鈍化變成原樣子。鈍化失敗也直接銷毀。最後是一個遺棄對象的設置,就是說有些對象借出去了由於種種原因,比如寫法上的問題,導致對象很久沒有還回來,這個設置就是用於清理這類對象的。這類對象不再被池借出,但又暫用了資源。一般而言該配置很少用到,因為寫方式通常都將return操作放在finally模塊,不會出現此類情況。

  最後我們看下借對象和還對象都做了哪些操作吧。

    public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        assertOpen();

        final AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        final boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        final long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            p = idleObjects.pollFirst();
            if (p == null) {
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            if (blockWhenExhausted) {
                if (p == null) {
                    if (borrowMaxWaitMillis < 0) {
                        p = idleObjects.takeFirst();
                    } else {
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
            } else {
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
            }
            if (!p.allocate()) {
                p = null;
            }

            if (p != null) {
                try {
                    factory.activateObject(p);
                } catch (final Exception e) {
                    try {
                        destroy(p);
                    } catch (final Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        validate = factory.validateObject(p);
                    } catch (final Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    if (!validate) {
                        try {
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (final Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }

  借的操作步驟如下:先確定池是否開啟,再根據條件決定是否移除遺棄的對象。開始獲取對象:1.從idle中獲取一個,沒獲取到就創建一個,創建的邏輯涉及參數maxTotal,超過這個值不會創建對象,返回null,maxTotal為-1意為創建的數量為無限(整數最大)。2.創建失敗,線程阻塞,等待時間為-1就一直等待,不為-1等到指定時間還沒等到,就拋出異常。3.不等待直接會在沒獲取對象的時候直接拋出異常。4.對象狀態不對,沒有鎖定,置為null。5.上述都沒問題,獲取對象後開始激活對象,失敗銷毀對象。成功後判斷是否borrow和create的時候要校驗對象可用性,需要進行校驗,校驗失敗銷毀。上訴是一個while(p==null)的循環,所以borrow的結果只有2種,1是借不到對象超時,2是借到對象。其他就是等待獲取空閑對象。

  還對象的邏輯也不難:

    public void returnObject(final T obj) {
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));

        if (p == null) {
            if (!isAbandonedConfig()) {
                throw new IllegalStateException(
                        "Returned object not currently part of this pool");
            }
            return; // Object was abandoned and removed
        }

        synchronized(p) {
            final PooledObjectState state = p.getState();
            if (state != PooledObjectState.ALLOCATED) {
                throw new IllegalStateException(
                        "Object has already been returned to this pool or is invalid");
            }
            p.markReturning(); // Keep from being marked abandoned
        }

        final long activeTime = p.getActiveTimeMillis();

        if (getTestOnReturn()) {
            if (!factory.validateObject(p)) {
                try {
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
                try {
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
                updateStatsReturn(activeTime);
                return;
            }
        }

        try {
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }

  更新對象狀態,判斷還的時候是否要校驗對象可用性,不可用銷毀。之後鈍化對象,鈍化失敗銷毀,超過maxIdle也直接銷毀。最後根據lifo來確定放回方式。因為涉及銷毀對象,所以都要進行確定minidle來決定是否補充對象。

3.結束語

  commons-pool2的主要邏輯就是上述內容了,代碼例子也給了一個。這裏總結一下對象的一個生命周期:create->activate->invalidate->borrow->invalidate->return->destory。其中validate階段發生在各個環節,主要通過TestOnXXX進行配置決定。

commons-pool2