1. 程式人生 > 實用技巧 >opencv+pytesseract 驗證碼識別!草雞簡單!

opencv+pytesseract 驗證碼識別!草雞簡單!

一、概述

本章將詳細介紹任務線上程池中的執行情況、空閒執行緒的回收過程、預設提供的四種拒絕策略、執行緒池關閉以及個人覺得比較有意思的地方。

二、任務線上程池中的執行過程

執行緒池的一個重要作用是管理執行緒,實現執行緒的複用,避免反覆建立和銷燬執行緒所帶來的資源消耗,我們通過new方法建立執行緒,start( )方法啟動執行緒,執行緒在執行完run( )方法中的內容後就被回收,那麼執行緒池中是如何建立啟動執行緒,並保持執行緒數量,一直接收任務進行處理的呢?

2.1 執行緒池工作流程

  1 ExecutorService executor = new ThreadPoolExecutor(1, 1, 20, TimeUnit.SECONDS,
  2
new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

以以上執行緒池為例,它大致的執行流程如下圖:

2.2 execute方法

跟著上面流程,首先看提交任務。execute( )方法來自於Executor介面,該方法將任務提交與每個任務將如何執行的機制(包括執行緒使用的細節、排程等)分離開來,和start( )與run( )作用類似。

  1 public void execute(Runnable command) {
  2
if (command == null) 3 throw new NullPointerException(); 4 int c = ctl.get(); 5 6 //當前執行緒數小於核心執行緒數,則啟動一個新執行緒執行該任務 7 if (workerCountOf(c) < corePoolSize) { 8 if (addWorker(command, true)) 9 return; 10 c = ctl.get(); 11 } 12 13 //若當前執行緒數大於核心執行緒數,則進入阻塞佇列進行排隊 14 if (isRunning(c) && workQueue.offer(command)) { 15
int recheck = ctl.get(); 16 if (! isRunning(recheck) && remove(command)) 17 reject(command); 18 else if (workerCountOf(recheck) == 0) 19 addWorker(null, false); 20 } 21 22 //若排隊不成功,則執行拒絕策略 23 else if (!addWorker(command, false)) 24 reject(command); 25 } 26
execute

該方法用於提交任務,並沒有建立啟動執行緒,我們跟進command在addWorker( )中的情況。

2.3 addWorker方法

  1 private boolean addWorker(Runnable firstTask, boolean core) {
  2 	retry:
  3 	for (;;) {
  4 		int c = ctl.get();
  5 		int rs = runStateOf(c);
  6 
  7 		if (rs >= SHUTDOWN &&
  8 			! (rs == SHUTDOWN &&
  9 			   firstTask == null &&
 10 			   ! workQueue.isEmpty()))
 11 			return false;
 12 
 13 		for (;;) {
 14 			int wc = workerCountOf(c);
 15 			if (wc >= CAPACITY ||
 16 			    //core為true表示建立核心執行緒,那麼當前執行緒數>核心執行緒數,就不能在建立核心執行緒
 17 				//返回false執行其他邏輯
 18 				wc >= (core ? corePoolSize : maximumPoolSize))
 19 				return false;
 20 			if (compareAndIncrementWorkerCount(c))
 21 				break retry;
 22 			c = ctl.get();  // Re-read ctl
 23 			if (runStateOf(c) != rs)
 24 				continue retry;
 25 		}
 26 	}
 27 
 28 	boolean workerStarted = false;
 29 	boolean workerAdded = false;
 30 	Worker w = null;
 31 	try {
 32 
 33 	    //將任務交給執行緒
 34 		w = new Worker(firstTask);
 35 		final Thread t = w.thread;
 36 		if (t != null) {
 37 			final ReentrantLock mainLock = this.mainLock;
 38 			mainLock.lock();
 39 			try {
 40 				int rs = runStateOf(ctl.get());
 41 
 42 				if (rs < SHUTDOWN ||
 43 					(rs == SHUTDOWN && firstTask == null)) {
 44 					if (t.isAlive())
 45 						throw new IllegalThreadStateException();
 46 
 47 					//將執行緒新增至執行緒池,執行緒池直接維護的是一組Worker物件
 48 					workers.add(w);
 49 					int s = workers.size();
 50 					if (s > largestPoolSize)
 51 						largestPoolSize = s;
 52 					workerAdded = true;
 53 				}
 54 			} finally {
 55 				mainLock.unlock();
 56 			}
 57 			if (workerAdded) {
 58 
 59 			    //啟動執行緒
 60 				t.start();
 61 				workerStarted = true;
 62 			}
 63 		}
 64 	} finally {
 65 
 66 		//若執行緒啟動失敗,則從執行緒池移除該執行緒
 67 		if (! workerStarted)
 68 			addWorkerFailed(w);
 69 	}
 70 	return workerStarted;
 71 }
addWorker

addWork( )主要用於建立一個新執行緒並將其加入執行緒池(HashSet)中,firstTask引數是這個新增的執行緒執行的第一個任務,core引數表示此次建立的是核心執行緒還是非核心執行緒。其中t.start( )啟動了執行緒,t是Worker中的屬性,但t執行的是哪個run( )方法呢?

2.4 Worker類

  1 private final class Worker
  2 	extends AbstractQueuedSynchronizer
  3 	implements Runnable
  4 {
  5 	final Thread thread;
  6 
  7 	Runnable firstTask;
  8 
  9 	volatile long completedTasks;
 10 
 11 	Worker(Runnable firstTask) {
 12 		setState(-1);
 13 		this.firstTask = firstTask;
 14 		this.thread = getThreadFactory().newThread(this);
 15 	}
 16 
 17 	public void run() {
 18 		runWorker(this);
 19 	}
 20 }
 21 
Worker

Worker實現了Runnable介面,建構函式中,它將傳入的任務給firstTask,處理任務的執行緒是通過執行緒構造工廠,將Worker本身作為任務交給執行緒建立而成。則addWorker( )方法中t.start( )啟動後執行的是Worker類中的run方法。

這裡的執行緒構造工廠以Executors中DefaultThreadFactory為例,它最終也是通過new建立執行緒。

  1 static class DefaultThreadFactory implements ThreadFactory {
  2 	private static final AtomicInteger poolNumber = new AtomicInteger(1);
  3 	private final ThreadGroup group;
  4 	private final AtomicInteger threadNumber = new AtomicInteger(1);
  5 	private final String namePrefix;
  6 
  7 	DefaultThreadFactory() {
  8 		SecurityManager s = System.getSecurityManager();
  9 		group = (s != null) ? s.getThreadGroup() :
 10 							  Thread.currentThread().getThreadGroup();
 11 		namePrefix = "pool-" +
 12 					  poolNumber.getAndIncrement() +
 13 					 "-thread-";
 14 	}
 15 
 16 	public Thread newThread(Runnable r) {
 17 		Thread t = new Thread(group, r,
 18 							  namePrefix + threadNumber.getAndIncrement(),
 19 							  0);
 20 		if (t.isDaemon())
 21 			t.setDaemon(false);
 22 		if (t.getPriority() != Thread.NORM_PRIORITY)
 23 			t.setPriority(Thread.NORM_PRIORITY);
 24 		return t;
 25 	}
 26 }
 27 
DefaultThreadFactory

2.5 runWorker方法

  1 final void runWorker(Worker w) {
  2     //當前執行執行緒
  3      Thread wt = Thread.currentThread();
  4      Runnable task = w.firstTask;
  5      w.firstTask = null;
  6      w.unlock(); // allow interrupts
  7      boolean completedAbruptly = true;
  8      try {
  9 	  //第一個任務執行後,通過getTask取佇列中任務
 10 	   while (task != null || (task = getTask()) != null) {
 11 		w.lock();
 12 		if ((runStateAtLeast(ctl.get(), STOP) ||
 13 			 (Thread.interrupted() &&
 14 		            runStateAtLeast(ctl.get(), STOP))) &&
 15 				!wt.isInterrupted())
 16 			wt.interrupt();
 17 		try {
 18 			beforeExecute(wt, task);
 19 			Throwable thrown = null;
 20 			try {
 21 			    //提交的任務實際在此執行
 22 				task.run();
 23 			} catch (RuntimeException x) {
 24 				thrown = x; throw x;
 25 			} catch (Error x) {
 26 				thrown = x; throw x;
 27 			} catch (Throwable x) {
 28 				thrown = x; throw new Error(x);
 29 			} finally {
 30 				afterExecute(task, thrown);
 31 			}
 32 		} finally {
 33 			task = null;
 34 			w.completedTasks++;
 35 			w.unlock();
 36 		}
 37 	  }
 38 	       completedAbruptly = false;
 39          } finally {
 40 	       processWorkerExit(w, completedAbruptly);
 41          }
 42 }
 43 
runWorker

2.6 任務執行過程小結

提交第一個任務時,呼叫addWorker( )將任務儲存給Worker,同時通過執行緒構造工廠,以Worker本身為引數建立新執行緒。將該執行緒加入執行緒池,啟動執行緒,執行緒將執行Worker中的run( )方法,run( )方法實際呼叫runWorker( )方法去執行儲存在Worker中的任務。

任務執行完成後,執行緒就會被回收,執行緒池如何實現執行緒複用的呢?

2.7 getTask方法

在runWorker方法中,當前執行緒執行完第一次提交的任務後,會進行while迴圈,通過getTask從阻塞佇列中取任務再執行。

  1 private Runnable getTask() {
  2 	boolean timedOut = false; // Did the last poll() time out?
  3 
  4 	for (;;) {
  5 		int c = ctl.get();
  6 		int rs = runStateOf(c);
  7 
  8 		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  9 			decrementWorkerCount();
 10 			return null;
 11 		}
 12 
 13 		int wc = workerCountOf(c);
 14 
 15 		//允許核心執行緒超時,或當前執行緒數>核心執行緒數
 16 		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 17 
 18 		if ((wc > maximumPoolSize || (timed && timedOut))
 19 			&& (wc > 1 || workQueue.isEmpty())) {
 20 			if (compareAndDecrementWorkerCount(c))
 21 				return null;
 22 			continue;
 23 		}
 24 
 25 		try {
 26 
 27 			//阻塞佇列中的兩個獲取元素方法,兩者都會阻塞
 28 			//poll(long timeout, TimeUnit unit) 獲取並移除此佇列的頭部,在指定的等待時間前等待可用的元素,超時返回null
 29 			//take() 獲取並移除此佇列的頭部,在佇列中沒有可用元素之前一直等待
 30 			Runnable r = timed ?
 31 				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 32 				workQueue.take();
 33 			if (r != null)
 34 				return r;
 35 			//
 36 			timedOut = true;
 37 		} catch (InterruptedException retry) {
 38 			timedOut = false;
 39 		}
 40 	}
 41  }
getTaskgetTask( )返回任務是通過阻塞佇列中的兩個阻塞方法實現的,當設定允許核心執行緒超時,或者當前執行緒數大於核心執行緒數時,就會通過poll(long timeout, TimeUnit unit)獲取,poll在等待指定時間內還沒有獲取到元素就會返回null,回到runWorker中執行超時或異常執行緒的回收操作;而take( )方法會一直阻塞,直到獲取到元素。兩方法允許被中斷。

2.8 processWorkerExit方法

  1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2         //completedAbruptly表示當前執行緒是否是異常退出
  3 		//正常結束時,在getTask執行了--操作,異常退出則需要在此執行--
  4         if (completedAbruptly)
  5             decrementWorkerCount();
  6 
  7         final ReentrantLock mainLock = this.mainLock;
  8         mainLock.lock();
  9         try {
 10             completedTaskCount += w.completedTasks;
 11             workers.remove(w);
 12         } finally {
 13             mainLock.unlock();
 14         }
 15 
 16         //有執行緒退出,可能是最後一個執行緒,嘗試終止執行緒池
 17         tryTerminate();
 18 
 19         int c = ctl.get();
 20 
 21 
 22 		//如果此時執行緒池還未stop,即嘗試終止執行緒池失敗,需要確認是否新增一個Worker
 23         if (runStateLessThan(c, STOP)) {
 24             if (!completedAbruptly) {
 25                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
 26                 if (min == 0 && ! workQueue.isEmpty())
 27                     min = 1;
 28                 if (workerCountOf(c) >= min)
 29                     return; // replacement not needed
 30             }
 31             addWorker(null, false);
 32         }
 33     }
processWorkerExit

此方法用於清理退出的執行緒,將其從執行緒池中移除。當執行緒池還未終止,此時假如執行緒因異常退出,或執行緒池中無執行緒而任務佇列還有任務,或者當前執行緒數量小於核心執行緒,就會新增一個新的執行緒。若執行緒池無任務,且沒有核心執行緒或核心執行緒允許超時,則最後一個執行緒通過此方法結束,執行緒池終止。

2.9 執行緒複用實現小結

執行緒池中執行緒的複用是通過阻塞實現的,當執行緒池中執行緒啟動後,就會進入迴圈不斷從阻塞佇列中獲取任務,當阻塞佇列中沒有任務時,就會在poll( )或take( )方法陷入阻塞。若有執行緒等待超時,getTask( )會返回null,進入執行緒清理工作。

根據這個思路,我們也可以通過wait( )和notifyAll( )實現複用的功能。

三、ThreadPoolExecutor四種預設的拒絕策略

3.1 AbortPolicy

  1 public static class AbortPolicy implements RejectedExecutionHandler {
  2 	public AbortPolicy() { }
  3 
  4 	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5 		throw new RejectedExecutionException("Task " + r.toString() +
  6 											 " rejected from " +
  7 											 e.toString());
  8 	}
  9 }
AbortPolicy

AbortPolicy是丟棄任務,丟擲丟擲執行時 RejectedExecutionException;

  1 ExecutorService executor = new ThreadPoolExecutor(1, 1, 20, TimeUnit.SECONDS,
  2                 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
  3 
  4         for (int i = 0;i < 10;i++)
  5             executor.execute(new PolicyTest());//sleep 5 seconds
  6 
  7 result:Exception in thread "main" java.util.concurrent.RejectedExecutionException
AbortPolicyTest

3.2 CallerRunsPolicy

  1 public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2 	public CallerRunsPolicy() { }
  3 
  4 	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5 		if (!e.isShutdown()) {
  6 			r.run();
  7 		}
  8 	}
  9 }
CallerRunsPolicy

CallerRunsPolicy是直接在建立執行緒池的執行緒中執行被拒絕的任務,如果執行緒池已經關閉,則直接丟棄任務;

  1 ExecutorService executor = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS,
  2 			new ArrayBlockingQueue<>(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
  3 
  4 	for (int i = 0; i < 3; i++) {
  5 		executor.execute(new PolicyTest());//print the name executed thread
  6 	}
  7 
  8 	executor.shutdown();
  9 	TimeUnit.SECONDS.sleep(10);
 10 
 11 	System.out.println("The ThreadPoolExecutor was shutdown");
 12 	executor.execute(new PolicyTest());
 13 
 14 result:I was executed by main
 15        I was executed by pool-1-thread-1
 16        I was executed by pool-1-thread-1
 17        The ThreadPoolExecutor was shutdown
CallerRunsPolicy Test

3.3 DiscardOldestPolicy

  1 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2 	public DiscardOldestPolicy() { }
  3 
  4 	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5 		if (!e.isShutdown()) {
  6 			e.getQueue().poll();
  7 			e.execute(r);
  8 		}
  9 	}
 10 }
DiscardOldestPolicy

DiscardOldestPolicy是放棄最舊的未處理請求,然後重試execute,如果執行緒池已關閉,則丟棄該任務;

  1 public static void main(String[] args) throws InterruptedException {
  2 	ExecutorService executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
  3 			new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
  4 
  5 	for (int i = 0; i < 3; i++) {
  6 		executor.execute(new PolicyTest());//列印任務建立的時間和執行的執行緒 
  7 		TimeUnit.SECONDS.sleep(1);
  8 	}
  9 
 10 	System.out.println("Runnables in queue:");
 11 	BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();
 12 	for (Runnable r: queue){
 13 		System.out.println(r.toString());//列印任務建立時間
 14 	}
 15 
 16 	TimeUnit.SECONDS.sleep(1);
 17 	System.out.println("-------execute a new runnable-------");
 18 	executor.execute(new PolicyTest());
 19 
 20 	System.out.println("New runnables in queue:");
 21 	BlockingQueue<Runnable> queue2 = ((ThreadPoolExecutor) executor).getQueue();
 22 	for (Runnable r: queue2){
 23 		System.out.println(r.toString());
 24 	}
 25 }
 26 
 27 result: I was created at 15:57:04.667 and execute by pool-1-thread-1
 28         Runnables in queue:
 29         I was created at 15:57:05.669
 30         I was created at 15:57:06.669
 31 -------execute a new runnable-------
 32         New runnables in queue:
 33         I was created at 15:57:06.669
 34         I was created at 15:57:08.672
 35         I was created at 15:57:06.669 and execute by pool-1-thread-1
 36         I was created at 15:57:08.672 and execute by pool-1-thread-1
 37 //執行新任務後,最先建立的任務被刪除,新任務重新被execute
DiscardOldestPolicyTest

3.4 DiscardPolicy

預設情況下它將丟棄被拒絕的任務。

  1 public static class DiscardPolicy implements RejectedExecutionHandler {
  2 	public DiscardPolicy() { }
  3 
  4 	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  5 	}
  6 }
DiscardPolicy

四、關閉執行緒池

4.1 ctl變數

執行緒池用ctl變量表示了兩部分資訊,執行緒池的狀態和當前執行緒數量。

  1    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2     private static final int COUNT_BITS = Integer.SIZE - 3;
  3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  4 
  5     // runState is stored in the high-order bits
  6     private static final int RUNNING    = -1 << COUNT_BITS;
  7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
  8     private static final int STOP       =  1 << COUNT_BITS;
  9     private static final int TIDYING    =  2 << COUNT_BITS;
 10     private static final int TERMINATED =  3 << COUNT_BITS;
 11 
 12     // Packing and unpacking ctl
 13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
 14     private static int workerCountOf(int c)  { return c & CAPACITY; }
 15     private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl

通過位運算,執行緒池將一個32位的int型別的數的高3位用於表示執行緒池狀態,低29位用於表示當前執行緒數量。因此執行緒池執行緒最大值為2^29-1,即Capacity;而-1、0、1、2、3分別左移29位為111,000,001,010,011(後29位都是0),用來表示5種狀態。

runStateOf( )方法通過將capacity取反(高三位全為1,低29位全為0),進行&操作就可以獲得高3位;workerCountOf( )直接&,就可以保留下低29位;ctlOf( )將runState | workerCount就可以將兩部分合並。

4.2 執行緒池狀態

在追蹤任務執行流程中,幾個主要方法裡存在大量的狀態判斷,執行緒池有五個狀態,不同狀態下的工作許可權不同。

  • RUNNING:執行緒池建立就處於RUNNING狀態,該狀態下能夠接收新任務,以及對已新增的任務進行處理;
  • SHUTDOWN:如果呼叫了shutdown( )方法,則執行緒池處於SHUTDOWN狀態,該狀態不接收新任務,但能處理已新增的任務;
  1 public static void main(String[] args) throws InterruptedException {
  2 	ExecutorService executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
  3 			new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
  4 
  5 	for (int i = 0;i < 3;i++) {
  6 		executor.execute(new PolicyTest());//列印任務開始標誌(時間和執行執行緒),sleep2秒,列印任務完成情況
  7 		TimeUnit.SECONDS.sleep(1);
  8 	}
  9 	BlockingQueue<Runnable> queue = ((ThreadPoolExecutor) executor).getQueue();
 10 	for (Runnable r: queue)
 11 		System.out.println(r);
 12 
 13 	System.out.println(executor.isShutdown());
 14 	executor.shutdown();
 15 	System.out.println(executor.isShutdown());
 16 
 17 	TimeUnit.SECONDS.sleep(1);
 18 	executor.execute(new PolicyTest());
 19 }
 20 
 21 result:I was created at 10:13:03.371 and execute by pool-1-thread-1
 22 		I was created at 10:13:04.371
 23 		I was created at 10:13:05.371
 24 		false
 25 		true
 26 		Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task I was created at 10:13:07.373
 27 		I was created at 10:13:03.371 and finish by pool-1-thread-1
 28 		I was created at 10:13:04.371 and execute by pool-1-thread-1
 29 		I was created at 10:13:04.371 and finish by pool-1-thread-1
 30 		I was created at 10:13:05.371 and execute by pool-1-thread-1
 31 		I was created at 10:13:05.371 and finish by pool-1-thread-1
shutdownTest

以上例,雖然已經執行了shutdown( )方法,isShutdown( )返回了true,但是執行緒池並沒有立即停止,而是繼續執行完正在執行和佇列中已存在任務,直到任務完成以後才退出,不過shutdown( )以後對新提交的任務執行了拒絕策略。

  • STOP:如果呼叫shutdownNow( )方法,則執行緒池處與於STOP狀態,此狀態不接收新任務,不處理已新增的任務,並且會嘗試中斷正在處理的任務;
  1 //以上例,將shutdown( )換成shutdownNow( ),列印任務佇列中元素,並列印shutdownNow返回值
  2 result:I was created at 10:40:32.554 and execute by pool-1-thread-1
  3 		任務佇列中任務:
  4 		I was created at 10:40:33.555
  5 		I was created at 10:40:34.555
  6 		false
  7 		shutdownNow返回刪除的任務:
  8 		I was created at 10:40:33.555
  9 		I was created at 10:40:34.555
 10 		true
 11 		Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task I was created at 10:40:36.558
shutdownNowTest

將上例shutdown( )換成shutdownNow( ) ,執行緒池立即退出,正在執行的任務被中斷,任務佇列中的任務被刪除,新提交的任務執行了拒絕策略。

  • TIDYING:當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING狀態,TIDYING狀態時,會執行鉤子函式terminated( );
  • TERMINATED:當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。

4.3 shutdown和shutdownNow方法

上面測試了呼叫shutdown( )和shutdownNow( )方法執行緒池對任務會有不同的處理方式,我們從原始碼繼續剖析。

  1 public void shutdown() {
  2 	final ReentrantLock mainLock = this.mainLock;
  3 	mainLock.lock();
  4 	try {
  5 		checkShutdownAccess();
  6 		//設定狀態
  7 		advanceRunState(SHUTDOWN);
  8 		interruptIdleWorkers();
  9 		onShutdown(); // hook for ScheduledThreadPoolExecutor
 10 	} finally {
 11 		mainLock.unlock();
 12 	}
 13 	//嘗試終止執行緒池
 14 	tryTerminate();
 15 }
 16 
 17 public List<Runnable> shutdownNow() {
 18 	List<Runnable> tasks;
 19 	final ReentrantLock mainLock = this.mainLock;
 20 	mainLock.lock();
 21 	try {
 22 		checkShutdownAccess();
 23 		advanceRunState(STOP);
 24 		interruptWorkers();
 25 		//複製任務到List
 26 		tasks = drainQueue();
 27 	} finally {
 28 		mainLock.unlock();
 29 	}
 30 	tryTerminate();
 31 	return tasks;
 32 }
shutdown and shutdownNow

從原始碼看兩者主要步驟有設定狀態,interrupt執行緒,嘗試終結執行緒池。shutdown( )將狀態設為SHUTDOWN,shutdownNow( )將狀態設定為STOP(advanceRunState( )方法如果當前狀態大於你要設定的狀態則設定失敗,因為執行緒池狀態表示是遞增的,如果當前狀態數值更大,說明已經經歷過此狀態),兩者主要不同在於分別使用了interruptidleWorkers( )和interruptWorkers( )方法interrupt執行緒,最後兩者都嘗試終結執行緒池。這兩個方法均不會阻塞,直接返回,同時它們都是通過interrupt( )中斷執行緒,而interrupt並不是真正意義上的中斷執行緒,它只是設定了一箇中斷標誌(訊號),只有當執行緒對該標誌進行響應(Thread.interrupted( )、Thread.isInterrupted( ))才會退出任務,對於阻塞的執行緒,呼叫中斷時,執行緒將會立刻退出阻塞狀態並丟擲 InterruptedException 異常。因此接收不到interrupt訊號或沒對收到訊號進行處理的任務,可能永遠也不會結束。

  1 public static void main(String[] args) throws InterruptedException {
  2         ExecutorService executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,
  3                 new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
  4 
  5         executor.execute(new NoneInterruptFlag());
  6         System.out.println(((ThreadPoolExecutor) executor).getActiveCount() + " active count");
  7         executor.shutdownNow();
  8         TimeUnit.SECONDS.sleep(1);
  9         System.out.println(executor.isShutdown());
 10         System.out.println(((ThreadPoolExecutor) executor).getActiveCount() + " active count");
 11     }
 12 
 13 static class NoneInterruptFlag implements Runnable {
 14 
 15 	@Override
 16 	public void run() {
 17 		while (true) {
 18 			//do something
 19 		}
 20 	}
 21 }
NoneInterruptTask

上面程式,即使呼叫shutdown( )也無法終止執行緒,interrupt只是發出了一箇中斷訊號,而run方法中沒有接收這個訊號,也就無法終止。或者如下丟擲了異常,但並沒有進行處理,仍會繼續執行。

  1 @Override
  2 public void run() {
  3    while (true) {
  4 	try {
  5            TimeUnit.SECONDS.sleep(10);
  6            System.out.println("I am still running!");
  7 	} catch (InterruptedException e) {
  8 	   e.printStackTrace();
  9 	}
 10     }
 11 }
notDealInterruptException

4.4 interruptIdleWorkers和interruptWorkers方法

  1 private void interruptIdleWorkers() {
  2 	interruptIdleWorkers(false);
  3 }
  4 
  5 private void interruptIdleWorkers(boolean onlyOne) {
  6 	final ReentrantLock mainLock = this.mainLock;
  7 	mainLock.lock();
  8 	try {
  9 		for (Worker w : workers) {
 10 			Thread t = w.thread;
 11 			//對執行緒嘗試獲取鎖
 12 			if (!t.isInterrupted() && w.tryLock()) {
 13 				try {
 14 					t.interrupt();
 15 				} catch (SecurityException ignore) {
 16 				} finally {
 17 					w.unlock();
 18 				}
 19 			}
 20 			if (onlyOne)
 21 				break;
 22 		}
 23 	} finally {
 24 		mainLock.unlock();
 25 	}
 26 }
interruptIdleWorkers

interruptIdleWorkers對所有執行緒進行遍歷時,進行了tryLock( )操作,只有加鎖成功的執行緒才會執行interrupt( )。什麼情況執行緒才能被加鎖呢?回到runWorker( ),執行緒在執行任務之前有w.lock( )操作,因此只有沒有執行任務的執行緒會tryLock( )成功,執行interrupt( )。

  1 private void interruptWorkers() {
  2 	final ReentrantLock mainLock = this.mainLock;
  3 	mainLock.lock();
  4 	try {
  5 		for (Worker w : workers)
  6 			w.interruptIfStarted();
  7 	} finally {
  8 		mainLock.unlock();
  9 	}
 10 }
 11 
 12 
 13 void interruptIfStarted() {
 14 	Thread t;
 15 	if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
 16 		try {
 17 			t.interrupt();
 18 		} catch (SecurityException ignore) {
 19 		}
 20 	}
 21 }
interruptWorkers

interruptWorkers( )呼叫了Worker類中的interruptIfStarted( )方法,只有Worker的state大於等於0時才會執行interrupt( )。Worker實現了AQS類,狀態 1 表示鎖被佔用,0 表示鎖被釋放,因此大於等於 0 就代表了所有的執行緒,即執行緒池內全部執行緒都會執行interrupt( )。

4.5 tryTerminate方法

處理完執行緒池內執行緒,還需要處理任務佇列。shutdownNow( )呼叫了drainQueue( )方法將佇列中的元素複製到List,刪除佇列中的元素。

  1 tasks = drainQueue();
  2 
  3 private List<Runnable> drainQueue() {
  4 	BlockingQueue<Runnable> q = workQueue;
  5 	List<Runnable> taskList = new ArrayList<Runnable>();
  6 	q.drainTo(taskList);
  7 	//若q中還有元素表示drainTo出錯,將出錯元素重新加入list
  8 	if (!q.isEmpty()) {
  9 		for (Runnable r : q.toArray(new Runnable[0])) {
 10 			if (q.remove(r))
 11 				taskList.add(r);
 12 		}
 13 	}
 14 	return taskList;
 15 }
 16 
 17 /*
 18  *移除此佇列中所有可用的元素,並將它們新增到給定 collection 中。
 19  *在試圖向 collection c 中新增元素沒有成功時,可能導致在丟擲相關異常時,
 20  *元素會同時在兩個 collection 中出現,或者在其中一個 collection 中出現,
 21  *也可能在兩個 collection 中都不出現。如果試圖將一個佇列放入自身佇列中,
 22  *則會導致 IllegalArgumentException 異常。此外,如果正在進行此操作時修
 23  *改指定的 collection,則此操作行為是不確定的。
 24  */
 25 public int drainTo(Collection<? super E> c) {
 26 	checkNotNull(c);
 27 	if (c == this)
 28 		throw new IllegalArgumentException();
 29 	final Object[] items = this.items;
 30 	final ReentrantLock lock = this.lock;
 31 	lock.lock();
 32 	try {
 33 		int i = takeIndex;
 34 		int n = 0;
 35 		int max = count;
 36 		while (n < max) {
 37 			c.add(this.<E>cast(items[i]));
 38 			items[i] = null;
 39 			i = inc(i);
 40 			++n;
 41 		}
 42 		if (n > 0) {
 43 			count = 0;
 44 			putIndex = 0;
 45 			takeIndex = 0;
 46 			notFull.signalAll();
 47 		}
 48 		return n;
 49 	} finally {
 50 		lock.unlock();
 51 	}
 52 }
drainQueue

shutdown( )沒有處理佇列,而是在執行tryTerminate( )方法時進行了判斷。文件註釋為:在以下情況將執行緒池變為TERMINATED終止狀態,shutdown 且 正在執行的worker 和 workQueue佇列都empty, stop 且沒有正在執行的worker。 這個方法必須在任何可能導致執行緒池終止的情況下被呼叫,如: 減少worker數量;shutdown時從queue中移除任務。

  1 final void tryTerminate() {
  2 	for (;;) {
  3 		int c = ctl.get();
  4 
  5 		//如果是執行緒池正running或正在關閉、已經關閉(tidying、terminated)
  6 		//或處於shutdown狀態且任務佇列非空,直接返回
  7 		if (isRunning(c) ||
  8 			runStateAtLeast(c, TIDYING) ||
  9 			(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
 10 			return;
 11 
 12 		//此時執行緒池處於stop 或 shutdown且佇列為空 的狀態
 13 		//如果還有執行緒,就嘗試中斷一個執行緒,返回
 14 		if (workerCountOf(c) != 0) { // Eligible to terminate
 15 			interruptIdleWorkers(ONLY_ONE);
 16 			return;
 17 		}
 18 
 19 		final ReentrantLock mainLock = this.mainLock;
 20 		mainLock.lock();
 21 		try {
 22 			if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 23 				try {
 24 					terminated();
 25 				} finally {
 26 					ctl.set(ctlOf(TERMINATED, 0));
 27 
 28 					//喚醒等待執行緒池終止的執行緒,awaitTermination( )
 29 					termination.signalAll();
 30 				}
 31 				return;
 32 			}
 33 		} finally {
 34 			mainLock.unlock();
 35 		}
 36 		// else retry on failed CAS
 37 	}
 38 }
 39 
tryTerminate

4.6 awaitTermination方法

  1 public boolean awaitTermination(long timeout, TimeUnit unit)
  2 	throws InterruptedException {
  3 	//將超時時間轉化為納秒單位
  4 	long nanos = unit.toNanos(timeout);
  5 	final ReentrantLock mainLock = this.mainLock;
  6 	mainLock.lock();
  7 	try {
  8 		for (;;) {
  9 			//若狀態是terminated,返回true
 10 			if (runStateAtLeast(ctl.get(), TERMINATED))
 11 				return true;
 12 			//超時返回false
 13 			if (nanos <= 0)
 14 				return false;
 15 			//實現阻塞
 16 			nanos = termination.awaitNanos(nanos);
 17 		}
 18 	} finally {
 19 		mainLock.unlock();
 20 	}
 21 }
awaitTermination

上面說到shutdown( )和shutdownNow( )方法都是不阻塞方法,awaitTermination( )也用於終結執行緒池,它是一個阻塞方法。該方法並沒有對執行緒池中執行緒或狀態做任何修改,僅僅是在超時後,返回一個結果。

問題一,如果在該方法阻塞期間,另一個執行緒shutdown了執行緒池,該方法還會繼續阻塞嗎?

  1 public static void main(String[] args) throws InterruptedException {
  2 	ExecutorService executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
  3 			new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
  4 
  5 	executor.execute(new PolicyTest());//sleep 10 seconds
  6 
  7 	new Thread(() -> {
  8 		try {
  9 			//保證awaitTermination已阻塞
 10 			TimeUnit.SECONDS.sleep(1);
 11 			executor.shutdownNow();
 12 		} catch (InterruptedException e) {
 13 			e.printStackTrace();
 14 		}
 15 	}).start();
 16 
 17 	System.out.println("awaitTermination已阻塞");
 18 	boolean b = executor.awaitTermination(20, TimeUnit.SECONDS);
 19 	System.out.println(b);
 20 	System.out.println(executor.isTerminated());
 21 }
 22 
 23 result:I was created at 15:37:14.897 and execute by pool-1-thread-1
 24 		awaitTermination已阻塞
 25 		true
 26 		true
awaitTerminationTest1結果該方法立刻退出了阻塞,在分析tryTerminate( )方法時,出現了termination.signalAll()呼叫,該方法用於喚醒等待執行緒池終止的執行緒,也就是呼叫了awaitTermination( )方法的執行緒。

問題二,有沒有可能執行緒池已經退出,awaitTermination( )依然返回false?

  1 public static void main(String[] args) throws InterruptedException {
  2 	ExecutorService executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS,
  3 			new ArrayBlockingQueue<>(2), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
  4 
  5 	((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
  6 	executor.execute(new PolicyTest());//sleep 3 seconds
  7 
  8 	boolean b = executor.awaitTermination(20, TimeUnit.SECONDS);
  9 	System.out.println(b);
 10 	System.out.println(executor.isTerminated());
 11 }
 12 
 13 result:I was created at 15:26:14.046 and execute by pool-1-thread-1
 14        I was created at 15:26:14.046 and finish by pool-1-thread-1
 15        false
awaitTerminationTest2

結果執行緒池實際已經退出,但awaitTermination( )方法還在阻塞且最後返回false。awaitTermination( )方法是在指定時間內,執行緒池為terminated狀態返回true,而到terminated狀態必定呼叫了tryterminate( )且成功設定了terminated狀態。但除此之外,當執行緒池沒有核心執行緒或允許核心執行緒超時,執行緒池最終會通過processWorkerExit( )方法結束所有執行緒,執行緒池直接從running狀態退出。

4.7 優雅關閉執行緒池

如果直接粗暴關掉執行緒池,可能正在執行的任務就突然丟失了;但有一些任務可能耗費時間太長,不可能一直等待。比較shutdown( )和shutdownNow( ),前者比較溫柔,後者就比較簡單粗暴了,此時可以將三個方法一起用,先進行shutdwon( ),如果在指定時間內還沒有結束,才呼叫shutdownNow( )。

  1 executor.shutdown();
  2 try {
  3 	if (!executor.awaitTermination(10, TimeUnit.MINUTES))
  4 		executor.shutdownNow();
  5 } catch (InterruptedException e) {
  6 	executor.shutdownNow();
  7 }
elegantEnd

如果遇到類似上面while(true)的情況關閉不了,也可以通過將執行緒設定為守護執行緒,當然肯定要避免這樣的情況。