1. 程式人生 > 其它 >執行緒池基本使用及原理分析

執行緒池基本使用及原理分析

# 執行緒池基本使用及原理分析 從原始碼入手,分析執行緒池的基本使用場景以及核心程式碼原理分析。分析版本:jdk1.8 @[toc] ## 執行緒池介紹 > In [computer programming](https://encyclopedia.thefreedictionary.com/Computer+programming), a **thread pool** is a software design pattern for achieving [concurrency](https://encyclopedia.thefreedictionary.com/Concurrency+(computer+science)) of execution in a computer program. Often also called a **replicated workers** or **worker-crew model**,[[1\]](https://encyclopedia.thefreedictionary.com/Thread+pool+pattern#cite_note-1) a thread pool maintains multiple [threads](https://encyclopedia.thefreedictionary.com/Thread+(computer+science)) waiting for [tasks](https://encyclopedia.thefreedictionary.com/Task+(computers)) to be allocated for [concurrent](https://encyclopedia.thefreedictionary.com/Concurrent+computing) execution by the supervising program. By maintaining a pool of threads, the model increases performance and avoids latency in execution due to frequent creation and destruction of threads for short-lived tasks.[[2\]](https://encyclopedia.thefreedictionary.com/Thread+pool+pattern#cite_note-2) The number of available threads is tuned to the computing resources available to the program, such as [parallel](https://encyclopedia.thefreedictionary.com/Parallel+computing) [processors](https://encyclopedia.thefreedictionary.com/Central+processing+unit), cores, memory, and network sockets.[[3\]](https://encyclopedia.thefreedictionary.com/Thread+pool+pattern#cite_note-3) > > -- 摘要自維基百科 > > **執行緒池**(英語:thread pool):一種[執行緒](https://baike.baidu.com/item/%E7%BA%BF%E7%A8%8B)使用模式。執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。 > > -- 摘要自百度百科 介紹一般都能理解,**池**這個概念,在平常的開發中也非常常見,例如資料庫的連線池這些。主要還是為了方便管理和控制開銷。 ## 基本使用 ```java @Test public void contextLoads() throws IOException { //建立一個固定大小的執行緒池 ExecutorService executorService = Executors.newFixedThreadPool(10); //執行執行緒 executorService.execute(() -> { System.out.println("running thread " + Thread.currentThread()); }); //關閉執行緒池 executorService.shutdown(); System.in.read(); } ``` 上面的程式碼,就是一個簡單的執行緒池使用方法。Executors是JUC提供的一個簡單使用執行緒池的工具類,為不是很瞭解執行緒池的開發者提供了一層封裝。 上面的程式碼是建立了一個FixedThreadPool。在Executors中還提供了一些其他的執行緒池的建立方法。 > ### FixedThreadPool > > 建立一個固定大小的執行緒池 > > ```java > public static ExecutorService newFixedThreadPool(int nThreads) { > return new ThreadPoolExecutor(nThreads, nThreads, > 0L, TimeUnit.MILLISECONDS, > new LinkedBlockingQueue()); > } > ``` > > ### CachedThreadPool > > 這個執行緒池核心執行緒數為0,在執行執行緒時會優先判斷是否有可用執行緒,如果有則使用,如果沒有則建立。60秒後,執行緒被釋放(下面的案例中)。 > > ```java > public static ExecutorService newCachedThreadPool() { > return new ThreadPoolExecutor(0, Integer.MAX_VALUE, > 60L, TimeUnit.SECONDS, > new SynchronousQueue()); > } > ``` > > ### SingleThreadExecutor > > 建立一個只包含單個執行緒的執行緒池。 > > ```java > public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { > return new FinalizableDelegatedExecutorService > (new ThreadPoolExecutor(1, 1, > 0L, TimeUnit.MILLISECONDS, > new LinkedBlockingQueue(), > threadFactory)); > } > ``` > > ### ScheduledThreadPoolExecutor > > 支援延遲執行的執行緒池。 > > ```java > public ScheduledThreadPoolExecutor(int corePoolSize) { > super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, > new DelayedWorkQueue()); > } > ``` > > 除了以上四種外,Executors中還提供了一些其他的執行緒池有需要可以簡單瞭解一下。 ## 原理分析 ### 執行緒池建立 從上面的幾種執行緒看到,最終核心部分還是在於執行緒池的建立。下面是執行緒池建立的構造方法: ```java /** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } ``` > corePoolSize:核心執行緒數 > > maximumPoolSize:最大執行緒數 > > keepAliveTime:非核心執行緒的存活時間 > > unit:時間的單位 > > workQueue:這個是任務的佇列 > > threadFactory:執行緒工廠類 > > handler:執行拒絕策略時的回撥,類似熔斷的補償 > > 上面的**部分**不同型別的執行緒池實際就是通過調整這些引數實現的。 ### 執行緒執行 #### 流程圖 ![執行緒池執行流程圖](https://imgconvert.csdnimg.cn/aHR0cDovL2Fzc2V0cy5wcm9jZXNzb24uY29tL2NoYXJ0X2ltYWdlLzVmMzI1YzdjNjM3Njg5MzEzYWNhMWE1My5wbmc?x-oss-process=image/format,png) #### execute 下面是執行緒池執行排程執行緒的方法: ```java /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();//這裡的ctl是用來儲存執行緒池的執行狀態和執行緒數量,高低位運算的機制,高3位用來存狀態,低29位用來儲存執行緒數量 if (workerCountOf(c) < corePoolSize) {//判斷當前執行的執行緒數是否小於核心執行緒數 if (addWorker(command, true))//新增工作執行緒,true false表示是否核心執行緒 return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//判斷執行緒池是否在執行狀態並新增到阻塞佇列 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//重新校驗一次狀態,如果異常執行拒絕策略 reject(command);//拒絕策略 else if (workerCountOf(recheck) == 0)//執行緒都被釋放,新建一個執行緒 addWorker(null, false); } else if (!addWorker(command, false))//如果不能新增到佇列,嘗試新增一個新的執行緒,如果失敗,執行拒絕策略 reject(command); } ``` > 上面就是執行的大致流程,其實都是一些判斷。具體的執行步驟實際在原始碼中作者已經有了標註: > > ```java > /* > * Proceed in 3 steps: > * > * 1. If fewer than corePoolSize threads are running, try to > * start a new thread with the given command as its first > * task. The call to addWorker atomically checks runState and > * workerCount, and so prevents false alarms that would add > * threads when it shouldn't, by returning false. > * > * 2. If a task can be successfully queued, then we still need > * to double-check whether we should have added a thread > * (because existing ones died since last checking) or that > * the pool shut down since entry into this method. So we > * recheck state and if necessary roll back the enqueuing if > * stopped, or start a new thread if there are none. > * > * 3. If we cannot queue task, then we try to add a new > * thread. If it fails, we know we are shut down or saturated > * and so reject the task. > */ > ``` > > 而執行的核心程式碼,在於addWorker,接下來分析一下addWorker的核心程式碼。 #### addWorker ```java private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) {//其實這段程式碼就是檢查一下狀態和佇列情況,然後cas工作執行緒數 int c = ctl.get(); int rs = runStateOf(c); //檢查執行緒池狀態 //如果執行緒池已經shutdown,不會接受新的任務,但是未執行完的任務還是會繼續執行 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c))//cas工作執行緒數,成功跳出迴圈 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs)//如果狀態發生變化,continue continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false;//啟動標識 boolean workerAdded = false;//新增成功標識 Worker w = null; try { w = new Worker(firstTask);//這裡的worker實現了Runnable final Thread t = w.thread;//執行worker的執行緒 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); //檢查執行緒池狀態、檢查firstTask狀態、檢查worker執行緒狀態 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//將worker新增到集合 int s = workers.size(); if (s > largestPoolSize)//如果集合長度大於最大執行緒數,更新記錄最大執行緒數 largestPoolSize = s; workerAdded = true;//標識新增成功標識 } } finally { mainLock.unlock(); } if (workerAdded) {//新增成功後,啟動worker中的執行緒 t.start(); workerStarted = true; } } } finally { if (! workerStarted)//如果失敗了,會去執行回滾 addWorkerFailed(w);//執行回滾 } return workerStarted; } ``` addWorker的方法就是做一些判斷校驗,以及將firstTask新增到工作執行緒worker中。實際將邏輯委派給worker來執行,所以還需要了解一下worker的執行邏輯。worker的run方法中是呼叫runWorker(),因此看一下runWorker的方法邏輯。 #### runWorker ```java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask;//拿到初始化的firstTask w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //這個執行緒就是從task中取任務,如果任務不為空就一直迴圈執行任務 while (task != null || (task = getTask()) != null) {//這裡的getTask是從阻塞佇列中取任務 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //依然是對一些狀態的判斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //這個是模板方法,預設無定義 beforeExecute(wt, task); Throwable thrown = null; try { //直接呼叫task的run方法,而不是通過執行緒啟動的 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //依然模板 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++;//記錄完成任務的數量 w.unlock(); } } completedAbruptly = false; } finally { //銷燬worker processWorkerExit(w, completedAbruptly); } } ``` 分析完runWorker方法後,可以得出,task任務的執行實際就是通過直接呼叫task的run方法來完成的,而在執行完一個任務後,會迴圈從佇列中取任務,在任務都執行完成後,去銷燬worker。這塊邏輯的關注點主要在getTask()方法以及processWorkerExit()方法,也就是從佇列中獲取任務以及任務執行完成後的銷燬。接下來逐步分析一下getTask和processWorkerExit。 #### getTask ```java private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //判斷執行緒池如果shutdown了並且為stop或者佇列中沒有任務了,則減去workerCount並返回null(返回null後執行緒會退出)。 //可以看到,如果佇列中還有任務,這裡還是會執行完的(除非執行緒池STOP了) // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //拿到工作執行緒數 int wc = workerCountOf(c); //這裡的timed用來做超時判斷 //allowCoreThreadTimeOut預設為false,表示核心執行緒預設一直保持活動 //如果工作執行緒數已經大於核心執行緒數了,也就是非核心執行緒 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果工作執行緒數已經大於最大執行緒數了或者校驗超時控制(也就是任務超時了) //並且任務佇列已經沒有任務了,則返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果需要超時控制,通過poll取任務,否則通過take Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null)//取到任務返回 return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 分析完getTask方法可以發現,執行緒池如果shutdown了,任務還是會被執行完,但是如果是stop了,則不會繼續執行了。核心執行緒通過take阻塞一直保持(除非設定了核心執行緒超時),非核心執行緒通過poll取任務,超時後會進入銷燬邏輯。最後,在來看下銷燬。 #### processWorkerExit ```java private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //記錄完成數量 completedTaskCount += w.completedTasks; workers.remove(w);//集合中刪除worker } finally { mainLock.unlock(); } //嘗試終止 tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) {//判斷是否已經終止 if (!completedAbruptly) {//為false表示任務已經執行完了的場景 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//這個min用來判斷核心執行緒的數量 if (min == 0 && ! workQueue.isEmpty())//如果核心執行緒keep數量為0並且任務佇列不為空則會至少保持一個worker的存活 min = 1; if (workerCountOf(c) >= min)//如果當前的worker已經大於配置的核心執行緒數量了,則讓執行緒被銷燬 return; // replacement not needed } addWorker(null, false); } } ``` #### 總結 > 通過執行緒池來執行任務,原理是通過委派給一個**Worker**物件來完成。 > > **Worker**物件本身也實現了**Runnable**介面,通過一個執行緒啟動呼叫到**Worker**物件的**run**方法,**Worker**物件在來呼叫任務的**run**方法完成物件任務的邏輯執行。**Worker**物件同時繼承了**AQS**,這是為了自定義實現獨佔鎖的功能,這是為了表示當前執行緒正在執行任務中,不應該被中斷。 > > 核心執行緒會在**getTask**的時候通過呼叫阻塞佇列的**take**方法阻塞,直到有新的任務加入佇列重新開始執行。非核心執行緒在執行完任務後會被銷燬,超時時間的表現是在從阻塞佇列中取任務時,是呼叫**take**還是**poll**來完成。 > > 核心執行緒會被回收的場景是在設定了**allowCoreThreadTimeOut**引數為true後,會將超時的核心執行緒銷燬。