1. 程式人生 > >ConcurrentLinkedQueue 非阻塞佇列

ConcurrentLinkedQueue 非阻塞佇列

要實現一個執行緒安全的佇列有兩種方式:阻塞和非阻塞。阻塞佇列無非就是鎖的應用,而非阻塞則是CAS演算法的應用。下面我們就開始一個非阻塞演算法的研究:CoucurrentLinkedQueue。

ConcurrentLinkedQueue是一個基於連結節點的無邊界的執行緒安全佇列,它採用FIFO原則對元素進行排序。採用“wait-free”演算法(即CAS演算法)來實現的。

CoucurrentLinkedQueue規定了如下幾個不變性:

  1. 在入隊的最後一個元素的next為null
  2. 佇列中所有未刪除的節點的item都不能為null且都能從head節點遍歷到
  3. 對於要刪除的節點,不是直接將其設定為null,而是先將其item域設定為null(迭代器會跳過item為null的節點)
  4. 允許head和tail更新滯後。這是什麼意思呢?意思就說是head、tail不總是指向第一個元素和最後一個元素(後面闡述)。

head的不變性和可變性:

  • 不變性
    1. 所有未刪除的節點都可以通過head節點遍歷到
    2. head不能為null
    3. head節點的next不能指向自身
  • 可變性
    1. head的item可能為null,也可能不為null 2.允許tail滯後head,也就是說呼叫succc()方法,從head不可達tail

tail的不變性和可變性

  • 不變性
    1. tail不能為null
  • 可變性
    1. tail的item可能為null,也可能不為null
    2. tail節點的next域可以指向自身 3.允許tail滯後head,也就是說呼叫succc()方法,從head不可達tail

這些特性是否已經暈了?沒關係,我們看下面的原始碼分析就可以理解這些特性了。

ConcurrentLinkedQueue原始碼分析

CoucurrentLinkedQueue的結構由head節點和tail節點組成,每個節點由節點元素item和指向下一個節點的next引用組成,而節點與節點之間的關係就是通過該next關聯起來的,從而組成一張連結串列的佇列。節點Node為ConcurrentLinkedQueue的內部類,定義如下:

private static class Node<E> {
      /** 節點元素域 */
      volatile E item;
      volatile Node<E> next;

      //初始化,獲得item 和 next 的偏移量,為後期的CAS做準備

      Node(E item) {
          UNSAFE.putObject(this, itemOffset, item);
      }

      boolean casItem(E cmp, E val) {
          return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
      }

      void lazySetNext(Node<E> val) {
          UNSAFE.putOrderedObject(this, nextOffset, val);
      }

      boolean casNext(Node<E> cmp, Node<E> val) {
          return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
      }

      // Unsafe mechanics

      private static final sun.misc.Unsafe UNSAFE;
      /** 偏移量 */
      private static final long itemOffset;
      /** 下一個元素的偏移量 */

     private static final long nextOffset;

      static {
          try {
              UNSAFE = sun.misc.Unsafe.getUnsafe();
              Class<?> k = Node.class;
              itemOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("item"));
              nextOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("next"));
          } catch (Exception e) {
              throw new Error(e);
          }
      }
  }

入列

入列,我們認為是一個非常簡單的過程:tail節點的next執行新節點,然後更新tail為新節點即可。從單執行緒角度我們這麼理解應該是沒有問題的,但是多執行緒呢?如果一個執行緒正在進行插入動作,那麼它必須先獲取尾節點,然後設定尾節點的下一個節點為當前節點,但是如果已經有一個執行緒剛剛好完成了插入,那麼尾節點是不是發生了變化?對於這種情況ConcurrentLinkedQueue怎麼處理呢?我們先看原始碼:

offer(E e):將指定元素插入都佇列尾部:

public boolean offer(E e) {
        //檢查節點是否為null
        checkNotNull(e);
        // 建立新節點
        final Node<E> newNode = new Node<E>(e);

        //死迴圈 直到成功為止
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // q == null 表示 p已經是最後一個節點了,嘗試加入到佇列尾
            // 如果插入失敗,則表示其他執行緒已經修改了p的指向
            if (q == null) {                                // --- 1
                // casNext:t節點的next指向當前節點
                // casTail:設定tail 尾節點
                if (p.casNext(null, newNode)) {             // --- 2
                    // node 加入節點後會導致tail距離最後一個節點相差大於一個,需要更新tail
                    if (p != t)                             // --- 3
                        casTail(t, newNode);                    // --- 4
                    return true;
                }
            }
            // p == q 等於自身
            else if (p == q)                                // --- 5
                // p == q 代表著該節點已經被刪除了
                // 由於多執行緒的原因,我們offer()的時候也會poll(),如果offer()的時候正好該節點已經poll()了
                // 那麼在poll()方法中的updateHead()方法會將head指向當前的q,而把p.next指向自己,即:p.next == p
                // 這樣就會導致tail節點滯後head(tail位於head的前面),則需要重新設定p
                p = (t != (t = tail)) ? t : head;           // --- 6
            // tail並沒有指向尾節點
            else
                // tail已經不是最後一個節點,將p指向最後一個節點
                p = (p != t && t != (t = tail)) ? t : q;    // --- 7
        }
    }

光看原始碼還是有點兒迷糊的,插入節點一次分析就會明朗很多。

初始化

ConcurrentLinkedQueue初始化時head、tail儲存的元素都為null,且head等於tail:

201703160001

新增元素A

按照程式分析:第一次插入元素A,head = tail = dummyNode,所有q = p.next = null,直接走步驟2:p.casNext(null, newNode),由於 p == t成立,所以不會執行步驟3:casTail(t, newNode),直接return。插入A節點後如下:

201703160002

新增元素B

q = p.next = A ,p = tail = dummyNode,所以直接跳到步驟7:p = (p != t && t != (t = tail)) ? t : q;。此時p = q,然後進行第二次迴圈 q = p.next = null,步驟2:p == null成立,將該節點插入,因為p = q,t = tail,所以步驟3:p != t 成立,執行步驟4:casTail(t, newNode),然後return。如下:201703160003

新增節點C

此時t = tail ,p = t,q = p.next = null,和插入元素A無異,如下:201703160004

這裡整個offer()過程已經分析完成了,可能p == q 有點兒難理解,p 不是等於q.next麼,怎麼會有p == q呢?這個疑問我們在出列poll()中分析

出列

ConcurrentLinkedQueue提供了poll()方法進行出列操作。入列主要是涉及到tail,出列則涉及到head。我們先看原始碼:

public E poll() {
        // 如果出現p被刪除的情況需要從head重新開始
        restartFromHead:        // 這是什麼語法?真心沒有見過
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {

                // 節點 item
                E item = p.item;

                // item 不為null,則將item 設定為null
                if (item != null && p.casItem(item, null)) {                    // --- 1
                    // p != head 則更新head
                    if (p != h)                                                 // --- 2
                        // p.next != null,則將head更新為p.next ,否則更新為p
                        updateHead(h, ((q = p.next) != null) ? q : p);          // --- 3
                    return item;
                }
                // p.next == null 佇列為空
                else if ((q = p.next) == null) {                                // --- 4
                    updateHead(h, p);
                    return null;
                }
                // 當一個執行緒在poll的時候,另一個執行緒已經把當前的p從佇列中刪除——將p.next = p,p已經被移除不能繼續,需要重新開始
                else if (p == q)                                                // --- 5
                    continue restartFromHead;
                else
                    p = q;                                                      // --- 6
            }
        }
    }

這個相對於offer()方法而言會簡單些,裡面有一個很重要的方法:updateHead(),該方法用於CAS更新head節點,如下:

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

我們先將上面offer()的連結串列poll()掉,新增A、B、C節點結構如下:

201703160004_2

poll A

head = dumy,p = head, item = p.item = null,步驟1不成立,步驟4:(q = p.next) == null不成立,p.next = A,跳到步驟6,下一個迴圈,此時p = A,所以步驟1 item != null,進行p.casItem(item, null)成功,此時p == A != h,所以執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,則將head CAS更新成B,如下:

201703160005

poll B

head = B , p = head = B,item = p.item = B,步驟成立,步驟2:p != h 不成立,直接return,如下:

201703160006

poll C

head = dumy ,p = head = dumy,tiem = p.item = null,步驟1不成立,跳到步驟4:(q = p.next) == null,不成立,然後跳到步驟6,此時,p = q = C,item = C(item),步驟1成立,所以講C(item)設定為null,步驟2:p != h成立,執行步驟3:updateHead(h, ((q = p.next) != null) ? q : p),如下:

201703160007

看到這裡是不是一目瞭然了,在這裡我們再來分析offer()的步驟5:

else if(p == q){
	p = (t != (t = tail))? t : head;
}

ConcurrentLinkedQueue中規定,p == q表明,該節點已經被刪除了,也就說tail滯後於head,head無法通過succ()方法遍歷到tail,怎麼做? (t != (t = tail))? t : head;(這段程式碼的可讀性實在是太差了,真他媽難理解:不知道是否可以理解為t != tail ? tail : head)這段程式碼主要是來判讀tail節點是否已經發生了改變,如果發生了改變,則說明tail已經重新定位了,只需要重新找到tail即可,否則就只能指向head了。

就上面那個我們再次插入一個元素D。則p = head,q = p.next = null,執行步驟1: q = null且 p != t ,所以執行步驟4:,如下:

201703160008_2

再插入元素E,q = p.next = null,p == t,所以插入E後如下:

201703160009_2

到這裡ConcurrentLinkedQueue的整個入列、出列都已經分析完畢了,對於ConcurrentLinkedQueue LZ真心感覺難看懂,看懂之後也感嘆設計得太精妙了,利用CAS來完成資料操作,同時允許佇列的不一致性,這種弱一致性確實是非常強大。再次感嘆Doug Lea的天才