JDK原始碼分析—— ArrayBlockingQueue 和 LinkedBlockingQueue
招賢納士:
我們叫數瀾
我們核心技術團隊來自阿里、華為、金蝶、移動、GE等
我們獲得來自阿里巴巴集團聯合創始人、湖畔山南總裁謝世煌、IDG合夥人牛奎光、洪泰等投資
我們的官網:https://www.dtwave.com
我們提供:期權、五險一金、試用期全薪、商業保險、免費體檢、入職即享年休、每週團建、生日party、生日禮物、節日禮品、技術分享、技術專題小組等。
我們的地盤在杭州餘杭區倉前夢想小鎮和北京望京soho
我們是一個技術氛圍濃厚,高執行力,充滿激情的團隊,期待你的加入,共同成長!如有興趣,煩請投遞簡歷,我們將快速處理:
資料開發工程師、Node.js、演算法工程師、測試開發工程師、架構
Java工程師、前端工程師,請投遞簡歷至[email protected]
運維工程師、專案經理,請投遞簡歷至[email protected]
目的:本文通過分析JDK原始碼來對比ArrayBlockingQueue 和LinkedBlockingQueue,以便日後靈活使用。
1. 在Java的Concurrent包中,添加了阻塞佇列BlockingQueue,用於多執行緒程式設計。BlockingQueue的核心方法有:
boolean add(E e) ,把 e 新增到BlockingQueue裡。如果BlockingQueue可以容納,則返回true,否則丟擲異常。
boolean offer(E e),表示如果可能的話,將 e 加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false。
void put(E e),把 e 新增到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻塞直到BlockingQueue裡面有空間再繼續。
E poll(long timeout, TimeUnit unit) ,取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null。
E take() ,取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,則呼叫此方法的執行緒被阻塞直到BlockingQueue有新的資料被加入。
int drainTo(Collection<? super E> c) 和 int drainTo(Collection<? super E> c, int maxElements) ,一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取 資料的個數),通過該方法,可以提升獲取資料效率,不需要多次分批加鎖或釋放鎖。
注意:BlockingQueue 不接受null 元素。試圖add、put 或offer 一個null 元素時,某些實現會丟擲NullPointerException。null 被用作指示poll 操作失敗的警戒值。
2. BlockingQueue常用的四個實現類
ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的.
2) LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的
3) PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序.
4) SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
3. ArrayBlockingQueue原始碼分析
ArrayBlockingQueue是一個由陣列支援的有界阻塞佇列。此佇列按 FIFO(先進先出)原則對元素進行排序。佇列的頭部 是在佇列中存在時間最長的元素,佇列的尾部 是在佇列中存在時間最短的元素。新元素插入到佇列的尾部,佇列檢索操作則是從佇列頭部開始獲得元素。
這是一個典型的“有界快取區”,固定大小的陣列在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的快取區,就不能再增加其容量。試圖向已滿佇列中放入元素會導致放入操作受阻塞;試圖從空佇列中檢索元素將導致類似阻塞。
ArrayBlockingQueue建立的時候需要指定容量capacity(可以儲存的最大的元素個數,因為它不會自動擴容)。其中一個構造方法為:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
ArrayBlockingQueue類中定義的變數有:
/** The queued items */
private final E[] items;
/** items index for next take, poll or remove */
private int takeIndex;
/** items index for next put, offer, or add. */
private int putIndex;
/** Number of items in the queue */
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
使用陣列items來儲存元素,由於是迴圈佇列,使用takeIndex和putIndex來標記put和take的位置。可以看到,該類中只定義了一個鎖ReentrantLock,定義兩個Condition物件:notEmputy和notFull,分別用來對take和put操作進行所控制。注:本文主要講解put()和take()操作,其他方法類似。
put(E e)方法的原始碼如下。進行put操作之前,必須獲得鎖並進行加鎖操作,以保證執行緒安全性。加鎖後,若發現佇列已滿,則呼叫notFull.await()方法,如當前執行緒陷入等待。直到其他執行緒take走某個元素後,會呼叫notFull.signal()方法來啟用該執行緒。啟用之後,繼續下面的插入操作。
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
*/
public void put(E e) throws InterruptedException {
//不能存放 null 元素
if (e == null) throw new NullPointerException();
final E[] items = this.items; //陣列佇列
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
try {
//當佇列滿時,呼叫notFull.await()方法,使該執行緒阻塞。
//直到take掉某個元素後,呼叫notFull.signal()方法啟用該執行緒。
while (count == items.length)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
//把元素 e 插入到隊尾
insert(e);
} finally {
//解鎖
lock.unlock();
}
}
insert(E e) 方法如下:
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void insert(E x) {
items[putIndex] = x;
//下標加1或者等於0
putIndex = inc(putIndex);
++count; //計數加1
//若有take()執行緒陷入阻塞,則該操作啟用take()執行緒,繼續進行取元素操作。
//若沒有take()執行緒陷入阻塞,則該操作無意義。
notEmpty.signal();
}
/**
* Circularly increment i.
*/
final int inc(int i) {
//此處可以看到使用了迴圈佇列
return (++i == items.length)? 0 : i;
}
take()方法程式碼如下。take操作和put操作相反,故不作詳細介紹。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //加鎖
try {
try {
//當佇列空時,呼叫notEmpty.await()方法,使該執行緒阻塞。
//直到take掉某個元素後,呼叫notEmpty.signal()方法啟用該執行緒。
while (count == 0)
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
//取出隊頭元素
E x = extract();
return x;
} finally {
lock.unlock(); //解鎖
}
}
extract() 方法如下:
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
小結:進行put和take操作,共用同一個鎖物件。也即是說,put和take無法並行執行!4. LinkedBlockingQueue 原始碼分析
基於連結串列的阻塞佇列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以此來提高整個佇列的併發效能。
作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue物件,而沒有指定其容量大小,LinkedBlockingQueue會預設一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大於消費者的速度,也許還沒有等到佇列滿阻塞產生,系統記憶體就有可能已被消耗殆盡了。
LinkedBlockingQueue 類中定義的變數有:
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger(0);
/** Head of linked list */
private transient Node<E> head;
/** Tail of linked list */
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
該類中定義了兩個ReentrantLock鎖:putLock和takeLock,分別用於put端和take端。也就是說,生成端和消費端各自獨立擁有一把鎖,避免了讀(take)寫(put)時互相競爭鎖的情況。
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //加 putLock 鎖
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from
* capacity. Similarly for all other uses of count in
* other wait guards.
*/
//當佇列滿時,呼叫notFull.await()方法釋放鎖,陷入等待狀態。
//有兩種情況會啟用該執行緒
//第一、 某個put執行緒新增元素後,發現佇列有空餘,就呼叫notFull.signal()方法啟用阻塞執行緒
//第二、 take執行緒取元素時,發現佇列已滿。則其取出元素後,也會呼叫notFull.signal()方法啟用阻塞執行緒
while (count.get() == capacity) {
notFull.await();
}
// 把元素 e 新增到佇列中(隊尾)
enqueue(e);
c = count.getAndIncrement();
//發現佇列未滿,呼叫notFull.signal()啟用阻塞的put執行緒(可能存在)
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
//佇列空,說明已經有take執行緒陷入阻塞,故呼叫signalNotEmpty啟用阻塞的take執行緒
signalNotEmpty();
}
enqueue(E e)方法如下:
/**
* Creates a node and links it at end of queue.
* @param x the item
*/
private void enqueue(E x) {
// assert putLock.isHeldByCurrentThread();
last = last.next = new Node<E>(x);
}
take()方法程式碼如下。take操作和put操作相反,故不作詳細介紹。 public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
dequeue()方法如下:
/**
* Removes a node from head of queue.
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
小結:take和put操作各有一把鎖,可並行讀取。
參考地址: