三、執行緒安全阻塞佇列 BlockingQueue 詳解
轉載自: https://www.cnblogs.com/WangHaiMing/p/8798709.html
本篇將詳細介紹BlockingQueue,以下是涉及的主要內容:
- BlockingQueue的核心方法
- 阻塞佇列的成員的概要介紹
- 詳細介紹DelayQueue、ArrayBlockingQueue、LinkedBlockingQueue的原理
- 執行緒池與BlockingQueue
一、初識阻塞佇列
在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。
BlockingQueue的核心方法:
public interface BlockingQueue<E> extends Queue<E> { //將給定元素設定到佇列中,如果設定成功返回true, 否則拋異常。如果是往限定了長度的佇列中設定值,推薦使用offer()方法。 boolean add(E e); //將給定的元素設定到佇列中,如果設定成功返回true, 否則返回false. e的值不能為空,否則丟擲空指標異常。 boolean offer(E e); //將元素設定到佇列中,如果佇列中沒有多餘的空間,該方法會一直阻塞,直到佇列中有多餘的空間。 void put(E e) throws InterruptedException; //將給定元素在給定的時間內設定到佇列中,如果設定成功返回true, 否則返回false. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //從佇列中獲取值,如果佇列中沒有值,執行緒會一直阻塞,直到佇列中有值,並且該方法取得了該值。 E take() throws InterruptedException; //在給定的時間裡,從佇列中獲取值,時間到了直接呼叫普通的poll方法,為null則直接返回null。 E poll(long timeout, TimeUnit unit) throws InterruptedException; //獲取佇列中剩餘的空間。 int remainingCapacity(); //從佇列中移除指定的值。 boolean remove(Object o); //判斷佇列中是否擁有該值。 public boolean contains(Object o); //將佇列中值,全部移除,併發設定到給定的集合中。 int drainTo(Collection<? super E> c); //指定最多數量限制將佇列中值,全部移除,併發設定到給定的集合中。 int drainTo(Collection<? super E> c, int maxElements); }
在深入之前先了解下下ReentrantLock 和 Condition:
重入鎖ReentrantLock:
ReentrantLock鎖在同一個時間點只能被一個執行緒鎖持有;而可重入的意思是,ReentrantLock鎖可以被單個執行緒多次獲取。
ReentrantLock分為“公平鎖”和“非公平鎖”。它們的區別體現在獲取鎖的機制上是否公平。“鎖”是為了保護競爭資源,防止多個執行緒同時操作執行緒而出錯,ReentrantLock在同一個時間點只能被一個執行緒獲取(當某執行緒獲取到“鎖”時,其它執行緒就必須等待);ReentrantLock是通過一個FIFO的等待佇列來管理獲取該鎖所有執行緒的。在“公平鎖”的機制下,執行緒依次排隊獲取鎖;而“非公平鎖”在鎖是可獲取狀態時,不管自己是不是在佇列的開頭都會獲取鎖。
主要方法:
- lock()獲得鎖
- lockInterruptibly()獲得鎖,但優先響應中斷
- tryLock()嘗試獲得鎖,成功返回true,否則false,該方法不等待,立即返回
- tryLock(long time,TimeUnit unit)在給定時間內嘗試獲得鎖
- unlock()釋放鎖
Condition:await()、signal()方法分別對應之前的Object的wait()和notify()
- 和重入鎖一起使用
- await()是當前執行緒等待同時釋放鎖
- awaitUninterruptibly()不會在等待過程中響應中斷
- signal()用於喚醒一個在等待的執行緒,還有對應的singalAll()方法
二、阻塞佇列的成員
佇列 | 有界性 | 鎖 | 資料結構 |
---|---|---|---|
ArrayBlockingQueue | bounded(有界) | 加鎖 | arrayList |
LinkedBlockingQueue | optionally-bounded | 加鎖 | linkedList |
PriorityBlockingQueue | unbounded | 加鎖 | heap |
DelayQueue | unbounded | 加鎖 | heap |
SynchronousQueue | bounded | 加鎖 | 無 |
LinkedTransferQueue | unbounded | 加鎖 | heap |
LinkedBlockingDeque | unbounded | 無鎖 | heap |
下面分別簡單介紹一下:
ArrayBlockingQueue:是一個用陣列實現的有界阻塞佇列,此佇列按照先進先出(FIFO)的原則對元素進行排序。支援公平鎖和非公平鎖。【注:每一個執行緒在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的執行緒的請求一定先被滿足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的執行緒先獲取鎖】
- LinkedBlockingQueue:一個由連結串列結構組成的有界佇列,此佇列的長度為Integer.MAX_VALUE。此佇列按照先進先出的順序進行排序。
- PriorityBlockingQueue: 一個支援執行緒優先順序排序的無界佇列,預設自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先順序元素的順序。
- DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界佇列,在建立元素時,可以指定多久才能從佇列中獲取當前元素。只有延時期滿後才能從佇列中獲取元素。(DelayQueue可以運用在以下應用場景:1.快取系統的設計:可以用DelayQueue儲存快取元素的有效期,使用一個執行緒迴圈查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示快取有效期到了。2.定時任務排程。使用DelayQueue儲存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
- SynchronousQueue: 一個不儲存元素的阻塞佇列,每一個put操作必須等待take操作,否則不能新增元素。支援公平鎖和非公平鎖。SynchronousQueue的一個使用場景是線上程池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個執行緒池根據需要(新任務到來時)建立新的執行緒,如果有空閒執行緒則會重複使用,執行緒空閒了60秒後會被回收。
- LinkedTransferQueue: 一個由連結串列結構組成的無界阻塞佇列,相當於其它佇列,LinkedTransferQueue佇列多了transfer和tryTransfer方法。
LinkedBlockingDeque: 一個由連結串列結構組成的雙向阻塞佇列。佇列頭部和尾部都可以新增和移除元素,多執行緒併發時,可以將鎖的競爭最多降到一半。
接下來重點介紹下:ArrayBlockingQueue、LinkedBlockingQueue以及DelayQueue
三、阻塞佇列原理以及使用
(1)DelayQueue
DelayQueue的泛型引數需要實現Delayed介面,Delayed介面繼承了Comparable介面,DelayQueue內部使用非執行緒安全的優先佇列(PriorityQueue),並使用Leader/Followers模式,最小化不必要的等待時間。DelayQueue不允許包含null元素。
Leader/Followers模式:
- 有若干個執行緒(一般組成執行緒池)用來處理大量的事件
- 有一個執行緒作為領導者,等待事件的發生;其他的執行緒作為追隨者,僅僅是睡眠。
- 假如有事件需要處理,領導者會從追隨者中指定一個新的領導者,自己去處理事件。
- 喚醒的追隨者作為新的領導者等待事件的發生。
- 處理事件的執行緒處理完畢以後,就會成為追隨者的一員,直到被喚醒成為領導者。
- 假如需要處理的事件太多,而執行緒數量不夠(能夠動態建立執行緒處理另當別論),則有的事件可能會得不到處理。
所有執行緒會有三種身份中的一種:leader和follower,以及一個幹活中的狀態:proccesser。它的基本原則就是,永遠最多隻有一個leader。而所有follower都在等待成為leader。執行緒池啟動時會自動產生一個Leader負責等待網路IO事件,當有一個事件產生時,Leader執行緒首先通知一個Follower執行緒將其提拔為新的Leader,然後自己就去幹活了,去處理這個網路事件,處理完畢後加入Follower執行緒等待佇列,等待下次成為Leader。這種方法可以增強CPU快取記憶體相似性,及消除動態記憶體分配和執行緒間的資料交換。
引數以及建構函式:
// 可重入鎖
private final transient ReentrantLock lock = new ReentrantLock();
// 儲存佇列元素的佇列——優先佇列
private final PriorityQueue<E> q = new PriorityQueue<E>();
//用於優化阻塞通知的執行緒元素leader,Leader/Followers模式
private Thread leader = null;
//用於實現阻塞和通知的Condition物件
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
先看offer()方法:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 如果原來佇列為空,重置leader執行緒,通知available條件
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
//因為DelayQueue不限制長度,因此新增元素的時候不會因為佇列已滿產生阻塞,因此帶有超時的offer方法的超時設定是不起作用的
public boolean offer(E e, long timeout, TimeUnit unit) {
// 和不帶timeout的offer方法一樣
return offer(e);
}
普通的poll()方法:如果延遲時間沒有耗盡的話,直接返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
再看看take()方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 如果佇列為空,需要等待available條件被通知
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// 如果延遲時間已到,直接返回第一個元素
if (delay <= 0)
return q.poll();
// leader執行緒存在表示有其他執行緒在等待,那麼當前執行緒肯定需要等待
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
// 如果沒有leader執行緒,設定當前執行緒為leader執行緒
// 嘗試等待直到延遲時間耗盡(可能提前返回,那麼下次
// 迴圈會繼續處理)
try {
available.awaitNanos(delay);
} finally {
// 如果leader執行緒還是當前執行緒,重置它用於下一次迴圈。
// 等待available條件時,鎖可能被其他執行緒佔用從而導致
// leader執行緒被改變,所以要檢查
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果沒有其他執行緒在等待,並且佇列不為空,通知available條件
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
最後看看帶有timeout的poll方法:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
// 嘗試等待available條件,記錄剩餘的時間
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
// 當leader執行緒不為空時(此時delay>=nanos),等待的時間
// 似乎delay更合理,但是nanos也可以,因為排在當前執行緒前面的
// 其他執行緒返回時會喚醒available條件從而返回,
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
// nanos需要更新
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
(2)ArrayBlockingQueue
引數以及建構函式:
// 儲存佇列元素的陣列
final Object[] items;
// 拿資料的索引,用於take,poll,peek,remove方法
int takeIndex;
// 放資料的索引,用於put,offer,add方法
int putIndex;
// 元素個數
int count;
// 可重入鎖
final ReentrantLock lock;
// notEmpty條件物件,由lock建立
private final Condition notEmpty;
// notFull條件物件,由lock建立
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);//預設構造非公平鎖的阻塞佇列
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//初始化ReentrantLock重入鎖,出隊入隊擁有這同一個鎖
lock = new ReentrantLock(fair);
//初始化非空等待佇列
notEmpty = lock.newCondition();
//初始化非滿等待佇列
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
//將集合新增進陣列構成的佇列中
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
新增的實現原理:
這裡的add方法和offer方法最終呼叫的是enqueue(E x)方法,其方法內部通過putIndex索引直接將元素新增到陣列items中,這裡可能會疑惑的是當putIndex索引大小等於陣列長度時,需要將putIndex重新設定為0,這是因為當前佇列執行元素獲取時總是從佇列頭部獲取,而新增元素從中從佇列尾部獲取所以當佇列索引(從0開始)與陣列長度相等時,下次我們就需要從陣列頭部開始添加了,如下圖演示
//入隊操作
private void enqueue(E x) {
final Object[] items = this.items;
//通過putIndex索引對陣列進行賦值
items[putIndex] = x;
//索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
接著看put方法:
put方法是一個阻塞的方法,如果佇列元素已滿,那麼當前執行緒將會被notFull條件物件掛起加到等待佇列中,直到佇列有空檔才會喚醒執行新增操作。但如果佇列沒有滿,那麼就直接呼叫enqueue(e)方法將元素加入到陣列佇列中。到此我們對三個新增方法即put,offer,add都分析完畢,其中offer,add在正常情況下都是無阻塞的新增,而put方法是阻塞新增。這就是阻塞佇列的新增過程。說白了就是當佇列滿時通過條件物件Condtion來阻塞當前呼叫put方法的執行緒,直到執行緒又再次被喚醒執行。總得來說新增執行緒的執行存在以下兩種情況,一是,佇列已滿,那麼新到來的put執行緒將新增到notFull的條件佇列中等待,二是,有移除執行緒執行移除操作,移除成功同時喚醒put執行緒,如下圖所示
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//當佇列元素個數與陣列長度相等時,無法新增元素
while (count == items.length)
//將當前呼叫執行緒掛起,新增到notFull條件佇列中等待喚醒
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
移除實現原理:
poll方法,該方法獲取並移除此佇列的頭元素,若佇列為空,則返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判斷佇列是否為null,不為null執行dequeue()方法,否則返回null
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//刪除佇列頭元素並返回
private E dequeue() {
//拿到當前陣列的資料
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//獲取要刪除的物件
E x = (E) items[takeIndex];
將陣列中takeIndex索引位置設定為null
items[takeIndex] = null;
//takeIndex索引加1並判斷是否與陣列長度相等,
//如果相等說明已到盡頭,恢復為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//佇列個數減1
if (itrs != null)
itrs.elementDequeued();//同時更新迭代器中的元素資料
//刪除了元素說明佇列有空位,喚醒notFull條件物件新增執行緒,執行新增操作
notFull.signal();
return x;
}
接著看remove(Object o)方法
public boolean remove(Object o) {
if (o == null) return false;
//獲取陣列資料
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//加鎖
try {
//如果此時佇列不為null,這裡是為了防止併發情況
if (count > 0) {
//獲取下一個要新增元素時的索引
final int putIndex = this.putIndex;
//獲取當前要被刪除元素的索引
int i = takeIndex;
//執行迴圈查詢要刪除的元素
do {
//找到要刪除的元素
if (o.equals(items[i])) {
removeAt(i);//執行刪除
return true;//刪除成功返回true
}
//當前刪除索引執行加1後判斷是否與陣列長度相等
//若為true,說明索引已到陣列盡頭,將i設定為0
if (++i == items.length)
i = 0;
} while (i != putIndex);//繼承查詢
}
return false;
} finally {
lock.unlock();
}
}
//根據索引刪除元素,實際上是把刪除索引之後的元素往前移動一個位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//先判斷要刪除的元素是否為當前佇列頭元素
if (removeIndex == takeIndex) {
//如果是直接刪除
items[takeIndex] = null;
//當前佇列頭元素加1並判斷是否與陣列長度相等,若為true設定為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//佇列元素減1
if (itrs != null)
itrs.elementDequeued();//更新迭代器中的資料
} else {
//如果要刪除的元素不在佇列頭部,
//那麼只需迴圈迭代把刪除元素後面的所有元素往前移動一個位置
//獲取下一個要被新增的元素的索引,作為迴圈判斷結束條件
final int putIndex = this.putIndex;
//執行迴圈
for (int i = removeIndex;;) {
//獲取要刪除節點索引的下一個索引
int next = i + 1;
//判斷是否已為陣列長度,如果是從陣列頭部(索引為0)開始找
if (next == items.length)
next = 0;
//如果查詢的索引不等於要新增元素的索引,說明元素可以再移動
if (next != putIndex) {
items[i] = items[next];//把後一個元素前移覆蓋要刪除的元
i = next;
} else {
//在removeIndex索引之後的元素都往前移動完畢後清空最後一個元素
items[i] = null;
this.putIndex = i;
break;//結束迴圈
}
}
count--;//佇列元素減1
if (itrs != null)
itrs.removedAt(removeIndex);//更新迭代器資料
}
notFull.signal();//喚醒新增執行緒
}
remove(Object o)方法的刪除過程相對複雜些,因為該方法並不是直接從佇列頭部刪除元素。首先執行緒先獲取鎖,再一步判斷佇列count>0,這點是保證併發情況下刪除操作安全執行。接著獲取下一個要新增源的索引putIndex以及takeIndex索引 ,作為後續迴圈的結束判斷,因為只要putIndex與takeIndex不相等就說明佇列沒有結束。然後通過while迴圈找到要刪除的元素索引,執行removeAt(i)方法刪除,在removeAt(i)方法中實際上做了兩件事,一是首先判斷佇列頭部元素是否為刪除元素,如果是直接刪除,並喚醒新增執行緒,二是如果要刪除的元素並不是佇列頭元素,那麼執行迴圈操作,從要刪除元素的索引removeIndex之後的元素都往前移動一個位置,那麼要刪除的元素就被removeIndex之後的元素替換,從而也就完成了刪除操作。
接著看take()方法
take方法其實很簡單,有就刪除沒有就阻塞,注意這個阻塞是可以中斷的,如果佇列沒有資料那麼就加入notEmpty條件佇列等待(有資料就直接取走,方法結束),如果有新的put執行緒添加了資料,那麼put操作將會喚醒take執行緒,執行take操作。圖示如下
//從佇列頭部刪除,佇列沒有元素就阻塞,可中斷
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中斷
try {
//如果佇列沒有元素
while (count == 0)
//執行阻塞操作
notEmpty.await();
return dequeue();//如果佇列有元素執行刪除操作
} finally {
lock.unlock();
}
}
最後看看peek()方法,比較簡單,直接返回當前佇列的頭元素但不刪除任何元素。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直接返回當前佇列的頭元素,但不刪除
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
(3)LinkedBlockingQueue
引數以及建構函式:
//節點類,用於儲存資料
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
// 容量大小
private final int capacity;
// 元素個數,因為有2個鎖,存在競態條件,使用AtomicInteger
private final AtomicInteger count = new AtomicInteger(0);
// 頭結點
private transient Node<E> head;
// 尾節點
private transient Node<E> last;
// 獲取並移除元素時使用的鎖,如take, poll, etc
private final ReentrantLock takeLock = new ReentrantLock();
// notEmpty條件物件,當佇列沒有資料時用於掛起執行刪除的執行緒
private final Condition notEmpty = takeLock.newCondition();
// 新增元素時使用的鎖如 put, offer, etc
private final ReentrantLock putLock = new ReentrantLock();
// notFull條件物件,當佇列資料已滿時用於掛起執行新增的執行緒
private final Condition notFull = putLock.newCondition();
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
四、執行緒池中的BlockingQueue
首先看下建構函式
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){...}
TimeUnit:時間單位;BlockingQueue:等待的執行緒存放佇列;keepAliveTime:非核心執行緒的閒置超時時間,超過這個時間就會被回收;RejectedExecutionHandler:執行緒池對拒絕任務的處理策略。
自定義執行緒池:這個構造方法對於佇列是什麼型別比較關鍵。
- 在使用有界佇列時,若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize,則優先建立執行緒,
- 若大於corePoolSize,則會將任務加入佇列,
- 若佇列已滿,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒,
- 若佇列已經滿了且執行緒數大於maximumPoolSize,則執行拒絕策略。或其他自定義方式。
接下來看下原始碼:
public void execute(Runnable command) {
if (command == null) //不能是空任務
throw new NullPointerException();
//如果還沒有達到corePoolSize,則新增新執行緒來執行任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果已經達到corePoolSize,則不斷的向工作佇列中新增任務
if (runState == RUNNING && workQueue.offer(command)) {
//執行緒池已經沒有任務
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
//如果執行緒池不處於執行中或者工作佇列已經滿了,但是當前的執行緒數量還小於允許最大的maximumPoolSize執行緒數量,則繼續建立執行緒來執行任務
else if (!addIfUnderMaximumPoolSize(command))
//已達到最大執行緒數量,任務佇列也已經滿了,則呼叫飽和策略執行處理器
reject(command); // is shutdown or saturated
}
}
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//更改幾個重要的控制欄位需要加鎖
try {
//池裡執行緒數量小於核心執行緒數量,並且還需要是執行時
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start(); //建立後,立即執行該任務
return true;
}
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w); //委託執行緒工廠來建立,具有相同的組、優先順序、都是非後臺執行緒
if (t != null) {
w.thread = t;
workers.add(w); //加入到工作者執行緒集合裡
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}