ConcurrentLinkedQueue 非阻塞佇列
要實現一個執行緒安全的佇列有兩種方式:阻塞和非阻塞。阻塞佇列無非就是鎖的應用,而非阻塞則是CAS演算法的應用。下面我們就開始一個非阻塞演算法的研究:CoucurrentLinkedQueue。
ConcurrentLinkedQueue是一個基於連結節點的無邊界的執行緒安全佇列,它採用FIFO原則對元素進行排序。採用“wait-free”演算法(即CAS演算法)來實現的。
CoucurrentLinkedQueue規定了如下幾個不變性:
- 在入隊的最後一個元素的next為null
- 佇列中所有未刪除的節點的item都不能為null且都能從head節點遍歷到
- 對於要刪除的節點,不是直接將其設定為null,而是先將其item域設定為null(迭代器會跳過item為null的節點)
- 允許head和tail更新滯後。這是什麼意思呢?意思就說是head、tail不總是指向第一個元素和最後一個元素(後面闡述)。
head的不變性和可變性:
- 不變性
- 所有未刪除的節點都可以通過head節點遍歷到
- head不能為null
- head節點的next不能指向自身
- 可變性
- head的item可能為null,也可能不為null 2.允許tail滯後head,也就是說呼叫succc()方法,從head不可達tail
tail的不變性和可變性
- 不變性
- tail不能為null
- 可變性
- tail的item可能為null,也可能不為null
- tail節點的next域可以指向自身 3.允許tail滯後head,也就是說呼叫succc()方法,從head不可達tail
這些特性是否已經暈了?沒關係,我們看下面的原始碼分析就可以理解這些特性了。
ConcurrentLinkedQueue原始碼分析
CoucurrentLinkedQueue的結構由head節點和tail節點組成,每個節點由節點元素item和指向下一個節點的next引用組成,而節點與節點之間的關係就是通過該next關聯起來的,從而組成一張連結串列的佇列。節點Node為ConcurrentLinkedQueue的內部類,定義如下:
private static class Node<E> { /** 節點元素域 */ volatile E item; volatile Node<E> next; //初始化,獲得item 和 next 的偏移量,為後期的CAS做準備 Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; /** 偏移量 */ private static final long itemOffset; /** 下一個元素的偏移量 */ private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } |
入列
入列,我們認為是一個非常簡單的過程:tail節點的next執行新節點,然後更新tail為新節點即可。從單執行緒角度我們這麼理解應該是沒有問題的,但是多執行緒呢?如果一個執行緒正在進行插入動作,那麼它必須先獲取尾節點,然後設定尾節點的下一個節點為當前節點,但是如果已經有一個執行緒剛剛好完成了插入,那麼尾節點是不是發生了變化?對於這種情況ConcurrentLinkedQueue怎麼處理呢?我們先看原始碼:
offer(E e):將指定元素插入都佇列尾部:
public boolean offer(E e) { //檢查節點是否為null checkNotNull(e); // 建立新節點 final Node<E> newNode = new Node<E>(e); //死迴圈 直到成功為止 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // q == null 表示 p已經是最後一個節點了,嘗試加入到佇列尾 // 如果插入失敗,則表示其他執行緒已經修改了p的指向 if (q == null) { // --- 1 // casNext:t節點的next指向當前節點 // casTail:設定tail 尾節點 if (p.casNext(null, newNode)) { // --- 2 // node 加入節點後會導致tail距離最後一個節點相差大於一個,需要更新tail if (p != t) // --- 3 casTail(t, newNode); // --- 4 return true; } } // p == q 等於自身 else if (p == q) // --- 5 // p == q 代表著該節點已經被刪除了 // 由於多執行緒的原因,我們offer()的時候也會poll(),如果offer()的時候正好該節點已經poll()了 // 那麼在poll()方法中的updateHead()方法會將head指向當前的q,而把p.next指向自己,即:p.next == p // 這樣就會導致tail節點滯後head(tail位於head的前面),則需要重新設定p p = (t != (t = tail)) ? t : head; // --- 6 // tail並沒有指向尾節點 else // tail已經不是最後一個節點,將p指向最後一個節點 p = (p != t && t != (t = tail)) ? t : q; // --- 7 } } |
光看原始碼還是有點兒迷糊的,插入節點一次分析就會明朗很多。
初始化
ConcurrentLinkedQueue初始化時head、tail儲存的元素都為null,且head等於tail:
新增元素A
按照程式分析:第一次插入元素A,head = tail = dummyNode,所有q = p.next = null,直接走步驟2:p.casNext(null, newNode),由於 p == t成立,所以不會執行步驟3:casTail(t, newNode),直接return。插入A節點後如下:
新增元素B
q = p.next = A ,p = tail = dummyNode,所以直接跳到步驟7:p = (p != t && t != (t = tail)) ? t : q;。此時p = q,然後進行第二次迴圈 q = p.next = null,步驟2:p == null成立,將該節點插入,因為p = q,t = tail,所以步驟3:p != t 成立,執行步驟4:casTail(t, newNode),然後return。如下:
新增節點C
此時t = tail ,p = t,q = p.next = null,和插入元素A無異,如下:
這裡整個offer()過程已經分析完成了,可能p == q 有點兒難理解,p 不是等於q.next麼,怎麼會有p == q呢?這個疑問我們在出列poll()中分析
出列
ConcurrentLinkedQueue提供了poll()方法進行出列操作。入列主要是涉及到tail,出列則涉及到head。我們先看原始碼:
public E poll() { // 如果出現p被刪除的情況需要從head重新開始 restartFromHead: // 這是什麼語法?真心沒有見過 for (;;) { for (Node<E> h = head, p = h, q;;) { // 節點 item E item = p.item; // item 不為null,則將item 設定為null if (item != null && p.casItem(item, null)) { // --- 1 // p != head 則更新head if (p != h) // --- 2 // p.next != null,則將head更新為p.next ,否則更新為p updateHead(h, ((q = p.next) != null) ? q : p); // --- 3 return item; } // p.next == null 佇列為空 else if ((q = p.next) == null) { // --- 4 updateHead(h, p); return null; } // 當一個執行緒在poll的時候,另一個執行緒已經把當前的p從佇列中刪除——將p.next = p,p已經被移除不能繼續,需要重新開始 else if (p == q) // --- 5 continue restartFromHead; else p = q; // --- 6 } } } |
這個相對於offer()方法而言會簡單些,裡面有一個很重要的方法:updateHead(),該方法用於CAS更新head節點,如下:
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); } |
我們先將上面offer()的連結串列poll()掉,新增A、B、C節點結構如下:
poll A
head = dumy,p = head, item = p.item = null,步驟1不成立,步驟4:(q = p.next) == null不成立,p.next = A,跳到步驟6,下一個迴圈,此時p = A,所以步驟1 item != null,進行p.casItem(item, null)成功,此時p == A != h,所以執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,則將head CAS更新成B,如下:
poll B
head = B , p = head = B,item = p.item = B,步驟成立,步驟2:p != h 不成立,直接return,如下:
poll C
head = dumy ,p = head = dumy,tiem = p.item = null,步驟1不成立,跳到步驟4:(q = p.next) == null,不成立,然後跳到步驟6,此時,p = q = C,item = C(item),步驟1成立,所以講C(item)設定為null,步驟2:p != h成立,執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),如下:
看到這裡是不是一目瞭然了,在這裡我們再來分析offer()的步驟5:
else if(p == q){ p = (t != (t = tail))? t : head; } |
ConcurrentLinkedQueue中規定,p == q表明,該節點已經被刪除了,也就說tail滯後於head,head無法通過succ()方法遍歷到tail,怎麼做? (t != (t = tail))? t : head;(這段程式碼的可讀性實在是太差了,真他媽難理解:不知道是否可以理解為t != tail ? tail : head)這段程式碼主要是來判讀tail節點是否已經發生了改變,如果發生了改變,則說明tail已經重新定位了,只需要重新找到tail即可,否則就只能指向head了。
就上面那個我們再次插入一個元素D。則p = head,q = p.next = null,執行步驟1: q = null且 p != t ,所以執行步驟4:,如下:
再插入元素E,q = p.next = null,p == t,所以插入E後如下:
到這裡ConcurrentLinkedQueue的整個入列、出列都已經分析完畢了,對於ConcurrentLinkedQueue LZ真心感覺難看懂,看懂之後也感嘆設計得太精妙了,利用CAS來完成資料操作,同時允許佇列的不一致性,這種弱一致性確實是非常強大。再次感嘆Doug Lea的天才