1. 程式人生 > >ThreadPoolExecutor和CyclicBarrier配合使用可能帶來的隱患

ThreadPoolExecutor和CyclicBarrier配合使用可能帶來的隱患

今天是很蛋疼的一天,排查一個bug排查了4個多小時。

情形簡化之後大概是這樣的:

我使用了spring的ThreadPoolTaskExecutor來進行併發時候的非同步處理。並且給任務Runnable加上了CyclicBarrier,以達到讓所有執行緒處理完之後再進行主執行緒的下一步操作的目的。其中executor的配置如下:

	<bean id="coreBlockExecutor"
		class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
		<property name="corePoolSize" value="5" />
		<property name="maxPoolSize" value="2000" />
		<property name="queueCapacity" value="1000" />
	</bean>


結果,bug就這樣華麗麗的出現了:我new出來了1000個runnable,但無論如何也只能執行出5個runnable的結果(即corePoolSize),其他runnable就一直呆在blockingqueue裡面不動彈了。


**************************************************** 華麗的分割線——原因 ****************************************************

花了n久去檢視ThreadPoolTaskExecutor和ThreadPoolExecutor的原始碼。
發現是這樣的,其實ThreadPoolTaskExecutor也就是呼叫了ThreadPoolExecutor,在後者中,有這樣一個方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }



可以看到,在當前執行緒池的執行緒數大於等於corePoolSize時候,會判斷

  if (runState == RUNNING && workQueue.offer(command)) 


也就是,當前執行緒池是處於執行狀態並且佇列是否還能插入runnable。
所以,當佇列滿的時候,也就是會去執行下面的程式碼


 else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated


即當前執行緒數如果小於最大執行緒數,則會呼叫addThread方法,將runnbale例項化成一個內部類Worker,加入執行緒池中執行。程式碼如下:

    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }


而普通情況,即使填入executor的runnable數量不能填滿queue,但在核心執行緒中執行的任務Worker結束之後,過了最大空閒時間(keepAliveTime)之後,即會釋放執行緒,去從queue中獲取等待中的任務。

程式碼如下:


        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }


    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }


原來Worker中的run方法是通過輪詢來獲取當前任務是否結束以及從佇列裡面繼續拿任務的。當符合以下其中一個條件時:
可以看到,只有Worker當前的任務已完成的時候,才能去佇列裡面拿其他任務。否則,就只能keep on waiting了



所以原因就出來了:使用了CyclicBarrier進行柵欄式的改裝之後,所有核心執行緒中的任務都會一直等待,不會空閒下來。這樣核心執行緒就永遠處於尷尬的被hold住的狀態了。既不能結束當前的任務,也無法從佇列獲取新的任務,更無法中止執行緒了。


所以,像我這樣沒有仔細看原始碼的碼農,碰到問題就悲劇了。哎




**************************************************** 華麗的分割線——解決方法 ****************************************************

1、允許的情況下,barrier的await設定過期時間
2、仔細考慮queue長度和併發規模

總之,好像也沒什麼特別好的辦法。