1. 程式人生 > >多執行緒底層程式碼跟蹤

多執行緒底層程式碼跟蹤

來總結一下多執行緒開發,加深學習。

首先何為執行緒(程序/執行緒)?
程序作業系統動態執行基本單元,系統為每個程序分配記憶體,包括一般情況下,包括文字區域(text region/指令)、資料區域(data region/變數)和堆疊(stack region/物件)。

我們的程式雖然可以做到多程序,但是,多程序需要切換上下文,什麼是上下文?
當程式執行到多程序的指令,那麼會把當前的執行環境-堆疊等,複製一份,到另一塊記憶體區域,而又因為cpu是輪尋機制,不是順序執行,關於CPU執行原理,百度上有篇文章寫的很好,我這邊引用一下。
頻繁切換上下文(大概一次20微秒),屬於沒必要的昂貴消耗。另外就是程序間通訊需要通過管道機制,比較複雜。

那麼多執行緒就成了我們最好的選擇。
執行緒的定義是,一個執行緒有且至少有一個執行緒,執行緒利用程序的資源,且本身不擁有系統資源,所以對執行緒的排程的開銷就會小很多。

因為這篇文章我定義到Java的分類下面,所以還是要通過Java來描述
其實我認為要真的好好深入學習執行緒程序,cpu排程這塊,還是要通過C來學
日後有時間,我會用C語言來模擬實現一遍

既然瞭解了什麼是執行緒,看下Java怎麼實現多執行緒:
Thread,Runnable,Future
至於網上有些說4種的,其實就是用ExecutorService來管理了一下。
那麼從頭聊一聊。

Thread 其實是Runnable的實現類,類宣告如下

1
public class Thread implements Runnable

看下最核心的一個方法
首先判定現成的狀態,0狀態表示該執行緒是新建立的,一切不是新建狀態的執行緒,都視為非法
第二部新增到執行緒組,執行緒組預設初始長度為4,如果滿了就闊為2倍。
之後可以看到,呼叫了一個本地方法start0,如果成功,則更改started標籤量
最後一個判定,啟動失敗,從執行緒組中移除當前執行緒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public synchronized void start() {
    /**
     * This method is not invoked for the main method thread or "system"
     * group threads created/set up by the VM. Any new functionality added
     * to this method in the future may have to also be added to the VM.
     *
     * A zero status value corresponds to state "NEW".
     */
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    /* Notify the group that this thread is about to be started
     * so that it can be added to the group's list of threads
     * and the group's unstarted count can be decremented. */
    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
              it will be passed up the call stack */
        }
    }
}

 

Thread中出現的Runnable,作為一個介面,只有一個方法,就是run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

 

之後來看Future,
Future其實提供了和Runnable能力並列的介面,簡單解釋一下為什麼這麼說,Runnable介面提供了run,也就是可以放線上程中執行的能力,Future其實是賦予了執行緒執行後可以返回的能力,run的宣告是void,所以沒有返回值。

兩者結合,簡單易懂的一個類RunnableFuture的介面就出來了。
那麼相當於Thread的實現類,FutureTask就出現了,它就是集大成者。

這麼說可能有點跳躍,先看下下面的實現,一看,誒,怎麼沒有Future?
Future本身是一個介面,跟Runnable是相同的級別,但區別通俗來講在於他沒有run的能力,這個能力來自於Runnable。
追溯一下FutureTask,發現它繼承了RunnableFuture,誒,這個單詞起的就有意思了,包含了Runnable,和Future。
點進去看下

1
2
3
4
5
6
7
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

 

Future就在這,關於Future裡面有什麼,大家可以點進去看看,裡面最關鍵的就是get() throws InterruptedException, ExecutionException;這個方法,就這這個方法,讓我們通過呼叫api,拿到執行緒裡面的值。

如果想使用這個東西,開啟執行緒,這個時候不能用new Thread(future)這種方式了,因為Thread沒有這種能力,只實現了一個Runnable介面,
這個時候,一個新的類出現了,原始碼如下

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

看下Callable的宣告,是有返回值的,並且可以丟擲異常的。
這個返回值就很關鍵了,通過這個返回值,你可以把任何你想通過執行緒拿到的結果拿回來。
而拿結果的方法就是FutureTask的get()方法,之前我們看原始碼時又看到,這個get方法來自於Future介面的V get()方法

簡單看下如何使用一個有返回值的多執行緒操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class CallableTest {

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		Callable<Integer> callable = new Task();
		FutureTask task = new FutureTask(callable);

		Thread oneThread = new Thread(task);
		oneThread.start();

		System.out.println(">>>  工作結果 " + task.get().toString());
	}

}

class Task implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		System.out.println(">>>  執行緒開始工作");
		Thread.sleep(1000);
		System.out.println(">>>  結束工作開始返回");
		return 10;
	}

}

 

可以看到FutureTask依然呼叫的是Thread,走的是本地方法start0
Runbale就沒什麼好說的了,實現一個介面,放到Thread裡面去執行,基本沒什麼東西,能力與Thread差不多,區別是實現Runnable介面的類必須依託於Thread類才能啟動,

1
2
3
4
//使用這個構造方法
public Thread(Runnable target) {
    this(null, target, "Thread-" + nextThreadNum(), 0);
}

 

然後用Thread的start方法,需要注意的是,千萬不要調run方法, 要用start。

最後看下ExecutorService這個類,ExecutorService級別很高,他的爸爸直接就是Executor。
他的兒子,是AbstractExecutorService,這裡實現了submit,doInvokeAny等方法。

而我們呼叫Executors.newFixedThreadPool(poolSize);返回的是ThreadPoolExecutor

注:一般不建議使用Executors.newFixedThreadPool(poolSize);,什麼東西全是預設,建議如下方式:

1
2
3
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("test-%d").build();
ExecutorService service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingDeque<>(1024), factory, new ThreadPoolExecutor.AbortPolicy());

 

具體引數含義就自行百度吧,很多人講的。

ThreadPoolExecutor這個類,又是AbstractExecutorService的兒子,所以這個關係就很明顯了
ThreadPoolExecutor -> AbstractExecutorService -> ExecutorService -> Executor

看到一堆submit方法,然而沒什麼用,真正關鍵的方法,是execute方法,在ThreadPoolExecutor中實現。

這個類有點意思,一上來就給我一個下馬威
private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

這個ctl到底是什麼?
查了很久,在cnblogs裡找到一個大神的描述 – Okevin

這個變數使用來幹嘛的呢?它的作用有點類似我們在《7.ReadWriteLock介面及其實現ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態,而AQS則只提供了state一個int型變數,此時將state高16位表示為讀狀態,低16位表示為寫狀態。這裡的clt同樣也是,它表示了兩個概念:

workerCount:當前有效的執行緒數
runState:當前執行緒池的五種狀態,Running、Shutdown、Stop、Tidying、Terminate。
  int型變數一共有32位,執行緒池的五種狀態runState至少需要3位來表示,故workCount只能有29位,所以程式碼中規定執行緒池的有效執行緒數最多為2^29-1。

看到這先來聊一下執行緒提交任務的規則,–《java併發程式設計藝術》

  1. 首先會判斷核心執行緒池裡是否有執行緒可執行,有空閒執行緒則建立一個執行緒來執行任務。
  2. 當核心執行緒池裡已經沒有執行緒可執行的時候,此時將任務丟到任務佇列中去。
  3. 如果任務佇列(有界)也已經滿了的話,但執行的執行緒數小於最大執行緒池的數量的時候,此時將會新建一個執行緒用於執行任務,但如果執行的執行緒數已經達到最大執行緒池的數量的時候,此時將無法建立執行緒執行任務。
      所以實際上對於執行緒池不僅是單純地將任務丟到執行緒池,執行緒池中有執行緒就執行任務,沒執行緒就等待。

最後附上大神對execute的註解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
  * corePoolSize:核心執行緒池的執行緒數量
  * 
  * maximumPoolSize:最大的執行緒池執行緒數量
  * 
  * keepAliveTime:執行緒活動保持時間,執行緒池的工作執行緒空閒後,保持存活的時間。
  * 
  * unit:執行緒活動保持時間的單位。
  * 
  * workQueue:指定任務佇列所使用的阻塞佇列
*/
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
   //由它可以獲取到當前有效的執行緒數和執行緒池的狀態
   /*1.獲取當前正在執行執行緒數是否小於核心執行緒池,是則新建立一個執行緒執行任務,否則將任務放到任務佇列中*/
   int c = ctl.get();
    if (workerCountOf(c) < corePoolSize){
        if (addWorker(command, tre))     //在addWorker中建立工作執行緒執行任務
            return ;
        c = ctl.get();
    }
    /*2.當前核心執行緒池中全部執行緒都在執行workerCountOf(c) >= corePoolSize,所以此時將執行緒放到任務佇列中*/
    //執行緒池是否處於執行狀態,且是否任務插入任務佇列成功
    if (isRunning(c) && workQueue.offer(command))    {
        int recheck = ctl.get();
     if (!isRunning(recheck) && remove(command))//執行緒池是否處於執行狀態,如果不是則使剛剛的任務出隊
       reject(command);//丟擲RejectedExceptionException異常
     else if (workerCountOf(recheck) == 0)
       addWorker(null, false);
  }
    /*3.插入佇列不成功,且當前執行緒數數量小於最大執行緒池數量,此時則建立新執行緒執行任務,建立失敗丟擲異常*/
  else if (!addWorker(command, false)){
    reject(command);    //丟擲RejectedExceptionException異常
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會再次檢查執行緒池是否處於執行狀態,核心執行緒池中是否還有空閒執行緒,都滿足條件過後則會呼叫compareAndIncrementWorkerCount先將正在執行的執行緒數+1,數量自增成功則跳出迴圈,自增失敗則繼續從頭繼續迴圈*/
  ...
  if (compareAndIncrementWorkerCount(c))
    break retry;
  ...
  /*正在執行的執行緒數自增成功後則將執行緒封裝成工作執行緒Worker*/
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    final ReentrantLock mainLock = this.mainLock;//全域性鎖
    w = new Woker(firstTask);//將執行緒封裝為Worker工作執行緒
    final Thread t = w.thread;
    if (t != null) {
      //獲取全域性鎖
      mainLock.lock();
      /*當持有了全域性鎖的時候,還需要再次檢查執行緒池的執行狀態等*/
      try {
        int c = clt.get();
        int rs = runStateOf(c);        //執行緒池執行狀態
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){          //執行緒池處於執行狀態,或者執行緒池關閉且任務執行緒為空
          if (t.isAlive())    //執行緒處於活躍狀態,即執行緒已經開始執行或者還未死亡,正確的應執行緒在這裡應該是還未開始執行的
            throw new IllegalThreadStateException();
          //private final HashSet<Worker> wokers = new HashSet<Worker>();
          //包含執行緒池中所有的工作執行緒,只有在獲取了全域性的時候才能訪問它。將新構造的工作執行緒加入到工作執行緒集合中
          workers.add(w);
          int s = worker.size();    //工作執行緒數量
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;    //新構造的工作執行緒加入成功
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        //在被構造為Worker工作執行緒,且被加入到工作執行緒集合中後,執行執行緒任務
        //注意這裡的start實際上執行Worker中run方法,所以接下來分析Worker的run方法
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted)   //未能成功建立執行工作執行緒
      //在啟動工作執行緒失敗後,將工作執行緒從集合中移除
      addWorkerFailed(w);    
  }
  return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//ThreadPoolExecutor$Worker,它繼承了AQS,同時實現了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    //設定AQS的同步狀態為-1,禁止中斷,直到呼叫runWorker
    setState(-1);    
    this.firstTask = firstTask;
    //通過執行緒工廠來建立一個執行緒,將自身作為Runnable傳遞傳遞
    this.thread = getThreadFactory().newThread(this);    
  }
  public void run() {
    runWorker(this);    //執行工作執行緒
  }
}

Okevin部落格

本人部落格 https://radiancel.github.io

轉載請備註來源