自己實現一個執行緒池及分析java執行緒池原始碼
前言
和設計模式一樣,打算花三個月的時間,結合《Java併發程式設計實戰》一書,來總結下併發方面的知識。第一章從執行緒池的原理開始總結,希望自己能堅持下來,加油!
1. 如何實現一個執行緒池?
執行緒池的概念這裡不多說,在講它的原理前,我們先自己想一下,如果我來寫,那如何實現一個執行緒池?
1.1 執行緒池的重要變數
首先要定義一個存放所有執行緒的集合;
另外,每有一個任務分配給執行緒池,我們就從執行緒池中分配一個執行緒處理它。但當執行緒池中的執行緒都在執行狀態,沒有空閒執行緒時,我們還需要一個佇列來儲存提交給執行緒池的任務。
/**存放執行緒的集合*/
private ArrayList<MyThead> threads;
/**任務佇列*/
private ArrayBlockingQueue<Runnable> taskQueue;
- 1
- 2
- 3
- 4
初始化一個執行緒池時,要指定這個執行緒池的大小;另外,我們還需要一個變數來儲存已經執行的執行緒數目。
/**執行緒池初始限定大小*/
private int threadNum;
/**已經工作的執行緒數目*/
private int workThreadNum;
- 1
- 2
- 3
- 4
1.2 執行緒池的核心方法
執行緒池執行任務
接下來就是執行緒池的核心方法,每當向執行緒池提交一個任務時。如果 已經執行的執行緒<執行緒池大小,則建立一個執行緒執行任務,並把這個執行緒放入執行緒池;否則將任務放入緩衝佇列中。
public void execute(Runnable runnable) {
try {
mainLock.lock();
//執行緒池未滿,每加入一個任務則開啟一個執行緒
if(workThreadNum < threadNum) {
MyThead myThead = new MyThead(runnable);
myThead.start();
threads.add(myThead);
workThreadNum++;
}
//執行緒池已滿,放入任務佇列,等待有空閒執行緒時執行
else {
//佇列已滿,無法新增時,拒絕任務
if(!taskQueue.offer(runnable)) {
rejectTask();
}
}
} finally {
mainLock.unlock();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
到這裡,一個執行緒池已經實現的差不多了,我們還有最後一個難點要解決:從任務佇列中取出任務,分配給執行緒池中“空閒”的執行緒完成。
分配任務給執行緒的第一種思路
很容易想到一種解決思路:額外開啟一個執行緒,時刻監控執行緒池的執行緒空餘情況,一旦有執行緒空餘,則馬上從任務佇列取出任務,交付給空餘執行緒完成。
這種思路理解起來很容易,但仔細思考,實現起來很麻煩(1. 如何檢測到執行緒池中的空閒執行緒 2. 如何將任務交付給一個.start()
執行狀態中的空閒執行緒)。而且使執行緒池的架構變的更復雜和不優雅。
分配任務給執行緒的第二種思路
現在我們來講第二種解決思路:執行緒池中的所有執行緒一直都是執行狀態的,執行緒的空閒只是代表此刻它沒有在執行任務而已;我們可以讓執行中的執行緒,一旦沒有執行任務時,就自己從佇列中取任務來執行。
為了達到這種效果,我們要重寫run方法
,所以要寫一個自定義Thread
類,然後讓執行緒池都放這個自定義執行緒類
class MyThead extends Thread{
private Runnable task;
public MyThead(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
//該執行緒一直啟動著,不斷從任務佇列取出任務執行
while (true) {
//如果初始化任務不為空,則執行初始化任務
if(task != null) {
task.run();
task = null;
}
//否則去任務佇列取任務並執行
else {
Runnable queueTask = taskQueue.poll();
if(queueTask != null)
queueTask.run();
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
1.3 最後生成的簡單執行緒池
/**
* 自定義簡單執行緒池
*/
public class MyThreadPool{
/**存放執行緒的集合*/
private ArrayList<MyThead> threads;
/**任務佇列*/
private ArrayBlockingQueue<Runnable> taskQueue;
/**執行緒池初始限定大小*/
private int threadNum;
/**已經工作的執行緒數目*/
private int workThreadNum;
private final ReentrantLock mainLock = new ReentrantLock();
public MyThreadPool(int initPoolNum) {
threadNum = initPoolNum;
threads = new ArrayList<>(initPoolNum);
//任務佇列初始化為執行緒池執行緒數的四倍
taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);
threadNum = initPoolNum;
workThreadNum = 0;
}
public void execute(Runnable runnable) {
try {
mainLock.lock();
//執行緒池未滿,每加入一個任務則開啟一個執行緒
if(workThreadNum < threadNum) {
MyThead myThead = new MyThead(runnable);
myThead.start();
threads.add(myThead);
workThreadNum++;
}
//執行緒池已滿,放入任務佇列,等待有空閒執行緒時執行
else {
//佇列已滿,無法新增時,拒絕任務
if(!taskQueue.offer(runnable)) {
rejectTask();
}
}
} finally {
mainLock.unlock();
}
}
private void rejectTask() {
System.out.println("任務佇列已滿,無法繼續新增,請擴大您的初始化執行緒池!");
}
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(5);
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行中");
}
};
for (int i = 0; i < 20; i++) {
myThreadPool.execute(task);
}
}
class MyThead extends Thread{
private Runnable task;
public MyThead(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
//該執行緒一直啟動著,不斷從任務佇列取出任務執行
while (true) {
//如果初始化任務不為空,則執行初始化任務
if(task != null) {
task.run();
task = null;
}
//否則去任務佇列取任務並執行
else {
Runnable queueTask = taskQueue.poll();
if(queueTask != null)
queueTask.run();
}
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
現在我們來總結一下,這個自定義執行緒池的整個工作過程:
- 初始化執行緒池,指定執行緒池的大小。
- 向執行緒池中放入任務執行。
- 如果執行緒池中建立的執行緒數目未到指定大小,則建立我們自定義的執行緒類放入執行緒池集合,並執行任務。執行完了後該執行緒會一直監聽佇列
- 如果執行緒池中建立的執行緒數目已滿,則將任務放入緩衝任務佇列
- 執行緒池中所有建立的執行緒,都會一直從快取任務佇列中取任務,取到任務馬上執行
2. Java中的執行緒池實現
自己實現一個執行緒池後,接下來我們來結合原始碼,看下Java中執行緒池是怎麼實現的。
2.1 執行緒池框架
-
Executor
:
最頂部介面,提供了execute()
方法將任務提交和任務執行分離。當你把一個Runnable
任務提交給Executor
後,如何執行任務要看它的實現類。 -
ExecutorService
:
繼承Executor
,增加了對執行緒池中任務生命週期的管理,可強制取消正在執行的任務,拒絕再接受任務。提供了submit()
方法來擴充套件Executor.execute()
,使任務執行有返回值。 -
AbstractExecutorService
:ExecutorService
介面的預設實現,執行緒池的大部分功能已在這個類中被編寫。 -
ThreadPoolExecutor
:
執行緒池最核心的一個類,繼承了AbstractExecutorService
,完整的實現了一個執行緒池。 -
ScheduledExecutorService
:
繼承ExecutorService
介面,在其基礎上增加了定時執行任務的功能 -
ScheduledThreadPoolExecutor
:
擁有定時排程任務功能的執行緒池,實現了ScheduledExecutorService
介面,以及繼承了ThreadPoolExecutor
類
2.2ThreadPoolExecutor
原始碼分析
通過類圖框架,我們已經知道,Java執行緒池最關鍵的一個類就是ThreadPoolExecutor
。
那這個類是如何實現執行緒池的呢?其實大致原理跟第一節我們手動寫的簡單執行緒池差不多,下面我們結合原始碼來完整的走一遍。
構造方法
首先看一下它的構造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
//構造方法
……
}
- 1
- 2
- 3
- 4
-
corePoolSize:即執行緒池大小,執行緒池中預設執行緒數目。
-
maximumPoolSize:執行緒池中最大執行緒數目。注意,當執行緒池執行的任務過多時,允許執行緒池額外建立執行緒。此額外執行緒在任務變少後會被執行緒池關閉。
-
keepAliveTime:執行緒空閒時間超過該值時,會被關閉。預設只有當
執行緒數目>corePoolSize
時才會生效。也可配置執行緒數目<corePoolSize
就生效 -
unit:keepAliveTime的時間單位
-
workQueue:任務快取佇列的實現
重要變數
執行緒池的重要變數主要兩個:runState
(執行緒池執行狀態:執行還是關閉)和workerCount
(執行緒池中的執行緒數目)
在JDK1.8中,這兩個變數被放入了一個執行緒安全的int型別變數中
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- 1
這裡可以好好講一下,貌似網上查到的執行緒池原始碼資料也沒有好好分析過為什麼要把兩個變數合併成一個變數。
首先,我們來看看它是如何將兩個變數存在一起的
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
- 1
- 2
- 3
這段程式碼宣告很神奇,我還是第一次知道int型別的變數還可以進行構造器式的宣告。ctlOf
就是合併變數ctl
的值。它通過“或運算”,將runState
和workerCount
合併成了一個數字。
為什麼“或運算“可以將兩個數字合併成一個呢?實際上,ctlOf
是一個32位的數字,前三位存的是runState
, 後27位存的是workerCount
。類似1110000……(27個0)
與00000000……001
進行或運算,兩個值就存在了同一個int變數中。取出來也只要通過位運算獲取前3位和後27位數字即可(也就是runStateOf(int c)
和workerCountOf(int c)
)。原始碼裡面也有部分提示:
// runState is stored in the high-order bits(runState存在高位中)
private static final int RUNNING = -1 << COUNT_BITS;
- 1
- 2
好,瞭解了兩個變數合併成一個的實現,我們再來想想,為什麼要這麼麻煩的進行變數合併?我的思考答案是:為了不加鎖實現執行緒安全。
我們可以宣告兩個執行緒安全的int型別來分別儲存runState
和workerCount
。對它們單獨加減時確實是執行緒安全的,但如果一段程式碼有兩個變數的共同參與呢?舉個簡單例子:
//當執行狀態是執行中時,增加執行緒數
if(runState == RUNNING) {
workCount.incrementAndGet();
}
- 1
- 2
- 3
- 4
這段程式碼顯然是執行緒不安全的,假設執行緒A判斷成功後進入方法塊,掛起;這時執行緒B修改了runState
的狀態為SHUTDOWN
。當執行緒A重新恢復執行時,雖然runState
不是執行狀態,但仍會對workCount
進行增加操作。
通過將兩個變數合併成了一個執行緒安全變數,即完美的解決了這個問題,再有兩個變數一起的程式碼時,也不需要加鎖就實現了執行緒安全
核心方法
最後看一下重頭戲,ThreadPoolExecutor
的核心方法execute()
。這個方法其實就是實現了Executor
介面中的execute
方法,執行你丟入執行緒池的任務。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
其實原始碼註釋已經寫的很清楚,當你將一個任務Runnable
提交給執行緒池執行時,主要分三步:
-
如果執行緒池中的執行緒數(workCount)< 執行緒池大小(corePoolSize),則建立一個執行緒,執行這個任務,並把這個執行緒放入執行緒池。新增任務時會對執行緒池狀態進行檢查,以防止執行緒池狀態為關閉時還新增執行緒。
-
如果執行緒池中的執行緒數(workCount)>= 執行緒池大小(corePoolSize),或者上一步新增任務最後失敗,將任務放入快取佇列中。當任務成功加入快取佇列,仍需要對執行緒池狀態進行二次檢查,防止執行緒池狀態改為關閉或執行緒池中已經沒有可以執行的執行緒。
-
如果上一步將任務放入快取佇列失敗,試著去增加一個新的執行緒來執行它(超過執行緒池大小的額外執行緒)。如果新增新執行緒失敗(可能是執行緒數已到達
maximumPoolSize
),則丟擲異常拒絕執行該任務。
接下來,我們跟著這三個步驟來看看原始碼。
當執行緒池中的執行緒數(workCount)< 執行緒池大小(corePoolSize),會執行addWorker()
方法來增加一個執行緒,執行任務並放入執行緒池。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
這個方法分為了兩個部分。
第一部分,也就是for迴圈裡面的程式碼,主要是檢查執行緒池執行狀態(關閉或其他狀態則新增失敗),以及執行緒數目是否超過了指定值(超過最大執行緒數則新增失敗)。
為了保持執行緒安全,在檢查的最後都會重新獲取一遍runState
和workCount
,與for迴圈最開始取到的兩個值進行對比,如果不一樣,則證明別的執行緒修改了它們,重新迴圈進行校驗。
第二部分,for迴圈校驗完畢後,證明此時是可以新增執行緒的,然後我們可以發現程式碼中建立了一個新物件:w = new Worker(firstTask)
。這個Worker
是什麼呢?
我們看下Workder
的程式碼會發現,它其實就是前面我們寫的自定義執行緒池裡面的自定義執行緒類。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
//構造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
……
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
在Worker
的構造方法中,它會儲存傳進來的任務firstTask
,並且建立一個包裹自身的執行緒thread
,然後暴露給addWorker()
方法。
w = new Worker(firstTask);
final Thread t = w.thread;
- 1
- 2
如上面這段addWorker()
中的程式碼,其實等同於:
w = new Worker(firstTask);
final Thread t = new Thread(w);
- 1
- 2
addWorker()
中獲取到執行緒t
後,會將t
放入執行緒池,以及啟動它:
//新增進執行緒池
workers.add(w);
……
//新增成功後,啟動執行緒
if (workerAdded) {
t.start();
workerStarted = true;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
啟動執行緒t
其實最後執行的是Worker
中的run()
方法,而run()
方法又執行的runWorker(w)
方法:
public void run() {
runWorker(this);
}
- 1
- 2
- 3
我們觀察runWorker(w)
方法可以發現,和第一節提到的自定義執行緒一樣,實際這個方法就是執行提交給它的第一個任務後,繼續持續的去快取佇列中取任務執行。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
參照第一節,相信你很容易理解runWorker()
方法以及Worker
類的本質。
到這裡,執行緒池原始碼的分析也就差不多了,有了上面的基礎,execute()
的剩下兩步也很簡單,這裡不多講了。
3. 執行緒池的使用
通過第二節,我們已經知道,通過ThreadPoolExecutor
類,可以構造出各種需求的執行緒池。
在實際應用中,我們不採取每次自己根據需求分析new一個ThreadPoolExecutor
的做法,有一個類已經幫我們做好了一切:Executors
想要建立什麼樣性質的執行緒池,直接呼叫Executors
中的靜態方法就行了
Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小為Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //建立容量為1的緩衝池
Executors.newFixedThreadPool(int); //建立固定容量大小的緩衝池
- 1
- 2
- 3
這裡對Executors
不多講解了,想對Executors
有進一步瞭解可看:
Java併發程式設計(19):併發新特性—Executor框架與執行緒池(含程式碼)
本文部分內容參考以下文章:
深入理解Java之執行緒池
歡迎關注微信公眾號:shoshana