併發容器學習—ConcurrentLinkedQueue和ConcurrentLinkedDuque
阿新 • • 發佈:2019-05-02
一、ConcurrentLinkedQueue併發容器
1.
ConcurrentLinkedQueue的底層資料結構
ConcurrentLinkedQueue是一個底層基於連結串列實現的無界且執行緒安全的佇列。遵循先進先出(FIFO)的原則
。佇列的頭部是佇列中時間最長的元素。佇列的尾部是佇列中時間最短的元素。它採用CAS演算法來實現同步,是個非阻塞的佇列。
底層連結串列由一個個Node結點組成,Node的定義如下:
private static class Node<E> { volatile E item; //存放資料 volatile Node<E> next; //指向下個結點 //構造方法 Node(E item) { UNSAFE.putObject(this, itemOffset, item); //unsafe操作賦值 } //CAS方式嘗試更新資料 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } //CAS方式更新下個結點地址 boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; //item的記憶體地址偏移量 private static final long nextOffset; //next的記憶體地址偏移量 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
2. ConcurrentLinkedQueue的繼承關係 瞭解了底層的基本實現,再來看看 ConcurrentLinkedQueue的繼承關係,如下圖所示, ConcurrentLinkedQueue繼承了AbstractQueue即實現了Queue介面。 之前在ArrayList及LinkedList的學習時 Queue及 AbstractCollection都已學過,不在贅言,直接來看 AbstractQueue的原始碼:
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> { protected AbstractQueue() { } //向佇列末尾新增e元素 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } //刪除隊首元素,並將其返回 public E remove() { E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } //獲取隊首元素,但不移除出佇列 public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } //清空佇列中的所有元素 public void clear() { while (poll() != null) ; } //將集合c中的所有元素一次新增到佇列末尾 public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e)) modified = true; return modified; } }
3. 重要屬性及構造方法 瞭解了 ConcurrentLinkedQueue的繼承關係,再來看構造方法和一些重要的屬性
//底層連結串列的頭結點
private transient volatile Node<E> head;
//底層連結串列的尾結點
private transient volatile Node<E> tail;
//空構造,建立了一個空佇列
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
//以結合c中的元素建立一個佇列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
head = h;
tail = t;
}
4.入隊的實現 佇列中新增的方法有兩個,分別add和offer,效果沒有什麼區別,接下來看看實現的過程:
//由原始碼可見,add的本質還是呼叫了offer方法
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
//判斷待新增的元素是否為null,說明ConcurrentLinkedQueue中不允許null元素
checkNotNull(e);
final Node<E> newNode = new Node<E>(e); //新建結點
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
//判斷p是否為尾結點
if (q == null) {
//CAS方式嘗試更新p結點的next結點為newNode結點,失敗的話繼續迴圈嘗試
if (p.casNext(null, newNode)) {
//p的next結點更新成功,說明佇列尾結點改變了就繼續嘗試更新tail的值
//這裡判斷p!=t,說明tail不是實際的尾結點,應該要更新了,但並不強制
//要求一定要更新成功,即不要求tail一定要指向佇列的尾結點,允許tail滯後
//於真正的尾結點
if (p != t)
casTail(t, newNode); //更新tail,失敗也沒關係
return true;
}
}
else if (p == q)
/**
* p == q說明當前p結點已經被移除出隊了,需要重新獲取head來進行入隊操作
*
* 對於已經移除出隊的元素,會將next置為本身,
* 用於判斷當前元素已經出隊,接著從head繼續遍歷。
*
* 在整個offer方法的執行過程中,p一定是等於t或者在t的後面的,
* 因此如果p已經不在佇列中的話,t也一定不在佇列中了(FIFO)。
*
* 所以重新讀取一次tail到快照t,
* 如果t未發生變化,就從head開始繼續下去。
* 否則讓p從新的t開始繼續嘗試入隊是一個更好的選擇(此時新的t很可能在head後面)
*/
p = (t != (t = tail)) ? t : head;
else
/**
* 若p與t相等,則讓p指向next結點。
* 若p和t不相等,則說明已經經歷多次入隊失敗了(可能被插隊了),
* 則重新讀取一次tail到t,如果t發生了變化(確實被插隊了),則從t開始再次嘗試入隊。
*/
p = (p != t && t != (t = tail)) ? t : q;
}
}
5.出隊的過程
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//判斷item是否為null,即判斷p結點是否要被移除出隊
//若item不為null,則嘗試更新item為null,
//因為item若為null表示結點標記為要被移除
if (item != null && p.casItem(item, null)) {
//判斷p與h是否還相同
//p與h不相同,說明head可能滯後,即head可能已經不是指向隊首結點,
//嘗試更新head為p.next(p.next若為null,則說明p為隊尾了,head只能更新為p)
//p與h相同則直接返回
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//判斷p是否為隊尾,也就是佇列是否已經空了
//若佇列已經空了,則嘗試更新head為p
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//p的next結點若是存在,還需要判斷是否在佇列中
//若p==q,說明p已經不再佇列中了,此時需要重新獲取head
//的快照h,並讓p=h,嘗試移除結點
else if (p == q)
continue restartFromHead;
else
p = q; // 繼續向後走一個節點嘗試移除結點
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h); //h的next結點設定為自身
}
6.出入隊的過程 ConcurrentLinkedQueue的出入隊操作並不是使用加鎖的方式實現的執行緒安全,而是通過無鎖的CAS演算法實現的,這就使得其程式碼實現雖然簡單,但理解起來晦澀難懂。 ConcurrentLinkedQueue是不允許入隊元素時null值的(結點的item不能為null),因為 ConcurrentLinkedQueue對已出隊的結點會將item賦上null值。也就是說某個結點的item若為null,則說明該結點是要被刪除的結,那麼久可以將其重佇列中移除了。 ConcurrentLinkedQueue除了對結點有以上要求外,其自身則有如下特點: 1.佇列中的所有結點在任意時刻只有最後一個結點的next是為null的。 2.要求head和tail屬性不能是null(可以是空結點,即item和next為null)。 3.head和tail具有滯後性,head指向的不一定是隊首結點,tail指向的也不一定是隊尾結點。 下面以圖解的形式先演示入隊的過程: 初始狀態,佇列中沒有結點,此時head==tail,指向一個空結點。 有第一個結點要入隊,通過自旋嘗試入隊,此時q為p.next,即為null,那麼就嘗試更新p.next為要新增的結點,如果p.next更新成功,入隊成功,再判斷p與t是否相同,即是否需要嘗試更新tail(p!=t說明tail沒有指向隊尾),然後結束入隊操作;更新p.next失敗則繼續嘗試,直到成功為止(如上右圖所示)。 再有第二個結點入隊,得到如上圖所示,此時q==node1不為null,且p!=q,令p=q指向下個結點重新嘗試入隊。 此時q==null,嘗試更新p.next為要新增的結點,如果p.next更新成功,入隊成功(如上右圖所示);失敗則繼續嘗試。 此時判斷p!=t,說明tail的指向已經滯後了,沒有指向隊尾結點,可以嘗試更新了,更新成不成功都沒有關係,因為不成功也沒事,不成功說明有其他執行緒已經搶先更新過了。成功則tail指向新增結點2. 再接下來,入隊結點3,此時p=t=tail,q為null,與加入第一個結點過程相同,嘗試更新p.next為要新增的結點,成功則結束入隊操作;失敗則迴圈繼續嘗試。 繼續入隊結點4,此時q==node3不為null,且p!=q,令p=q重新嘗試入隊。 此時q==null,嘗試更新p.next為要新增的結點,這裡假定更新失敗,即有其他執行緒搶先入隊了結點x,且tail也被更新。 此時p與t不相同,且t與tail也不相同,即tail已經改變,此時結點4要入隊只能在新的tail之後去嘗試入隊,因此直接令p=tail去繼續嘗試入隊。 到此在重複前面的入隊步驟,q==null,嘗試更新p.next為node4.成功則結束。 出隊的過程,以上面入完5個結點開始出隊過程的分析: 結點1開始出隊,此時p=h=head,p.item==null,q=p.next;則可知head結點現在是滯後狀態,指向的並不是隊首結點,需要查詢隊首結點,令p=q。 這時p.item!=null,嘗試將p.item的值更新為null,因為head之後第一個item不為null的結點即是隊首結點,也就是要移除出隊的結點,而要被移除的結點的item要被標記成null值,標記成功說明該結點可以刪除出隊了;若嘗試更新失敗,說明被其他執行緒搶先出隊,那麼就重複上一步繼續查詢新隊首,再嘗試出隊操作。 若p.item更新成功則判斷此時p與h是否相同,若是相同則直接返回item;若是不相同,說明head此時已經滯後了,那麼可以嘗試更新head(head若是更新成功則h結點的next指向h自身,說明該結點已不再佇列中)。 到此,第一個結點的移除就結束了。 此時,再繼續移除隊首結點2,如上圖所示,有p=h=head,p.item為null,q=p.next且不為null(有後繼結點),令p=q往後繼續查詢隊首。 此時p.item=2,不為null,說明找到隊首,可以嘗試更新結點2的item值,假定此時更新失敗,則說明結點2被其他執行緒搶先移除出隊了,那麼此時需要繼續查詢佇列中第一個item不為null的結點來出隊。 到此則有p指向node3,此時p.item依舊不為null,則可以執行更新結點3的item,若是更新成功,且head更新失敗,則可得到如下圖所示結果。 若是繼續移除結點x,那麼就需要重head開始,遍歷到結點x出才可能執行移除出隊操作,我們假定在遍歷時(在p=h=head之後,結點x正好被移除),有其它執行緒搶先移除了結點x,並且更新了head的位置,且原本的h的nexr指向h自身。 此時有p.item==null,且p==q且不為null,那麼需要重新獲取p=h=head,得到如下圖所示結果。 到此,又回到移除隊首的初始狀態,此時p.item==null,令p=q獲取下個結點。 此時,p.item不為null,那麼久要嘗試更新p.item,假設更新失敗,那麼此時q=p.next為null,即node4為隊尾結點且已經被其他執行緒搶先移除出隊了,那麼能做的只剩嘗試更新head結點,並且返回null了(佇列中沒有結點可以移除了,只能返回null)。 從這裡還可以看出在 ConcurrentLinkedQueue中head是可以在tail的後面的,這是由於head和tail的滯後性帶來的影響。 7.其他的方法
//peek的原理與poll差不多,只是peek中獲取到隊首後,不去進行CAS的更新item操作
//只將item值返回即可
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//判斷是不是隊首結點,即通過item是否為null,判斷當前結點
//是否已被移除出隊
//判斷p.next是否為null,則是為判斷佇列是否是空佇列
//若是空佇列也可結束了
if (item != null || (q = p.next) == null) {
updateHead(h, p); //嘗試更新滯後的head
return item;
}
//判斷結點是否已經被移除出隊,是的話要重新獲取head來查詢隊首
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
//統計佇列中的元素個數,瞬時值,不能太過依賴
public int size() {
int count = 0;
//獲取隊首結點,然後遍歷佇列挨個統計
for (Node<E> p = first(); p != null; p = succ(p))
//判斷結點是不是要被移除,或已被移除(item為null,說明是被遺棄的結點,不需要統計)
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
//獲取隊首元素,與peek基本一致,只不過返回的是結點,而peek返回的是item
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
//獲取後繼結點,若是後繼結點是自身(已被移除),那麼返回head
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}
二、ConcurrentLinkedDueue併發容器
1.ConcurrentLinkedDueue
ConcurrentLinkedDueue的底層資料實現與ConcurrentLinkedQueue類似,都是連結串列,不同的是ConcurrentLinkedDueue是雙向連結串列,因此ConcurrentLinkedDueue既可以當做佇列也可當做棧來使用。並且ConcurrentLinkedDueue實現執行緒安全的,非阻塞的方式與ConcurrentLinkedQueue一樣都是採用CAS演算法。若ConcurrentLinkedDueue當做佇列使用那麼與ConcurrentLinkedQueue沒有區別,效率也相同,原始碼也十分類似,這裡就不做過多分析。