1. 程式人生 > >線程池定制初探

線程池定制初探

優雅 des ali 線程池 pri get cau actor nano

背景

? 我在的公司雖然是移動支付領域的公司。但是我做的業務類似於管理系統,所以一開始寫代碼的時候沒有考慮到數據的量的問題。開始有一個統計頁面,大概要統計的數據分為十多個維度,然後每個維度需要考慮十個左右的方面。也就是統計頁面輕輕地點擊一個查詢按鈕,要進行100次左右的數據庫查詢。開始數據量小的時候,查詢還能夠使用,頁面不會超時。到後面數據量越來越大,最大的一張表數據量已經超過1億。這時候悲催的事情發生了--- 頁面點擊查詢直接未響應.....


方案考慮

? 其實當時的方案我想了兩種:

  1. 優化業務實現邏輯,其實在查詢的時候一個表的多個維度的查詢可以傳一個維度列表進去。查出來結果之後,後臺在進行分組計算。
  2. 采用多線程,對每一個維度采用一個線程去執行。這樣其實每個線程的查詢次數在10次左右,總的時間差不多可以縮短10倍。

? 當然,當時我知道最好的方案是混合使用1和2。但當時1的實現存在以下問題,最終讓我決定選擇方案2:

  1. 因為每個維度涉及多張表, 不同的表歸屬於不同的模塊。如果某張表的查詢條件不支持維度列表,那麽需要提需求給對應模塊開發...... (何年何月能完)
  2. 方案1的改動涉及代碼變動較多,且不好封裝每個線程任務,寫出來的代碼邏輯有點繞,後臺存在大量的重新分維度統計的代碼。簡而言之就是不優雅

實現考慮

? 既然最終選定方案2的話,那麽自然考慮到選擇線程池。那麽選啥線程池呢?Single?Schedule肯定不用想直接PASS。Cached?其實當前來說是可行的,因為當前線上的維度也就十多個。以Cached線程池的特性,只要同時並發的線程數量不至於太大,也不至於給系統太大壓力導致系統癱瘓。但是因為維度會隨著業務的增長而越來越多,如果後續維度增加到20甚至30,那麽對系統的壓力就無法預估了。

? 思前想後,我最終決定選擇Fix線程池,將線程池固化大小為10個。但這時候我又想,其實統計頁面一天查詢的次數並不多。可能就每天早上點擊查詢一次,後面可能就不再點查詢。那麽這時候又出現了兩種蛋疼的選擇:

  1. 直接在查詢的方法內部初始化線程池
  2. 在類的屬性中初始化線程池

? 第1種方案的話,每次查詢都要重新初始化線程池,造成很大的資源消耗。如果連續查詢多次,並不會後面比前面快,反而可能由於不停的線程池銷毀創建導致越來越慢。最終我選擇了第2種方案,當然我並沒有選擇餓漢模式直接初始化,而是選擇了懶漢模式在方法中進行線程池初始化,且通過鎖保證只初始化一次。


想法嘗試

? 到這裏你如果覺得我的定制初探就完了,那你就too young too naive。我不追求可行,只追求完美~

這時候我就在想,其實根據用戶的操作習慣,統計頁面的查詢按鈕,要麽就隔著幾個小時不按,要麽就可能一時心血來潮,連續查詢幾天或者同一天分多個維度查詢多次。而大家都知道Fix線程池固化了線程池的大小,即使後面連續幾個小時沒有任務來,仍然會一直保持著初始大小的線程數。那麽能不能實現即能夠控制線程數量Fix,又可以在空閑的時候銷毀核心線程呢?答案當然是有的,關鍵點在於:ThreadPoolExecutor的allowCoreThreadTimeOut方法

/**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting <tt>true</tt>. This method
     * should in general be called before the pool is actively used.
     * @param value <tt>true</tt> if should time out, else <tt>false</tt>
     * @throws IllegalArgumentException if value is <tt>true</tt>
     * and the current keep-alive time is not greater than zero.
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

        allowCoreThreadTimeOut = value;
    }

? 從源碼的註釋來看,該方法可以支持線程池的keep-alive time的設置同時對核心線程和非核心線程生效。具體為啥,後面我分析線程池源碼的時候會講到,現在我們只需要看看用到該處的源碼(在ThreadPoolExecutor的getTask方法中):

/**
     * Gets the next task for a worker thread to run.  The general
     * approach is similar to execute() in that worker threads trying
     * to get a task to run do so on the basis of prevailing state
     * accessed outside of locks.  This may cause them to choose the
     * "wrong" action, such as trying to exit because no tasks
     * appear to be available, or entering a take when the pool is in
     * the process of being shut down.  These potential problems are
     * countered by (1) rechecking pool state (in workerCanExit)
     * before giving up, and (2) interrupting other workers upon
     * shutdown, so they can recheck state. All other user-based state
     * changes (to allowCoreThreadTimeOut etc) are OK even when
     * performed asynchronously wrt getTask.
     *
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

? 關鍵在於workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),該方法表示嘗試從等待隊列中獲取任務,如果超過keepAlive time,則直接返回null。如果返回null的話,work線程就會被終止。

? 好了這些都是後話,在我看了線程池的源碼之後才能夠清楚地知道為啥這個參數有這個作用。那麽在之前,我是怎麽測試驗證我的想法的呢?其實很簡單:

  1. 我先參照線程池的默認的DefaultThreadFactory定義自己的線程工廠,目的是為了獲取線程工廠內的ThreadGroup屬性,因為ThreadGroup類有一個activeCount方法,該方法可以獲取線程組內活躍的線程個數。
class MyThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
        
        // 我所增加的方法,為了獲取線程組
        public ThreadGroup getThreadGroup() {
            return this.group;
        }
    }
  1. 萬事俱備,只欠東風了,我只需要構造兩種不同的情況驗證我的猜想即可!
MyThreadFactory myThreadFactory = new MyThreadFactory();
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
                                new LinkedBlockingQueue<Runnable>(), myThreadFactory); 
// executor.allowCoreThreadTimeOut(true);
for (int i = 0; i <= 20; i++) {
  executor.submit(new MyRunnable());
}
System.out.println(myThreadFactory.getThreadGroup().activeCount());     // 6 
Thread.sleep(20000);
System.out.println("After destroy, active thread count:" + myThreadFactory.getThreadGroup().activeCount());             // 6/1
executor.shutdown();

? 運行的結果:

  1. 如果不執行executor.allowCoreThreadTimeOut(true);兩個activeCount的結果都是6
  2. 如果執行executor.allowCoreThreadTimeOut(true);第一個activeCount的結果為6,第二個activeCount的結果為1

最終實現

? 好了,終於到最終定制實現了。我的代碼實現如下(類為Spring管理的類,最終線程池shutdown在PreDestroy的時候):

    private volatile ThreadPoolExecutor searchExecutors; 
    
    private final Object lock = new Object();
    
    /**
     * 初始化線程池,開始不進行初始化,免得浪費系統資源
     */
    private void initExecutor() {
        if (searchExecutors != null) {
            return;
        }
        
        synchronized (lock) {
            if (searchExecutors == null) {
                 // 設置一個固定大小為10,核心線程如果超過10分鐘空閑也可銷毀的線程池
                ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
                tempExecutor.allowCoreThreadTimeOut(true);
                this.searchExecutors = tempExecutor;
            }
        }
    }

    @PreDestroy
    public void destroy() {
        if (searchExecutors != null) {
            searchExecutors.shutdown();
        }
    }

這裏再說兩點

  1. 這個初始化方法采用了double-check-lock的方式,來保證多線程並發獲取到的是同一個線程池實例
  2. 註意到在設置屬性searchExecutors之前借助了一個tempExecutor。這樣也是為了防止ThreadPoolExecutor對象已經被初始化,但是allowCoreThreadTimeOut還未被執行的問題。(對象過早逃逸導致屬性與預期不符)。

總結

? 通過這次線程池定制初探,發現其實看起來再沒有技術含量的工作,如果細細想下去還是會有很多可以深入研究的東西。而做軟件其實也要像做藝術品一樣,多考慮不同的實現可能,盡量選擇最完美的解決方案。

線程池定制初探