java 執行緒池 Executors 及 ThreadPoolExecutor
Java通過Executors提供四種執行緒池,分別為:
newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
(1) newCachedThreadPool
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。示例程式碼如下:
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { public void run() { System.out.println(index); } }); } } }
執行緒池為無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的執行緒,而不用每次新建執行緒。
(2) newFixedThreadPool
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。示例程式碼如下:
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字。
定長執行緒池的大小最好根據系統資源進行設定。如Runtime.getRuntime().availableProcessors()
(3) newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行示例程式碼如下:
package test; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("delay 3 seconds"); } }, 3, TimeUnit.SECONDS); } }
表示延遲3秒執行。
定期執行示例程式碼如下:
package test; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ThreadPoolExecutorTest { public static void main(String[] args) { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(new Runnable() { public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }, 1, 3, TimeUnit.SECONDS); } }
表示延遲1秒後每3秒執行一次。
(4) newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。示例程式碼如下:
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { public void run() { try { System.out.println(index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
結果依次輸出,相當於順序執行各個任務。
你可以使用JDK自帶的監控工具來監控我們建立的執行緒數量,執行一個不終止的執行緒,建立指定量的執行緒,來觀察:
工具目錄: C:\Program Files\Java\jdk1.6.0_06\bin\jconsole.exe
執行程式做稍微修改:
package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolExecutorTest { public static void main(String[] args) { ExecutorService singleThreadExecutor = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { public void run() { try { while(true) { System.out.println(index); Thread.sleep(10 * 1000); } } catch (InterruptedException e) { e.printStackTrace(); } } }); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }ThreadPoolExecutor機制 轉自http://825635381.iteye.com/blog/2184680
一、概述
1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部執行緒池的形式對外提供管理任務執行,執行緒排程,執行緒池管理等等服務;
2、Executors方法提供的執行緒服務,都是通過引數設定來實現不同的執行緒池機制。
3、先來了解其執行緒池管理的機制,有助於正確使用,避免錯誤使用導致嚴重故障。同時可以根據自己的需求實現自己的執行緒池
二、核心構造方法講解
下面是ThreadPoolExecutor最核心的構造方法
Java程式碼
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
構造方法引數講解
引數名 | 作用 |
corePoolSize | 核心執行緒池大小 |
maximumPoolSize | 最大執行緒池大小 |
keepAliveTime | 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間 |
TimeUnit | keepAliveTime時間單位 |
workQueue | 阻塞任務佇列 |
threadFactory | 新建執行緒工廠 |
RejectedExecutionHandler | 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理 |
重點講解:
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關係。
1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉
執行緒管理機制圖示:
三、Executors提供的執行緒池配置方案
1、構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在再阻塞佇列,不會由RejectedExecutionHandler處理
Java程式碼
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
2、構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞佇列 SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行;執行緒空閒超過60s將會銷燬
Java程式碼
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
3、構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue;保證任務由一個執行緒序列執行
Java程式碼
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
4、構造有定時功能的執行緒池,配置corePoolSize,無界延遲阻塞佇列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界佇列,所以這個值是沒有意義的
Java程式碼
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
- public static ScheduledExecutorService newScheduledThreadPool(
- int corePoolSize, ThreadFactory threadFactory) {
- return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
- }
- public ScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory) {
- super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
- new DelayedWorkQueue(), threadFactory);
- }
四、定製屬於自己的執行緒池
Java程式碼
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.RejectedExecutionHandler;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicInteger;
- public class CustomThreadPoolExecutor {
- private ThreadPoolExecutor pool = null;
- /**
- * 執行緒池初始化方法
- *
- * corePoolSize 核心執行緒池大小----10
- * maximumPoolSize 最大執行緒池大小----30
- * keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間----30+單位TimeUnit
- * TimeUnit keepAliveTime時間單位----TimeUnit.MINUTES
- * workQueue 阻塞佇列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞佇列
- * threadFactory 新建執行緒工廠----new CustomThreadFactory()====定製的執行緒工廠
- * rejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,
- * 即當提交第41個任務時(前面執行緒都沒有執行完,此測試方法中用sleep(100)),
- * 任務會交給RejectedExecutionHandler來處理
- */
- public void init() {
- pool = new ThreadPoolExecutor(
- 10,
- 30,
- 30,
- TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(10),
- new CustomThreadFactory(),
- new CustomRejectedExecutionHandler());
- }
- public void destory() {
- if(pool != null) {
- pool.shutdownNow();
- }
- }
- public ExecutorService getCustomThreadPoolExecutor() {
- return this.pool;
- }
- private class CustomThreadFactory implements ThreadFactory {
- private AtomicInteger count = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
- System.out.println(threadName);
- t.setName(threadName);
- return t;
- }
- }
- private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- // 記錄異常
- // 報警處理等
- System.out.println("error.............");
- }
- }
- // 測試構造的執行緒池
- public static void main(String[] args) {
- CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
- // 1.初始化
- exec.init();
- ExecutorService pool = exec.getCustomThreadPoolExecutor();
- for(int i=1; i<100; i++) {
- System.out.println("提交第" + i + "個任務!");
- pool.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("running=====");
- }
- });
- }
- // 2.銷燬----此處不能銷燬,因為任務沒有提交執行完,如果銷燬執行緒池,任務也就無法執行了
- // exec.destory();
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
方法中建立一個核心執行緒數為30個,緩衝佇列有10個的執行緒池。每個執行緒任務,執行時會先睡眠0.1秒,保證提交40個任務時沒有任務被執行完,這樣提交第41個任務是,會交給CustomRejectedExecutionHandler 類來處理。