多執行緒底層程式碼跟蹤
首先何為執行緒(程序/執行緒)?
程序作業系統動態執行基本單元,系統為每個程序分配記憶體,包括一般情況下,包括文字區域(text region/指令)、資料區域(data region/變數)和堆疊(stack region/物件)。
我們的程式雖然可以做到多程序,但是,多程序需要切換上下文,什麼是上下文?
當程式執行到多程序的指令,那麼會把當前的執行環境-堆疊等,複製一份,到另一塊記憶體區域,而又因為cpu是輪尋機制,不是順序執行,關於CPU執行原理,百度上有篇文章寫的很好,我這邊引用一下。
頻繁切換上下文(大概一次20微秒),屬於沒必要的昂貴消耗。另外就是程序間通訊需要通過管道機制,比較複雜。
那麼多執行緒就成了我們最好的選擇。
執行緒的定義是,一個執行緒有且至少有一個執行緒,執行緒利用程序的資源,且本身不擁有系統資源,所以對執行緒的排程的開銷就會小很多。
因為這篇文章我定義到Java的分類下面,所以還是要通過Java來描述
其實我認為要真的好好深入學習執行緒程序,cpu排程這塊,還是要通過C來學
日後有時間,我會用C語言來模擬實現一遍
既然瞭解了什麼是執行緒,看下Java怎麼實現多執行緒:Thread
,Runnable
,Future
至於網上有些說4種的,其實就是用ExecutorService來管理了一下。
那麼從頭聊一聊。
Thread
其實是Runnable的實現類,類宣告如下
1 |
public class Thread implements Runnable |
看下最核心的一個方法
首先判定現成的狀態,0狀態表示該執行緒是新建立的,一切不是新建狀態的執行緒,都視為非法
第二部新增到執行緒組,執行緒組預設初始長度為4,如果滿了就闊為2倍。
之後可以看到,呼叫了一個本地方法start0
,如果成功,則更改started
標籤量
最後一個判定,啟動失敗,從執行緒組中移除當前執行緒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } |
Thread
中出現的Runnable,作為一個介面,只有一個方法,就是run
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@FunctionalInterface public interface Runnable { /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run(); } |
之後來看Future,Future
其實提供了和Runnable
能力並列的介面,簡單解釋一下為什麼這麼說,Runnable
介面提供了run
,也就是可以放線上程中執行的能力,Future
其實是賦予了執行緒執行後可以返回的能力,run
的宣告是void
,所以沒有返回值。
兩者結合,簡單易懂的一個類RunnableFuture
的介面就出來了。
那麼相當於Thread
的實現類,FutureTask
就出現了,它就是集大成者。
這麼說可能有點跳躍,先看下下面的實現,一看,誒,怎麼沒有Future?
Future本身是一個介面,跟Runnable是相同的級別,但區別通俗來講在於他沒有run的能力,這個能力來自於Runnable。
追溯一下FutureTask,發現它繼承了RunnableFuture,誒,這個單詞起的就有意思了,包含了Runnable,和Future。
點進去看下
1 2 3 4 5 6 7 |
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); } |
Future就在這,關於Future裡面有什麼,大家可以點進去看看,裡面最關鍵的就是get() throws InterruptedException, ExecutionException;
這個方法,就這這個方法,讓我們通過呼叫api,拿到執行緒裡面的值。
如果想使用這個東西,開啟執行緒,這個時候不能用new Thread(future)
這種方式了,因為Thread沒有這種能力,只實現了一個Runnable介面,
這個時候,一個新的類出現了,原始碼如下
1 2 3 4 5 6 7 8 9 10 |
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } |
看下Callable
的宣告,是有返回值的,並且可以丟擲異常的。
這個返回值就很關鍵了,通過這個返回值,你可以把任何你想通過執行緒拿到的結果拿回來。
而拿結果的方法就是FutureTask的get()方法,之前我們看原始碼時又看到,這個get方法來自於Future介面的V get()方法
簡單看下如何使用一個有返回值的多執行緒操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException { Callable<Integer> callable = new Task(); FutureTask task = new FutureTask(callable); Thread oneThread = new Thread(task); oneThread.start(); System.out.println(">>> 工作結果 " + task.get().toString()); } } class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println(">>> 執行緒開始工作"); Thread.sleep(1000); System.out.println(">>> 結束工作開始返回"); return 10; } } |
可以看到FutureTask
依然呼叫的是Thread
,走的是本地方法start0
。
Runbale就沒什麼好說的了,實現一個介面,放到Thread裡面去執行,基本沒什麼東西,能力與Thread差不多,區別是實現Runnable介面的類必須依託於Thread類才能啟動,
1 2 3 4 |
//使用這個構造方法 public Thread(Runnable target) { this(null, target, "Thread-" + nextThreadNum(), 0); } |
然後用Thread的start方法,需要注意的是,千萬不要調run方法, 要用start。
最後看下ExecutorService
這個類,ExecutorService
級別很高,他的爸爸直接就是Executor。
他的兒子,是AbstractExecutorService
,這裡實現了submit
,doInvokeAny
等方法。
而我們呼叫Executors.newFixedThreadPool(poolSize);
返回的是ThreadPoolExecutor
注:一般不建議使用Executors.newFixedThreadPool(poolSize);
,什麼東西全是預設,建議如下方式:
1 2 3 |
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-%d").build(); ExecutorService service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(1024), factory, new ThreadPoolExecutor.AbortPolicy()); |
具體引數含義就自行百度吧,很多人講的。
那ThreadPoolExecutor
這個類,又是AbstractExecutorService
的兒子,所以這個關係就很明顯了ThreadPoolExecutor
-> AbstractExecutorService
-> ExecutorService
-> Executor
看到一堆submit
方法,然而沒什麼用,真正關鍵的方法,是execute方法,在ThreadPoolExecutor
中實現。
這個類有點意思,一上來就給我一個下馬威private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));
這個ctl到底是什麼?
查了很久,在cnblogs裡找到一個大神的描述 – Okevin
這個變數使用來幹嘛的呢?它的作用有點類似我們在《7.ReadWriteLock介面及其實現ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態,而AQS則只提供了state一個int型變數,此時將state高16位表示為讀狀態,低16位表示為寫狀態。這裡的clt同樣也是,它表示了兩個概念:
workerCount:當前有效的執行緒數
runState:當前執行緒池的五種狀態,Running、Shutdown、Stop、Tidying、Terminate。
int型變數一共有32位,執行緒池的五種狀態runState至少需要3位來表示,故workCount只能有29位,所以程式碼中規定執行緒池的有效執行緒數最多為2^29-1。
看到這先來聊一下執行緒提交任務的規則,–《java併發程式設計藝術》
- 首先會判斷核心執行緒池裡是否有執行緒可執行,有空閒執行緒則建立一個執行緒來執行任務。
- 當核心執行緒池裡已經沒有執行緒可執行的時候,此時將任務丟到任務佇列中去。
- 如果任務佇列(有界)也已經滿了的話,但執行的執行緒數小於最大執行緒池的數量的時候,此時將會新建一個執行緒用於執行任務,但如果執行的執行緒數已經達到最大執行緒池的數量的時候,此時將無法建立執行緒執行任務。
所以實際上對於執行緒池不僅是單純地將任務丟到執行緒池,執行緒池中有執行緒就執行任務,沒執行緒就等待。
最後附上大神對execute的註解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
/** * corePoolSize:核心執行緒池的執行緒數量 * * maximumPoolSize:最大的執行緒池執行緒數量 * * keepAliveTime:執行緒活動保持時間,執行緒池的工作執行緒空閒後,保持存活的時間。 * * unit:執行緒活動保持時間的單位。 * * workQueue:指定任務佇列所使用的阻塞佇列 */ //ThreadPoolExecutor#execute public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //由它可以獲取到當前有效的執行緒數和執行緒池的狀態 /*1.獲取當前正在執行執行緒數是否小於核心執行緒池,是則新建立一個執行緒執行任務,否則將任務放到任務佇列中*/ int c = ctl.get(); if (workerCountOf(c) < corePoolSize){ if (addWorker(command, tre)) //在addWorker中建立工作執行緒執行任務 return ; c = ctl.get(); } /*2.當前核心執行緒池中全部執行緒都在執行workerCountOf(c) >= corePoolSize,所以此時將執行緒放到任務佇列中*/ //執行緒池是否處於執行狀態,且是否任務插入任務佇列成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command))//執行緒池是否處於執行狀態,如果不是則使剛剛的任務出隊 reject(command);//丟擲RejectedExceptionException異常 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /*3.插入佇列不成功,且當前執行緒數數量小於最大執行緒池數量,此時則建立新執行緒執行任務,建立失敗丟擲異常*/ else if (!addWorker(command, false)){ reject(command); //丟擲RejectedExceptionException異常 } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
//ThreadPoolExecutor#addWorker private boolean addWorker(Runnable firstTask, boolean core) { /*首先會再次檢查執行緒池是否處於執行狀態,核心執行緒池中是否還有空閒執行緒,都滿足條件過後則會呼叫compareAndIncrementWorkerCount先將正在執行的執行緒數+1,數量自增成功則跳出迴圈,自增失敗則繼續從頭繼續迴圈*/ ... if (compareAndIncrementWorkerCount(c)) break retry; ... /*正在執行的執行緒數自增成功後則將執行緒封裝成工作執行緒Worker*/ boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock;//全域性鎖 w = new Woker(firstTask);//將執行緒封裝為Worker工作執行緒 final Thread t = w.thread; if (t != null) { //獲取全域性鎖 mainLock.lock(); /*當持有了全域性鎖的時候,還需要再次檢查執行緒池的執行狀態等*/ try { int c = clt.get(); int rs = runStateOf(c); //執行緒池執行狀態 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){ //執行緒池處於執行狀態,或者執行緒池關閉且任務執行緒為空 if (t.isAlive()) //執行緒處於活躍狀態,即執行緒已經開始執行或者還未死亡,正確的應執行緒在這裡應該是還未開始執行的 throw new IllegalThreadStateException(); //private final HashSet<Worker> wokers = new HashSet<Worker>(); //包含執行緒池中所有的工作執行緒,只有在獲取了全域性的時候才能訪問它。將新構造的工作執行緒加入到工作執行緒集合中 workers.add(w); int s = worker.size(); //工作執行緒數量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; //新構造的工作執行緒加入成功 } } finally { mainLock.unlock(); } if (workerAdded) { //在被構造為Worker工作執行緒,且被加入到工作執行緒集合中後,執行執行緒任務 //注意這裡的start實際上執行Worker中run方法,所以接下來分析Worker的run方法 t.start(); workerStarted = true; } } } finally { if (!workerStarted) //未能成功建立執行工作執行緒 //在啟動工作執行緒失敗後,將工作執行緒從集合中移除 addWorkerFailed(w); } return workerStarted; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
//ThreadPoolExecutor$Worker,它繼承了AQS,同時實現了Runnable,所以它具備了這兩者的所有特性 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; public Worker(Runnable firstTask) { //設定AQS的同步狀態為-1,禁止中斷,直到呼叫runWorker setState(-1); this.firstTask = firstTask; //通過執行緒工廠來建立一個執行緒,將自身作為Runnable傳遞傳遞 this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); //執行工作執行緒 } } |
Okevin部落格
本人部落格 https://radiancel.github.io
轉載請備註來源