Java執行緒池的實現原理
原理概述:
其實java 的執行緒池的實現原理很簡單,說白了就是一個執行緒集合workerSet和一個阻塞佇列workerQueue。當用戶向執行緒池提交一個任務(也就是一個執行緒)時,執行緒池會把任務放到workqueue中。
workerSet中的執行緒會不斷的從workqueue中獲取執行緒然後執行。當workQueue中沒有任務的時候,worker就會阻塞,直到佇列中有任務就取出來繼續執行。
執行緒池的幾個主要引數的作用
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolsize:規定了執行緒池有幾個執行緒(worker)在執行
maximumPoolSize:當wokerQueue滿了,不能繼續新增任務的時候,這個引數才會生效。規定了執行緒池最多隻能有多少個執行緒(woker)在執行。
keepAliveTime:超出corePoolSize大小的那些執行緒的生存時間,這些執行緒如果長時間沒有執行任務並且超過了keepAliveTime設定的時間,就會消亡。
unit:生存時間對於的單位。
workQueue:存放任務的佇列。
threaFactory:建立執行緒的工廠。
handler:當workQueue已經滿了,並且執行緒池的執行緒數也已經達到maximumPoolSize,將執行拒絕策略。
任務提交後的流程分析
使用者通過submit提交一個任務。執行緒池會執行如下流程:
1.判斷當前執行的woker數量是否超過corePoolSize,如果不超過corePoolSize。就建立一個worker直接執行該任務。--執行緒池最開始時沒有worker在執行的。
2.如果正在執行的worker數量超過或者等於corePoolSize,那麼就將該任務加入到workQueue佇列中去。
3.如果workQueue佇列滿了,也就是offer方法返回false的話,就檢查當前執行的worker數量是否小於maximumPoolSize,如果小於就建立一個worker直接執行該任務。
4.如果當前執行的worker數量是否大於等於maximumPoolSize那麼就執行RejectedExecutionHandler i拒絕這個任務的提交。
話不說多,直接原始碼解析:
我們先來看一下ThreadPoolExecution中的幾個關鍵屬性。
//這個屬性是用來存放 當前執行的worker數量以及執行緒池狀態的 //int是32位的,這裡把int的高3位拿來充當執行緒池狀態的標誌位,後29位拿來充當當前執行worker的數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //存放任務的阻塞佇列 private final BlockingQueue<Runnable> workQueue; //worker的集合,用set來存放 private final HashSet<Worker> workers = new HashSet<Worker>(); //歷史達到的worker數最大值 private int largestPoolSize; //當佇列滿了並且worker的數量達到maxSize的時候,執行具體的拒絕策略 private volatile RejectedExecutionHandler handler; //超出coreSize的worker的生存時間 private volatile long keepAliveTime; //常駐worker的數量 private volatile int corePoolSize; //最大worker的數量,一般當workQueue滿了才會用到這個引數 private volatile int maximumPoolSize;
1.提交任務相關原始碼
下面是execute方法的原始碼:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //workerCountOf(c)會獲取當前正在執行的worker數量 if (workerCountOf(c) < corePoolSize) { //如果workerCount小於corePoolSize,就建立一個worker然後直接執行該任務 if (addWorker(command, true)) return; c = ctl.get(); } //isRunning(c)是判斷執行緒池是否在執行中,如果執行緒池被關閉了就不會再接受任務 //後面將任務加入到佇列中 if (isRunning(c) && workQueue.offer(command)) { //如果新增到佇列成功了,會再檢查一次執行緒池的狀態 int recheck = ctl.get(); //如果執行緒池關閉了,就將剛才新增的任務從佇列中移除 if (! isRunning(recheck) && remove(command)) //執行拒絕策略 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果加入佇列失敗,就嘗試直接建立worker來執行任務 else if (!addWorker(command, false)) //如果建立worker失敗,就執行拒絕策略 reject(command); }
新增worker的方法addworker原始碼:
private boolean addWorker(Runnable firstTask, boolean core) { retry: //使用自旋+cas失敗重試來保證執行緒競爭問題 for (;;) { //先獲取執行緒池的狀態 int c = ctl.get(); int rs = runStateOf(c); // 如果執行緒池是關閉的,或者workQueue佇列非空,就直接返回false,不做任何處理 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //根據入參core 來判斷可以建立的worker數量是否達到上限,如果達到上限了就拒絕建立worker if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //沒有的話就嘗試修改ctl新增workerCount的值。這裡用了cas操作,如果失敗了下一個迴圈會繼續重試,直到設定成功 if (compareAndIncrementWorkerCount(c)) //如果設定成功了就跳出外層的那個for迴圈 break retry; //重讀一次ctl,判斷如果執行緒池的狀態改變了,會再重新迴圈一次 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; //建立一個worker,將提交上來的任務直接交給worker w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //加鎖,防止競爭 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); //還是判斷執行緒池的狀態 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果worker的執行緒已經啟動了,會丟擲異常 if (t.isAlive()) throw new IllegalThreadStateException(); //新增新建的worker到執行緒池中 workers.add(w); int s = workers.size(); //更新歷史worker數量的最大值 if (s > largestPoolSize) largestPoolSize = s; //設定新增標誌位 workerAdded = true; } } finally { mainLock.unlock(); } //如果worker是新增的,就啟動該執行緒 if (workerAdded) { t.start(); //成功啟動了執行緒,設定對應的標誌位 workerStarted = true; } } } finally { //如果啟動失敗了,會觸發執行相應的方法 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
2.worker的結構
worker是ThreadPoolExector內部定義的一個內部類。我們先看一下Worker的繼承關係。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
它實現了Runnable介面,所以可以拿來當執行緒用,同時它還繼承了AbstractQueueSynchronizer同步器類,主要用來實現一個不可重入的鎖。
一些屬性還有構造方法:
//執行的執行緒,前面addWorker方法中就是直接通過啟動這個執行緒來啟動這個worker final Thread thread; //當一個worker剛建立的時候,就先嚐試執行這個任務 Runnable firstTask; //記錄完成任務的數量 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //建立一個Thread,將自己設定給他,後面這個thread啟動的時候,也就是執行worker的run方法 this.thread = getThreadFactory().newThread(this); }
worker的run方法:
public void run() { //這裡呼叫了ThreadPoolExecutor的runWorker方法 runWorker(this); }
ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) { //獲取當前執行緒 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //執行unlock方法,允許其他執行緒來中斷自己 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //如果前面的firstTask有值,就直接執行這個任務 //如果沒有具體的任務,就執行getTask()方法從佇列中獲取任務 //這裡會不斷執行迴圈體,除非執行緒中斷或者getTask()返回null才會跳出這個迴圈 while (task != null || (task = getTask()) != null) { //執行任務前先鎖住,這裡主要的作用就是給shutdown方法判斷worker是否在執行中的 //shutdown方法裡面會嘗試給這個執行緒加鎖,如果這個執行緒在執行,就不會中斷它 w.lock(); //判斷執行緒池狀態,如果執行緒池被強制關閉了,就馬上退出 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //執行任務前呼叫。預留的方法,可擴充套件 beforeExecute(wt, task); Throwable thrown = null; try { //真正的執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執行任務後呼叫。預留的方法,可擴充套件 afterExecute(task, thrown); } } finally { task = null; //記錄完成的任務數量 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
下面來看一下getTask ()方法,這裡面設計到keepAliveTime的使用,從這個方法我們可以看出執行緒池是怎麼讓超過corePoolSize的那部分worker銷燬的。
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果執行緒池已經關閉了,就直接返回null, //如果這裡返回null,呼叫的那個worker就會跳出while迴圈,然後執行完銷燬執行緒 //SHUTDOWN狀態表示執行了shutdown()方法 //STOP表示執行了shutdownNow()方法 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //獲取當前正在執行中的worker數量 int wc = workerCountOf(c); // 如果設定了核心worker也會超時或者當前正在執行的worker數量超過了corePoolSize,就要根據時間判斷是否要銷燬執行緒了 //其實就是從佇列獲取任務的時候要不要設定超時間時間,如果超過這個時間佇列還沒有任務進來,就會返回null boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果上一次迴圈從佇列獲取到的未null,這時候timedOut就會為true了 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { //通過cas來設定WorkerCount,如果多個執行緒競爭,只有一個可以設定成功 //最後如果沒設定成功,就進入下一次迴圈,說不定下一次worker的數量就沒有超過corePoolSize了,也就不用銷燬worker了 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果要設定超時時間,就設定一下咯 //過了這個keepAliveTime時間還沒有任務進佇列就會返回null,那worker就會銷燬 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //如果r為null,就設定timedOut為true timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
3.新增Callable任務的實現原始碼
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
要新增一個有返回值的任務的實現也很簡單。
其實就是對任務做了一層封裝,將其封裝成Future,然後提交給執行緒池執行,最後返回這個future。
這裡的newTaskFor(task)方法會將封裝成一個FutureTask類。
外部的執行緒拿到這個future,執行get()方法的時候,如果任務本身沒有執行完,執行執行緒就會被阻塞,直到任務執行完,
下面是FutureTask的get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; //判斷狀態,如果任務還沒執行完,就進入休眠,等待喚醒 if (s <= COMPLETING) s = awaitDone(false, 0L); //返回值 return report(s); }
FutureTask中通過一個state狀態來判斷任務是否完成,當run方法執行完後,會將state狀態置為完成,同時喚醒所有正在等待的執行緒,我們可以看一下FutureTask的run方法。
public void run() { //判斷執行緒的狀態 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //執行call方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //這個方法裡面會設定返回內容,並且喚醒所以等待中的執行緒 set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
4.shutdown和shutdownnow方法的實現
shutdown方法會將執行緒池的狀態設定為SHUTDOWN,執行緒池進入到這個狀態後,就拒絕再接受任務,然後會將剩餘的任務全部執行完。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //檢查是否可以關閉執行緒 checkShutdownAccess(); //設定執行緒池狀態 advanceRunState(SHUTDOWN); //嘗試中斷worker interruptIdleWorkers(); //預留方法,留給子類實現 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷所有的worker for (Worker w : workers) { Thread t = w.thread; //先嚐試呼叫w.tryLock(),如果獲取到鎖,就說明worker是空閒的,就可以直接中斷它 //注意的是,worker自己本身實現了AQS同步框架,然後實現的類似鎖的功能 //它實現的鎖是不可重入的,所以如果worker在執行任務的時候,會先進行加鎖,這裡tryLock()就會返回false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
shutdownNow做的比較絕,它先將執行緒池狀態設定為STOP,然後拒絕所有提交的任務,最後中斷左右正在執行的worker,然後清空任務佇列。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //檢測許可權 advanceRunState(STOP); //中斷所有的worker interruptWorkers(); //清空任務佇列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷所有worker,然後呼叫中斷方法 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }