3.Java資料結構原理解析-Queue系列
Queue,也就是佇列,滿足FIFO的特性。
在Java中,Queue是一個介面,它的實現類有很多,其中非執行緒安全的代表是LinkedList,執行緒安全的有阻塞和非阻塞的,阻塞的大都實現了Queue的子介面BlockingQueue(阻塞佇列),例如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等。非阻塞的有ConcurrentLinkedQueue。
Queue介面方法定義:
//新增元素,成功返回true,容量不夠拋IllegalStateException
boolean add(E e)
//新增元素,成功返回true,容量不足返回false
boolean offer(E e)
//移除隊首元素,佇列為空時拋NoSuchElementException
E remove()
//移除隊首元素,佇列為空時返回null
E poll()
//檢視隊首元素,佇列為空時拋NoSuchElementException
E element()
//檢視隊首元素,佇列為空時返回null
E peek()
BlockingQueue介面定義(BlockingQueue除了繼承Queue定義的方法外,還加入了自己的阻塞方法):
//新增元素,容量不足阻塞
void put(E e) throws InterruptedException
//新增元素,容量不足時等待指定時間
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
//移除隊首元素,佇列為空時阻塞
E take() throws InterruptedException
//移除隊首元素,佇列為空時等待指定時間
E poll(long timeout, TimeUnit unit) throws InterruptedException
佇列大多數是在多執行緒環境下使用的,生產者執行緒往佇列中新增元素,消費者執行緒從佇列中取出元素。所以,下面重點討論ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue是採用什麼樣的資料結構和演算法來保證佇列的執行緒安全性的。
1.阻塞佇列 ArrayBlockingQueue
ArrayBlockingQueue底層的資料結構是陣列和迴圈佇列,使用一個可重入鎖和這個鎖的兩個條件物件進行併發控制。
首先,來看看ArrayBlockingQueue的屬性。
//存放元素的陣列
final Object[] items;
//迴圈佇列頭指標,起始值為0
int takeIndex;
//迴圈佇列尾指標,指向下一個元素插入的位置,起始值為0
int putIndex;
//元素的個數
int count;
//可重入鎖(被final修飾,之所以沒有初始化,是因為所有的構造方法裡面都對lock進行了初始化)
final ReentrantLock lock;
//佇列非空條件
private final Condition notEmpty;
//佇列未滿條件
private final Condition notFull;
ArrayBlockingQueue的長度是固定的,無法擴容,所以建立一個ArrayBlockingQueue物件時,必須指定佇列的容量,並且ArrayBlockingQueue不允許原始為null。從建構函式上可以看出這一點。
//建立一個指定容量的佇列,鎖預設是非公平的
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//建立一個指定容量、指定鎖的公平性的佇列
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//建立鎖
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//用現有的集合建立一個指定容量、指定鎖的公平性的佇列
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//建立佇列
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion(鎖定只用於可見性,而不是互斥)
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
//尾指標
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
(1)插入元素add、offer、put
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
//佇列尾指標+1
putIndex = inc(putIndex);
++count;
//通知在notEmpty上等待的執行緒
notEmpty.signal();
}
//迴圈加。迴圈佇列的實現就體現在這裡
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//在鎖上等待,直到獲取鎖,但是會響應中斷,優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。
//不明白為什麼add和offer方法使用lock,而put方法使用lockInterruptibly?
lock.lockInterruptibly();
try {
//佇列已滿,在notFull物件上等待
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
(2)取出元素remove、poll、take
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//佇列為空時返回null
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
//佇列頭指標+1
takeIndex = inc(takeIndex);
--count;
//通知在notFull物件上等待的執行緒
notFull.signal();
return x;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//佇列為空時,在notEmpty上等待
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
2.阻塞佇列 LinkedBlockingQueueue
LinkedBlockingQueueue底層的資料結構是單向連結串列,使用兩個可重入鎖(放鎖和拿鎖)和物件的條件物件來進行併發控制。
LinkedBlockingQueueue由於使用了兩個鎖,所以允許同時新增和取出元素。這一點是和ArrayBlockingQueue最大的區別。
一個類的屬性體現了這個類的資料結構,我們首先看看LinkedBlockingQueueue的屬性
//連結串列的節點。從節點可以看出該連結串列只有一個next指標,是單向的,
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//佇列的容量,定義為final,說明所有的構造方法必須初始化容量
private final int capacity;
//元素的個數,因為使用了放鎖和拿鎖兩個鎖,所以同時新增和取出元素時存在併發問題,使用原子操作來保證元素的個數的準確性
private final AtomicInteger count = new AtomicInteger(0);
//單向連結串列頭指標,head.item永遠為null。(定義為transient說明不能序列化)
private transient Node<E> head;
//單向連結串列尾指標,last.next永遠為null。(定義為transient說明不能序列化)
private transient Node<E> last;
//拿鎖(控制remove、poll、take方法等)
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
//放鎖(控制add、offer、put方法等)
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
照常理來說,取出一個元素後,佇列應該是notFull,那麼拿鎖控制的是應該是notFull的條件變數,但是因為此處存在兩把鎖,可能在取出元素後,又有元素加入了。所有此處拿鎖控制的是notEmpty,取出元素後,只要判斷剩下的元素是否大於1就可以了,因為不可能有兩個執行緒同時執行取操作。
(1)插入元素add、offer、put
//add方法是在AbstractQueue實現了,所以跟ArrayBlockingQueue一樣
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node(e);
//鎖定放鎖
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//佇列未滿
if (count.get() < capacity) {
enqueue(node);
//佇列長度+1
c = count.getAndIncrement();
//插入之後,佇列還是未滿,通知在notFull物件上的等待的執行緒(例如:put方法)
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//c==0表示插入之前佇列為空,佇列為空說明可能有讀執行緒在阻塞,如果c>0,說明肯定沒有讀執行緒在阻塞
if (c == 0)
signalNotEmpty();
return c >= 0;
}
//signalNotEmpty雖然用在offder/put中,但是從不在putLock的同步區內。這樣就保證同一時刻只持有一個鎖,這樣就不會出現死鎖問題。
//???關於此處為什麼加鎖的問題,暫時就是這樣理解的
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
//入隊。入隊操作很簡單,就是將連結串列尾指標的next節點指向當前節點,並把當前節點設定為尾指標
private void enqueue(Node<E> node) {
last = last.next = node;
}
(2)取出操作remove、poll、take
//remove()方法是在AbstractQueue中實現了,跟ArrayBlockingQueue一樣
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
//出隊
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//出隊操作比較簡單,就是將單鏈表的頭指標指向下一個元素
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
//不是很明白這個,如果要幫助GC,直接將h.next=null不是更好嗎?
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
//在notEmpty條件上等待
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
3.阻塞佇列 SynchronousQueue
SynchronousQueue跟上面兩個阻塞佇列不同,它內部沒有容器,一個生產執行緒put的時候,如果當前沒有消費執行緒執行take,此生產執行緒必須阻塞,等待一個消費執行緒呼叫take操作,take操作將會喚醒該生產執行緒,同時消費執行緒會獲取生產執行緒的資料(即資料傳遞),這樣的一個過程稱為一次配對過程(當然也可以先take後put,原理是一樣的)。
4.非阻塞佇列 ConcurrentLinkedQueue
ArrayBlockingQueue和LinkedBlockingQueue都是阻塞的,阻塞體現在入隊和出隊的時候需要加鎖。
下面介紹的ConcurrentLinkedQueue是非阻塞的,ConcurrentLinkedQueue底層的資料結構和LinkedBlockingQueue相同,也是使用單鏈表,不同的是ConcurrentLinkedQueue通過sun.misc.Unsafe
類的CAS操作來保證執行緒安全的。
Unsafe類提供了硬體級別的原子操作,主要compareAndSwapXXX方法實現。
關於Unsafe,網上有很多資源,請自行查閱。
我們首先來看看ConcurrentLinkedQueue的成員變數。
//單鏈表頭節點
private transient volatile Node<E> head;
//單鏈表尾節點
private transient volatile Node<E> tail;
//節點型別。與LinkedBlockingQueue不同的是,所有的賦值操作都是通過Unsafe物件的CAS來完成的,所以是執行緒安全的
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
//為item賦值
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
//為next指標賦值
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
//獲取item屬性和next屬性的記憶體地址
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
(1)入隊add、offer
需要注意的是,每次入隊之後,tail並不是總指向最後一個節點。奇數時是倒數第二個節點,偶數時是第一個節點。
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
//獲得p的下一個節點
Node<E> q = p.next;
//如果下一個節點是null,也就是p節點就是尾節點
if (q == null) {
//將單鏈表的尾節點的next指標指向新節點
if (p.casNext(null, newNode)) {
if (p != t)
//如果tail不是尾節點則將入隊節點設定為tail。
// 如果失敗了,那麼說明有其他執行緒已經把tail移動過
casTail(t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
}
// 如果p節點等於p的next節點,則說明p節點和q節點都為空,表示佇列剛初始化,所以返回
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
//為佇列的尾節點賦值
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}