1. 程式人生 > >jdk自帶執行緒池詳解

jdk自帶執行緒池詳解

一、前言

在最近做的一個專案中,需要大量的使用到多執行緒和執行緒池,下面就java自帶的執行緒池和大家一起分享。

二、簡介

多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力,但頻繁的建立執行緒的開銷是很大的,那麼如何來減少這部分的開銷了,那麼就要考慮使用執行緒池了。執行緒池就是一個執行緒的容器,每次只執行額定數量的執行緒,執行緒池就是用來管理這些額定數量的執行緒。

三、涉及執行緒池的類結構圖


其中供我們使用的,主要是ThreadPoolExecutor類。

四、如何建立執行緒池

我們建立執行緒池一般有以下幾種方法:

1、使用Executors工廠類

Executors主要提供了下面幾種建立執行緒池的方法:


下面來看下使用示例:

1)newFixedThreadPool(固定大小的執行緒池)

public class FixedThreadPool {
	public static void main(String[] args) {
		ExecutorService pool = Executors.newFixedThreadPool(5);// 建立一個固定大小為5的執行緒池
		for (int i = 0; i < 10; i++) {
			pool.submit(new MyThread());
		}
		pool.shutdown();
	}
}
public class MyThread extends Thread {
	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + "正在執行。。。");
	}
}

測試結果如下:

pool-1-thread-1正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-4正在執行。。。

固定大小的執行緒池:每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒線。

2)newSingleThreadExecutor(單執行緒的執行緒池)

public class SingleThreadPool {
	public static void main(String[] args) {  
        ExecutorService pool=Executors.newSingleThreadExecutor();//建立一個單執行緒池  
        for(int i=0;i<100;i++){  
            pool.submit(new MyThread());  
        }  
        pool.shutdown();  
    }
}

測試結果如下:

pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。
pool-1-thread-1正在執行。。。

單執行緒的執行緒池:這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

3)newScheduledThreadPool

public class ScheduledThreadPool {
	public static void main(String[] args) {  
        ScheduledExecutorService pool=Executors.newScheduledThreadPool(6);  
        for(int i=0;i<10000;i++){  
            pool.submit(new MyThread());  
        }  
          
        pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);  
        pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);  
        pool.shutdown();  
    }  
}

測試結果如下:

pool-1-thread-1正在執行。。。
pool-1-thread-6正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-4正在執行。。。
pool-1-thread-2正在執行。。。
pool-1-thread-3正在執行。。。
pool-1-thread-4正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-6正在執行。。。
pool-1-thread-1正在執行。。。
…………此處會延時1S…………
pool-1-thread-4正在執行。。。
pool-1-thread-1正在執行。。。
測試結果的最後兩個執行緒都是在延時1S之後,才開始執行的。此執行緒池支援定時以及週期性執行任務的需求

4)newCachedThreadPool(可快取的執行緒池)

public class CachedThreadPool {
	public static void main(String[] args) {  
        ExecutorService pool=Executors.newCachedThreadPool();  
        for(int i=0;i<100;i++){  
            pool.submit(new MyThread());  
        }  
        pool.shutdown();  
    }  
}

測試結果如下:

pool-1-thread-5正在執行。。。
pool-1-thread-7正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-16正在執行。。。
pool-1-thread-17正在執行。。。
pool-1-thread-16正在執行。。。
pool-1-thread-5正在執行。。。
pool-1-thread-7正在執行。。。
pool-1-thread-16正在執行。。。
pool-1-thread-18正在執行。。。
pool-1-thread-10正在執行。。。
可快取的執行緒池:如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

官方建議程式設計師使用較為方便的Executors工廠方法Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)、Executors.newFixedThreadPool(int)(固定大小執行緒池)Executors.newSingleThreadExecutor()(單個後臺執行緒),這幾種執行緒池均為大多數使用場景預定義了預設配置。

2、繼承ThreadPoolExecutor類,並複寫父類的構造方法。

在介紹這種方式之前,我們來分析下前面幾個建立執行緒池的底層程式碼是怎樣的?

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}

Executors工廠類的底層程式碼可以看出,工廠類提供的建立執行緒池的方法,其實都是通過構造ThreadPoolExecutor來實現的。ThreadPoolExecutor構造方法程式碼如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
那麼接下來,我們就來談談這個ThreadPoolExecutor構造方法。在這個構造方法中,主要有以下幾個引數:

corePoolSize--池中所儲存的執行緒數,包括空閒執行緒。

maximumPoolSize--池中允許的最大執行緒數。

keepAliveTime--當執行緒數大於corePoolSize時,此為終止空閒執行緒等待新任務的最長時間。

Unit--keepAliveTime 引數的時間單位。

workQueue--執行前用於保持任務的佇列。此佇列僅保持由 execute方法提交的 Runnable任務。

threadFactory--執行程式建立新執行緒時使用的工廠。

Handler--由於超出執行緒範圍和佇列容量而使執行被阻塞時所使用的處理程式。

接下來,咋們來說下這幾個引數之間的關係。當執行緒池剛建立的時候,執行緒池裡面是沒有任何執行緒的(注意,並不是執行緒池一建立,裡面就建立了一定數量的執行緒),當呼叫execute()方法新增一個任務時,執行緒池會做如下的判斷:

1)如果當前正在執行的執行緒數量小於corePoolSize,那麼立刻建立一個新的執行緒,執行這個任務。

2)如果當前正在執行的執行緒數量大於或等於corePoolSize,那麼這個任務將會放入佇列中。

3)如果執行緒池的佇列已經滿了,但是正在執行的執行緒數量小於maximumPoolSize,那麼還是會建立新的執行緒,執行這個任務。

4)如果佇列已經滿了,且當前正在執行的執行緒數量大於或等於maximumPoolSize,那麼執行緒池會根據拒絕執行策略來處理當前的任務。

5)當一個任務執行完後,執行緒會從佇列中取下一個任務來執行,如果佇列中沒有需要執行的任務,那麼這個執行緒就會處於空閒狀態,如果超過了keepAliveTime存活時間,則這個執行緒會被執行緒池回收(注:回收執行緒是有條件的,如果當前執行的執行緒數量大於corePoolSize的話,這個執行緒就會被銷燬,如果不大於corePoolSize,是不會銷燬這個執行緒的,執行緒的數量必須保持在corePoolSize數量內).為什麼不是執行緒一空閒就回收,而是需要等到超過keepAliveTime才進行執行緒的回收了,原因很簡單:因為執行緒的建立和銷燬消耗很大,更不能頻繁的進行建立和銷燬,當超過keepAliveTime後,發現確實用不到這個執行緒了,才會進行銷燬。這其中unit表示keepAliveTime的時間單位,unit的定義如下:

public enum TimeUnit {
    NANOSECONDS {
        // keepAliveTime以納秒為單位
    },
    MICROSECONDS {
       // keepAliveTime以微秒為單位
    },
    MILLISECONDS {
       // keepAliveTime以毫秒為單位
    },
    SECONDS {
        // keepAliveTime以秒為單位
    },
    MINUTES {
       // keepAliveTime以分鐘為單位
    },
    HOURS {
       // keepAliveTime以小時為單位
    },
    DAYS {
        // keepAliveTime以天為單位
    };

下面從原始碼來分析一下,對於上面的幾種情況,主要涉及到的原始碼有以下幾塊:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}

其實,這段程式碼很簡單,主要描述的就是,如果當前的執行緒池小於corePoolSize的時候,是直接新建一個執行緒來處理任務。
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}

上面這段程式碼描述的是,如果當前執行緒池的數量小於maximumPoolSize的時候,也會建立一個執行緒,來執行任務
五、執行緒池的佇列

執行緒池的佇列,總的來說有3種:

直接提交:工作佇列的預設選項是 SynchronousQueue,它將任務直接提交給執行緒而不保持它們。在此,如果不存在可用於立即執行任務的執行緒,則試圖把任務加入佇列將失敗,因此會構造一個新的執行緒。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

無界佇列:使用無界佇列(例如,不具有預定義容量的 LinkedBlockingQueue)將導致在所有 corePoolSize 執行緒都忙時新任務在佇列中等待。這樣,建立的執行緒就不會超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當每個任務完全獨立於其他任務,即任務執行互不影響時,適合於使用無界佇列;例如,在 Web頁伺服器中。這種排隊可用於處理瞬態突發請求,當命令以超過佇列所能處理的平均數連續到達時,此策略允許無界執行緒具有增長的可能性。

有界佇列:當使用有限的 maximumPoolSizes時,有界佇列(如 ArrayBlockingQueue)有助於防止資源耗盡,但是可能較難調整和控制。佇列大小和最大池大小可能需要相互折衷:使用大型佇列和小型池可以最大限度地降低 CPU 使用率、作業系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O邊界),則系統可能為超過您許可的更多執行緒安排時間。使用小型佇列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的排程開銷,這樣也會降低吞吐量。

下面就來說下執行緒池的佇列,類結構圖如下:


1)SynchronousQueue

該佇列對應的就是上面所說的直接提交,首先SynchronousQueue是無界的,也就是說他存數任務的能力是沒有限制的,但是由於該Queue本身的特性,在某次新增元素後必須等待其他執行緒取走後才能繼續新增

2)LinkedBlockingQueue

該佇列對應的就是上面的無界佇列。

3)ArrayBlockingQueue

該佇列對應的就是上面的有界佇列。ArrayBlockingQueue有以下3中構造方法:

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
}

    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

下面我們重點來說下這個fairfair表示佇列訪問執行緒的競爭策略,當為true的時候,任務插入佇列遵從FIFO的規則,如果為false,則可以“插隊”。舉個例子,假如現在有很多工在排隊,這個時候正好一個執行緒執行完了任務,同時又新來了一個任務,如果為false的話,這個任務不用在佇列中排隊,可以直接插隊,然後執行。如下圖所示:

六、執行緒池的拒絕執行策略

當執行緒的數量達到最大值時,這個時候,任務還在不斷的來,這個時候,就只好拒絕接受任務了。

ThreadPoolExecutor 允許自定義當新增任務失敗後的執行策略。你可以呼叫執行緒池的 setRejectedExecutionHandler() 方法,用自定義的RejectedExecutionHandler 物件替換現有的策略,ThreadPoolExecutor提供的預設的處理策略是直接丟棄,同時拋異常資訊,ThreadPoolExecutor 提供 個現有的策略,分別是:
ThreadPoolExecutor.AbortPolicy:表示拒絕任務並丟擲異常,原始碼如下:

public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an <tt>AbortPolicy</tt>.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException(); //拋異常
        }
    }

 ThreadPoolExecutor.DiscardPolicy:表示拒絕任務但不做任何動作,原始碼如下:

public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardPolicy</tt>.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        } // 直接拒絕,但不做任何操作
    }

 ThreadPoolExecutor.CallerRunsPolicy:表示拒絕任務,並在呼叫者的執行緒中直接執行該任務,原始碼如下:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>CallerRunsPolicy</tt>.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run(); // 直接執行任務
            }
        }
    }

 ThreadPoolExecutor.DiscardOldestPolicy:表示先丟棄任務佇列中的第一個任務,然後把這個任務加進佇列。原始碼如下:

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
         */
        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll(); // 丟棄佇列中的第一個任務
                e.execute(r); // 執行新任務
            }
        }
}

當任務源源不斷到來的時候,會從Queue中poll一個任務出來,然後執行新的任務