opencv+pytesseract 驗證碼識別!草雞簡單!
一、概述
本章將詳細介紹任務線上程池中的執行情況、空閒執行緒的回收過程、預設提供的四種拒絕策略、執行緒池關閉以及個人覺得比較有意思的地方。
二、任務線上程池中的執行過程
執行緒池的一個重要作用是管理執行緒,實現執行緒的複用,避免反覆建立和銷燬執行緒所帶來的資源消耗,我們通過new方法建立執行緒,start( )方法啟動執行緒,執行緒在執行完run( )方法中的內容後就被回收,那麼執行緒池中是如何建立啟動執行緒,並保持執行緒數量,一直接收任務進行處理的呢?
2.1 執行緒池工作流程
1 ExecutorService executor = new ThreadPoolExecutor(1, 1, 20, TimeUnit.SECONDS, 2new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
以以上執行緒池為例,它大致的執行流程如下圖:
2.2 execute方法
跟著上面流程,首先看提交任務。execute( )方法來自於Executor介面,該方法將任務提交與每個任務將如何執行的機制(包括執行緒使用的細節、排程等)分離開來,和start( )與run( )作用類似。
1 public void execute(Runnable command) { 2executeif (command == null) 3 throw new NullPointerException(); 4 int c = ctl.get(); 5 6 //當前執行緒數小於核心執行緒數,則啟動一個新執行緒執行該任務 7 if (workerCountOf(c) < corePoolSize) { 8 if (addWorker(command, true)) 9 return; 10 c = ctl.get(); 11 } 12 13 //若當前執行緒數大於核心執行緒數,則進入阻塞佇列進行排隊 14 if (isRunning(c) && workQueue.offer(command)) { 15int recheck = ctl.get(); 16 if (! isRunning(recheck) && remove(command)) 17 reject(command); 18 else if (workerCountOf(recheck) == 0) 19 addWorker(null, false); 20 } 21 22 //若排隊不成功,則執行拒絕策略 23 else if (!addWorker(command, false)) 24 reject(command); 25 } 26
該方法用於提交任務,並沒有建立啟動執行緒,我們跟進command在addWorker( )中的情況。
2.3 addWorker方法
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runStateOf(c); 6 7 if (rs >= SHUTDOWN && 8 ! (rs == SHUTDOWN && 9 firstTask == null && 10 ! workQueue.isEmpty())) 11 return false; 12 13 for (;;) { 14 int wc = workerCountOf(c); 15 if (wc >= CAPACITY || 16 //core為true表示建立核心執行緒,那麼當前執行緒數>核心執行緒數,就不能在建立核心執行緒 17 //返回false執行其他邏輯 18 wc >= (core ? corePoolSize : maximumPoolSize)) 19 return false; 20 if (compareAndIncrementWorkerCount(c)) 21 break retry; 22 c = ctl.get(); // Re-read ctl 23 if (runStateOf(c) != rs) 24 continue retry; 25 } 26 } 27 28 boolean workerStarted = false; 29 boolean workerAdded = false; 30 Worker w = null; 31 try { 32 33 //將任務交給執行緒 34 w = new Worker(firstTask); 35 final Thread t = w.thread; 36 if (t != null) { 37 final ReentrantLock mainLock = this.mainLock; 38 mainLock.lock(); 39 try { 40 int rs = runStateOf(ctl.get()); 41 42 if (rs < SHUTDOWN || 43 (rs == SHUTDOWN && firstTask == null)) { 44 if (t.isAlive()) 45 throw new IllegalThreadStateException(); 46 47 //將執行緒新增至執行緒池,執行緒池直接維護的是一組Worker物件 48 workers.add(w); 49 int s = workers.size(); 50 if (s > largestPoolSize) 51 largestPoolSize = s; 52 workerAdded = true; 53 } 54 } finally { 55 mainLock.unlock(); 56 } 57 if (workerAdded) { 58 59 //啟動執行緒 60 t.start(); 61 workerStarted = true; 62 } 63 } 64 } finally { 65 66 //若執行緒啟動失敗,則從執行緒池移除該執行緒 67 if (! workerStarted) 68 addWorkerFailed(w); 69 } 70 return workerStarted; 71 }addWorker
addWork( )主要用於建立一個新執行緒並將其加入執行緒池(HashSet)中,firstTask引數是這個新增的執行緒執行的第一個任務,core引數表示此次建立的是核心執行緒還是非核心執行緒。其中t.start( )啟動了執行緒,t是Worker中的屬性,但t執行的是哪個run( )方法呢?
2.4 Worker類
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 final Thread thread; 6 7 Runnable firstTask; 8 9 volatile long completedTasks; 10 11 Worker(Runnable firstTask) { 12 setState(-1); 13 this.firstTask = firstTask; 14 this.thread = getThreadFactory().newThread(this); 15 } 16 17 public void run() { 18 runWorker(this); 19 } 20 } 21Worker
Worker實現了Runnable介面,建構函式中,它將傳入的任務給firstTask,處理任務的執行緒是通過執行緒構造工廠,將Worker本身作為任務交給執行緒建立而成。則addWorker( )方法中t.start( )啟動後執行的是Worker類中的run方法。
這裡的執行緒構造工廠以Executors中DefaultThreadFactory為例,它最終也是通過new建立執行緒。
1 static class DefaultThreadFactory implements ThreadFactory { 2 private static final AtomicInteger poolNumber = new AtomicInteger(1); 3 private final ThreadGroup group; 4 private final AtomicInteger threadNumber = new AtomicInteger(1); 5 private final String namePrefix; 6 7 DefaultThreadFactory() { 8 SecurityManager s = System.getSecurityManager(); 9 group = (s != null) ? s.getThreadGroup() : 10 Thread.currentThread().getThreadGroup(); 11 namePrefix = "pool-" + 12 poolNumber.getAndIncrement() + 13 "-thread-"; 14 } 15 16 public Thread newThread(Runnable r) { 17 Thread t = new Thread(group, r, 18 namePrefix + threadNumber.getAndIncrement(), 19 0); 20 if (t.isDaemon()) 21 t.setDaemon(false); 22 if (t.getPriority() != Thread.NORM_PRIORITY) 23 t.setPriority(Thread.NORM_PRIORITY); 24 return t; 25 } 26 } 27DefaultThreadFactory
2.5 runWorker方法
1 final void runWorker(Worker w) { 2 //當前執行執行緒 3 Thread wt = Thread.currentThread(); 4 Runnable task = w.firstTask; 5 w.firstTask = null; 6 w.unlock(); // allow interrupts 7 boolean completedAbruptly = true; 8 try { 9 //第一個任務執行後,通過getTask取佇列中任務 10 while (task != null || (task = getTask()) != null) { 11 w.lock(); 12 if ((runStateAtLeast(ctl.get(), STOP) || 13 (Thread.interrupted() && 14 runStateAtLeast(ctl.get(), STOP))) && 15 !wt.isInterrupted()) 16 wt.interrupt(); 17 try { 18 beforeExecute(wt, task); 19 Throwable thrown = null; 20 try { 21 //提交的任務實際在此執行 22 task.run(); 23 } catch (RuntimeException x) { 24 thrown = x; throw x; 25 } catch (Error x) { 26 thrown = x; throw x; 27 } catch (Throwable x) { 28 thrown = x; throw new Error(x); 29 } finally { 30 afterExecute(task, thrown); 31 } 32 } finally { 33 task = null; 34 w.completedTasks++; 35 w.unlock(); 36 } 37 } 38 completedAbruptly = false; 39 } finally { 40 processWorkerExit(w, completedAbruptly); 41 } 42 } 43runWorker
2.6 任務執行過程小結
提交第一個任務時,呼叫addWorker( )將任務儲存給Worker,同時通過執行緒構造工廠,以Worker本身為引數建立新執行緒。將該執行緒加入執行緒池,啟動執行緒,執行緒將執行Worker中的run( )方法,run( )方法實際呼叫runWorker( )方法去執行儲存在Worker中的任務。
任務執行完成後,執行緒就會被回收,執行緒池如何實現執行緒複用的呢?
2.7 getTask方法
在runWorker方法中,當前執行緒執行完第一次提交的任務後,會進行while迴圈,通過getTask從阻塞佇列中取任務再執行。
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 8 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 9 decrementWorkerCount(); 10 return null; 11 } 12 13 int wc = workerCountOf(c); 14 15 //允許核心執行緒超時,或當前執行緒數>核心執行緒數 16 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 17 18 if ((wc > maximumPoolSize || (timed && timedOut)) 19 && (wc > 1 || workQueue.isEmpty())) { 20 if (compareAndDecrementWorkerCount(c)) 21 return null; 22 continue; 23 } 24 25 try { 26 27 //阻塞佇列中的兩個獲取元素方法,兩者都會阻塞 28 //poll(long timeout, TimeUnit unit) 獲取並移除此佇列的頭部,在指定的等待時間前等待可用的元素,超時返回null 29 //take() 獲取並移除此佇列的頭部,在佇列中沒有可用元素之前一直等待 30 Runnable r = timed ? 31 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 32 workQueue.take(); 33 if (r != null) 34 return r; 35 // 36 timedOut = true; 37 } catch (InterruptedException retry) { 38 timedOut = false; 39 } 40 } 41 }getTaskgetTask( )返回任務是通過阻塞佇列中的兩個阻塞方法實現的,當設定允許核心執行緒超時,或者當前執行緒數大於核心執行緒數時,就會通過poll(long timeout, TimeUnit unit)獲取,poll在等待指定時間內還沒有獲取到元素就會返回null,回到runWorker中執行超時或異常執行緒的回收操作;而take( )方法會一直阻塞,直到獲取到元素。兩方法允許被中斷。
2.8 processWorkerExit方法
1 private void processWorkerExit(Worker w, boolean completedAbruptly) { 2 //completedAbruptly表示當前執行緒是否是異常退出 3 //正常結束時,在getTask執行了--操作,異常退出則需要在此執行-- 4 if (completedAbruptly) 5 decrementWorkerCount(); 6 7 final ReentrantLock mainLock = this.mainLock; 8 mainLock.lock(); 9 try { 10 completedTaskCount += w.completedTasks; 11 workers.remove(w); 12 } finally { 13 mainLock.unlock(); 14 } 15 16 //有執行緒退出,可能是最後一個執行緒,嘗試終止執行緒池 17 tryTerminate(); 18 19 int c = ctl.get(); 20 21 22 //如果此時執行緒池還未stop,即嘗試終止執行緒池失敗,需要確認是否新增一個Worker 23 if (runStateLessThan(c, STOP)) { 24 if (!completedAbruptly) { 25 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 26 if (min == 0 && ! workQueue.isEmpty()) 27 min = 1; 28 if (workerCountOf(c) >= min) 29 return; // replacement not needed 30 } 31 addWorker(null, false); 32 } 33 }processWorkerExit
此方法用於清理退出的執行緒,將其從執行緒池中移除。當執行緒池還未終止,此時假如執行緒因異常退出,或執行緒池中無執行緒而任務佇列還有任務,或者當前執行緒數量小於核心執行緒,就會新增一個新的執行緒。若執行緒池無任務,且沒有核心執行緒或核心執行緒允許超時,則最後一個執行緒通過此方法結束,執行緒池終止。
2.9 執行緒複用實現小結
執行緒池中執行緒的複用是通過阻塞實現的,當執行緒池中執行緒啟動後,就會進入迴圈不斷從阻塞佇列中獲取任務,當阻塞佇列中沒有任務時,就會在poll( )或take( )方法陷入阻塞。若有執行緒等待超時,getTask( )會返回null,進入執行緒清理工作。
根據這個思路,我們也可以通過wait( )和notifyAll( )實現複用的功能。
三、ThreadPoolExecutor四種預設的拒絕策略
3.1 AbortPolicy
1 public static class AbortPolicy implements RejectedExecutionHandler { 2 public AbortPolicy() { } 3 4 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 5 throw new RejectedExecutionException("Task " + r.toString() + 6 " rejected from " + 7 e.toString()); 8 } 9 }AbortPolicy
AbortPolicy是丟棄任務,丟擲丟擲執行時 RejectedExecutionException;
1 ExecutorService executor = new ThreadPoolExecutor(1, 1, 20, TimeUnit.SECONDS, 2 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 3 4 for (int i = 0;i < 10;i++) 5 executor.execute(new PolicyTest());//sleep 5 seconds 6 7 result:Exception in thread "main" java.util.concurrent.RejectedExecutionExceptionAbortPolicyTest
3.2 CallerRunsPolicy
1 public static class CallerRunsPolicy implements RejectedExecutionHandler { 2 public CallerRunsPolicy() { } 3 4 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 5 if (!e.isShutdown()) { 6 r.run(); 7 } 8 } 9 }CallerRunsPolicy
CallerRunsPolicy是直接在建立執行緒池的執行緒中執行被拒絕的任務,如果執行緒池已經關閉,則直接丟棄任務;
1 ExecutorService executor = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, 2 new ArrayBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); 3 4 for (int i = 0; i < 3; i++) { 5 executor.execute(new PolicyTest());//print the name executed thread 6 } 7 8 executor.shutdown(); 9 TimeUnit.SECONDS.sleep(10); 10 11 System.out.println("The ThreadPoolExecutor was shutdown"); 12 executor.execute(new PolicyTest()); 13 14 result:I was executed by main 15 I was executed by pool-1-thread-1 16 I was executed by pool-1-thread-1 17 The ThreadPoolExecutor was shutdownCallerRunsPolicy Test
3.3 DiscardOldestPolicy
1 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 2 public DiscardOldestPolicy() { } 3 4 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 5 if (!e.isShutdown()) { 6 e.getQueue().poll(); 7 e.execute(r); 8 } 9 } 10 }DiscardOldestPolicy
DiscardOldestPolicy是放棄最舊的未處理請求,然後重試execute,如果執行緒池已關閉,則丟棄該任務;
1 public static void main(String[] args) throws InterruptedException { 2 ExecutorService executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, 3 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy()); 4 5 for (int i = 0; i < 3; i++) { 6 executor.execute(new PolicyTest());//列印任務建立的時間和執行的執行緒 7 TimeUnit.SECONDS.sleep(1); 8 } 9 10 System.out.println("Runnables in queue:"); 11 BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue(); 12 for (Runnable r: queue){ 13 System.out.println(r.toString());//列印任務建立時間 14 } 15 16 TimeUnit.SECONDS.sleep(1); 17 System.out.println("-------execute a new runnable-------"); 18 executor.execute(new PolicyTest()); 19 20 System.out.println("New runnables in queue:"); 21 BlockingQueue<Runnable> queue2 = ((ThreadPoolExecutor) executor).getQueue(); 22 for (Runnable r: queue2){ 23 System.out.println(r.toString()); 24 } 25 } 26 27 result: I was created at 15:57:04.667 and execute by pool-1-thread-1 28 Runnables in queue: 29 I was created at 15:57:05.669 30 I was created at 15:57:06.669 31 -------execute a new runnable------- 32 New runnables in queue: 33 I was created at 15:57:06.669 34 I was created at 15:57:08.672 35 I was created at 15:57:06.669 and execute by pool-1-thread-1 36 I was created at 15:57:08.672 and execute by pool-1-thread-1 37 //執行新任務後,最先建立的任務被刪除,新任務重新被executeDiscardOldestPolicyTest
3.4 DiscardPolicy
預設情況下它將丟棄被拒絕的任務。
1 public static class DiscardPolicy implements RejectedExecutionHandler { 2 public DiscardPolicy() { } 3 4 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 5 } 6 }DiscardPolicy
四、關閉執行緒池
4.1 ctl變數
執行緒池用ctl變量表示了兩部分資訊,執行緒池的狀態和當前執行緒數量。
1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 2 private static final int COUNT_BITS = Integer.SIZE - 3; 3 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 4 5 // runState is stored in the high-order bits 6 private static final int RUNNING = -1 << COUNT_BITS; 7 private static final int SHUTDOWN = 0 << COUNT_BITS; 8 private static final int STOP = 1 << COUNT_BITS; 9 private static final int TIDYING = 2 << COUNT_BITS; 10 private static final int TERMINATED = 3 << COUNT_BITS; 11 12 // Packing and unpacking ctl 13 private static int runStateOf(int c) { return c & ~CAPACITY; } 14 private static int workerCountOf(int c) { return c & CAPACITY; } 15 private static int ctlOf(int rs, int wc) { return rs | wc; }ctl
通過位運算,執行緒池將一個32位的int型別的數的高3位用於表示執行緒池狀態,低29位用於表示當前執行緒數量。因此執行緒池執行緒最大值為2^29-1,即Capacity;而-1、0、1、2、3分別左移29位為111,000,001,010,011(後29位都是0),用來表示5種狀態。
runStateOf( )方法通過將capacity取反(高三位全為1,低29位全為0),進行&操作就可以獲得高3位;workerCountOf( )直接&,就可以保留下低29位;ctlOf( )將runState | workerCount就可以將兩部分合並。
4.2 執行緒池狀態
在追蹤任務執行流程中,幾個主要方法裡存在大量的狀態判斷,執行緒池有五個狀態,不同狀態下的工作許可權不同。
- RUNNING:執行緒池建立就處於RUNNING狀態,該狀態下能夠接收新任務,以及對已新增的任務進行處理;
- SHUTDOWN:如果呼叫了shutdown( )方法,則執行緒池處於SHUTDOWN狀態,該狀態不接收新任務,但能處理已新增的任務;
1 public static void main(String[] args) throws InterruptedException { 2 ExecutorService executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, 3 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 4 5 for (int i = 0;i < 3;i++) { 6 executor.execute(new PolicyTest());//列印任務開始標誌(時間和執行執行緒),sleep2秒,列印任務完成情況 7 TimeUnit.SECONDS.sleep(1); 8 } 9 BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue(); 10 for (Runnable r: queue) 11 System.out.println(r); 12 13 System.out.println(executor.isShutdown()); 14 executor.shutdown(); 15 System.out.println(executor.isShutdown()); 16 17 TimeUnit.SECONDS.sleep(1); 18 executor.execute(new PolicyTest()); 19 } 20 21 result:I was created at 10:13:03.371 and execute by pool-1-thread-1 22 I was created at 10:13:04.371 23 I was created at 10:13:05.371 24 false 25 true 26 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task I was created at 10:13:07.373 27 I was created at 10:13:03.371 and finish by pool-1-thread-1 28 I was created at 10:13:04.371 and execute by pool-1-thread-1 29 I was created at 10:13:04.371 and finish by pool-1-thread-1 30 I was created at 10:13:05.371 and execute by pool-1-thread-1 31 I was created at 10:13:05.371 and finish by pool-1-thread-1shutdownTest
以上例,雖然已經執行了shutdown( )方法,isShutdown( )返回了true,但是執行緒池並沒有立即停止,而是繼續執行完正在執行和佇列中已存在任務,直到任務完成以後才退出,不過shutdown( )以後對新提交的任務執行了拒絕策略。
- STOP:如果呼叫shutdownNow( )方法,則執行緒池處與於STOP狀態,此狀態不接收新任務,不處理已新增的任務,並且會嘗試中斷正在處理的任務;
1 //以上例,將shutdown( )換成shutdownNow( ),列印任務佇列中元素,並列印shutdownNow返回值 2 result:I was created at 10:40:32.554 and execute by pool-1-thread-1 3 任務佇列中任務: 4 I was created at 10:40:33.555 5 I was created at 10:40:34.555 6 false 7 shutdownNow返回刪除的任務: 8 I was created at 10:40:33.555 9 I was created at 10:40:34.555 10 true 11 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task I was created at 10:40:36.558shutdownNowTest
將上例shutdown( )換成shutdownNow( ) ,執行緒池立即退出,正在執行的任務被中斷,任務佇列中的任務被刪除,新提交的任務執行了拒絕策略。
- TIDYING:當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING狀態,TIDYING狀態時,會執行鉤子函式terminated( );
- TERMINATED:當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。
4.3 shutdown和shutdownNow方法
上面測試了呼叫shutdown( )和shutdownNow( )方法執行緒池對任務會有不同的處理方式,我們從原始碼繼續剖析。
1 public void shutdown() { 2 final ReentrantLock mainLock = this.mainLock; 3 mainLock.lock(); 4 try { 5 checkShutdownAccess(); 6 //設定狀態 7 advanceRunState(SHUTDOWN); 8 interruptIdleWorkers(); 9 onShutdown(); // hook for ScheduledThreadPoolExecutor 10 } finally { 11 mainLock.unlock(); 12 } 13 //嘗試終止執行緒池 14 tryTerminate(); 15 } 16 17 public List<Runnable> shutdownNow() { 18 List<Runnable> tasks; 19 final ReentrantLock mainLock = this.mainLock; 20 mainLock.lock(); 21 try { 22 checkShutdownAccess(); 23 advanceRunState(STOP); 24 interruptWorkers(); 25 //複製任務到List 26 tasks = drainQueue(); 27 } finally { 28 mainLock.unlock(); 29 } 30 tryTerminate(); 31 return tasks; 32 }shutdown and shutdownNow
從原始碼看兩者主要步驟有設定狀態,interrupt執行緒,嘗試終結執行緒池。shutdown( )將狀態設為SHUTDOWN,shutdownNow( )將狀態設定為STOP(advanceRunState( )方法如果當前狀態大於你要設定的狀態則設定失敗,因為執行緒池狀態表示是遞增的,如果當前狀態數值更大,說明已經經歷過此狀態),兩者主要不同在於分別使用了interruptidleWorkers( )和interruptWorkers( )方法interrupt執行緒,最後兩者都嘗試終結執行緒池。這兩個方法均不會阻塞,直接返回,同時它們都是通過interrupt( )中斷執行緒,而interrupt並不是真正意義上的中斷執行緒,它只是設定了一箇中斷標誌(訊號),只有當執行緒對該標誌進行響應(Thread.interrupted( )、Thread.isInterrupted( ))才會退出任務,對於阻塞的執行緒,呼叫中斷時,執行緒將會立刻退出阻塞狀態並丟擲 InterruptedException 異常。因此接收不到interrupt訊號或沒對收到訊號進行處理的任務,可能永遠也不會結束。
1 public static void main(String[] args) throws InterruptedException { 2 ExecutorService executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, 3 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 4 5 executor.execute(new NoneInterruptFlag()); 6 System.out.println(((ThreadPoolExecutor) executor).getActiveCount() + " active count"); 7 executor.shutdownNow(); 8 TimeUnit.SECONDS.sleep(1); 9 System.out.println(executor.isShutdown()); 10 System.out.println(((ThreadPoolExecutor) executor).getActiveCount() + " active count"); 11 } 12 13 static class NoneInterruptFlag implements Runnable { 14 15 @Override 16 public void run() { 17 while (true) { 18 //do something 19 } 20 } 21 }NoneInterruptTask
上面程式,即使呼叫shutdown( )也無法終止執行緒,interrupt只是發出了一箇中斷訊號,而run方法中沒有接收這個訊號,也就無法終止。或者如下丟擲了異常,但並沒有進行處理,仍會繼續執行。
1 @Override 2 public void run() { 3 while (true) { 4 try { 5 TimeUnit.SECONDS.sleep(10); 6 System.out.println("I am still running!"); 7 } catch (InterruptedException e) { 8 e.printStackTrace(); 9 } 10 } 11 }notDealInterruptException
4.4 interruptIdleWorkers和interruptWorkers方法
1 private void interruptIdleWorkers() { 2 interruptIdleWorkers(false); 3 } 4 5 private void interruptIdleWorkers(boolean onlyOne) { 6 final ReentrantLock mainLock = this.mainLock; 7 mainLock.lock(); 8 try { 9 for (Worker w : workers) { 10 Thread t = w.thread; 11 //對執行緒嘗試獲取鎖 12 if (!t.isInterrupted() && w.tryLock()) { 13 try { 14 t.interrupt(); 15 } catch (SecurityException ignore) { 16 } finally { 17 w.unlock(); 18 } 19 } 20 if (onlyOne) 21 break; 22 } 23 } finally { 24 mainLock.unlock(); 25 } 26 }interruptIdleWorkers
interruptIdleWorkers對所有執行緒進行遍歷時,進行了tryLock( )操作,只有加鎖成功的執行緒才會執行interrupt( )。什麼情況執行緒才能被加鎖呢?回到runWorker( ),執行緒在執行任務之前有w.lock( )操作,因此只有沒有執行任務的執行緒會tryLock( )成功,執行interrupt( )。
1 private void interruptWorkers() { 2 final ReentrantLock mainLock = this.mainLock; 3 mainLock.lock(); 4 try { 5 for (Worker w : workers) 6 w.interruptIfStarted(); 7 } finally { 8 mainLock.unlock(); 9 } 10 } 11 12 13 void interruptIfStarted() { 14 Thread t; 15 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 16 try { 17 t.interrupt(); 18 } catch (SecurityException ignore) { 19 } 20 } 21 }interruptWorkers
interruptWorkers( )呼叫了Worker類中的interruptIfStarted( )方法,只有Worker的state大於等於0時才會執行interrupt( )。Worker實現了AQS類,狀態 1 表示鎖被佔用,0 表示鎖被釋放,因此大於等於 0 就代表了所有的執行緒,即執行緒池內全部執行緒都會執行interrupt( )。
4.5 tryTerminate方法
處理完執行緒池內執行緒,還需要處理任務佇列。shutdownNow( )呼叫了drainQueue( )方法將佇列中的元素複製到List,刪除佇列中的元素。
1 tasks = drainQueue(); 2 3 private List<Runnable> drainQueue() { 4 BlockingQueue<Runnable> q = workQueue; 5 List<Runnable> taskList = new ArrayList<Runnable>(); 6 q.drainTo(taskList); 7 //若q中還有元素表示drainTo出錯,將出錯元素重新加入list 8 if (!q.isEmpty()) { 9 for (Runnable r : q.toArray(new Runnable[0])) { 10 if (q.remove(r)) 11 taskList.add(r); 12 } 13 } 14 return taskList; 15 } 16 17 /* 18 *移除此佇列中所有可用的元素,並將它們新增到給定 collection 中。 19 *在試圖向 collection c 中新增元素沒有成功時,可能導致在丟擲相關異常時, 20 *元素會同時在兩個 collection 中出現,或者在其中一個 collection 中出現, 21 *也可能在兩個 collection 中都不出現。如果試圖將一個佇列放入自身佇列中, 22 *則會導致 IllegalArgumentException 異常。此外,如果正在進行此操作時修 23 *改指定的 collection,則此操作行為是不確定的。 24 */ 25 public int drainTo(Collection<? super E> c) { 26 checkNotNull(c); 27 if (c == this) 28 throw new IllegalArgumentException(); 29 final Object[] items = this.items; 30 final ReentrantLock lock = this.lock; 31 lock.lock(); 32 try { 33 int i = takeIndex; 34 int n = 0; 35 int max = count; 36 while (n < max) { 37 c.add(this.<E>cast(items[i])); 38 items[i] = null; 39 i = inc(i); 40 ++n; 41 } 42 if (n > 0) { 43 count = 0; 44 putIndex = 0; 45 takeIndex = 0; 46 notFull.signalAll(); 47 } 48 return n; 49 } finally { 50 lock.unlock(); 51 } 52 }drainQueue
shutdown( )沒有處理佇列,而是在執行tryTerminate( )方法時進行了判斷。文件註釋為:在以下情況將執行緒池變為TERMINATED終止狀態,shutdown 且 正在執行的worker 和 workQueue佇列都empty, stop 且沒有正在執行的worker。 這個方法必須在任何可能導致執行緒池終止的情況下被呼叫,如: 減少worker數量;shutdown時從queue中移除任務。
1 final void tryTerminate() { 2 for (;;) { 3 int c = ctl.get(); 4 5 //如果是執行緒池正running或正在關閉、已經關閉(tidying、terminated) 6 //或處於shutdown狀態且任務佇列非空,直接返回 7 if (isRunning(c) || 8 runStateAtLeast(c, TIDYING) || 9 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 10 return; 11 12 //此時執行緒池處於stop 或 shutdown且佇列為空 的狀態 13 //如果還有執行緒,就嘗試中斷一個執行緒,返回 14 if (workerCountOf(c) != 0) { // Eligible to terminate 15 interruptIdleWorkers(ONLY_ONE); 16 return; 17 } 18 19 final ReentrantLock mainLock = this.mainLock; 20 mainLock.lock(); 21 try { 22 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 23 try { 24 terminated(); 25 } finally { 26 ctl.set(ctlOf(TERMINATED, 0)); 27 28 //喚醒等待執行緒池終止的執行緒,awaitTermination( ) 29 termination.signalAll(); 30 } 31 return; 32 } 33 } finally { 34 mainLock.unlock(); 35 } 36 // else retry on failed CAS 37 } 38 } 39tryTerminate
4.6 awaitTermination方法
1 public boolean awaitTermination(long timeout, TimeUnit unit) 2 throws InterruptedException { 3 //將超時時間轉化為納秒單位 4 long nanos = unit.toNanos(timeout); 5 final ReentrantLock mainLock = this.mainLock; 6 mainLock.lock(); 7 try { 8 for (;;) { 9 //若狀態是terminated,返回true 10 if (runStateAtLeast(ctl.get(), TERMINATED)) 11 return true; 12 //超時返回false 13 if (nanos <= 0) 14 return false; 15 //實現阻塞 16 nanos = termination.awaitNanos(nanos); 17 } 18 } finally { 19 mainLock.unlock(); 20 } 21 }awaitTermination
上面說到shutdown( )和shutdownNow( )方法都是不阻塞方法,awaitTermination( )也用於終結執行緒池,它是一個阻塞方法。該方法並沒有對執行緒池中執行緒或狀態做任何修改,僅僅是在超時後,返回一個結果。
問題一,如果在該方法阻塞期間,另一個執行緒shutdown了執行緒池,該方法還會繼續阻塞嗎?
1 public static void main(String[] args) throws InterruptedException { 2 ExecutorService executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 3 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 4 5 executor.execute(new PolicyTest());//sleep 10 seconds 6 7 new Thread(() -> { 8 try { 9 //保證awaitTermination已阻塞 10 TimeUnit.SECONDS.sleep(1); 11 executor.shutdownNow(); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 }).start(); 16 17 System.out.println("awaitTermination已阻塞"); 18 boolean b = executor.awaitTermination(20, TimeUnit.SECONDS); 19 System.out.println(b); 20 System.out.println(executor.isTerminated()); 21 } 22 23 result:I was created at 15:37:14.897 and execute by pool-1-thread-1 24 awaitTermination已阻塞 25 true 26 trueawaitTerminationTest1結果該方法立刻退出了阻塞,在分析tryTerminate( )方法時,出現了termination.signalAll()呼叫,該方法用於喚醒等待執行緒池終止的執行緒,也就是呼叫了awaitTermination( )方法的執行緒。
問題二,有沒有可能執行緒池已經退出,awaitTermination( )依然返回false?
1 public static void main(String[] args) throws InterruptedException { 2 ExecutorService executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 3 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); 4 5 ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); 6 executor.execute(new PolicyTest());//sleep 3 seconds 7 8 boolean b = executor.awaitTermination(20, TimeUnit.SECONDS); 9 System.out.println(b); 10 System.out.println(executor.isTerminated()); 11 } 12 13 result:I was created at 15:26:14.046 and execute by pool-1-thread-1 14 I was created at 15:26:14.046 and finish by pool-1-thread-1 15 falseawaitTerminationTest2
結果執行緒池實際已經退出,但awaitTermination( )方法還在阻塞且最後返回false。awaitTermination( )方法是在指定時間內,執行緒池為terminated狀態返回true,而到terminated狀態必定呼叫了tryterminate( )且成功設定了terminated狀態。但除此之外,當執行緒池沒有核心執行緒或允許核心執行緒超時,執行緒池最終會通過processWorkerExit( )方法結束所有執行緒,執行緒池直接從running狀態退出。
4.7 優雅關閉執行緒池
如果直接粗暴關掉執行緒池,可能正在執行的任務就突然丟失了;但有一些任務可能耗費時間太長,不可能一直等待。比較shutdown( )和shutdownNow( ),前者比較溫柔,後者就比較簡單粗暴了,此時可以將三個方法一起用,先進行shutdwon( ),如果在指定時間內還沒有結束,才呼叫shutdownNow( )。
1 executor.shutdown(); 2 try { 3 if (!executor.awaitTermination(10, TimeUnit.MINUTES)) 4 executor.shutdownNow(); 5 } catch (InterruptedException e) { 6 executor.shutdownNow(); 7 }elegantEnd
如果遇到類似上面while(true)的情況關閉不了,也可以通過將執行緒設定為守護執行緒,當然肯定要避免這樣的情況。