1. 程式人生 > >併發容器學習—ConcurrentLinkedQueue和ConcurrentLinkedDuque

併發容器學習—ConcurrentLinkedQueue和ConcurrentLinkedDuque

一、ConcurrentLinkedQueue併發容器 1. ConcurrentLinkedQueue的底層資料結構     ConcurrentLinkedQueue是一個底層基於連結串列實現的無界且執行緒安全的佇列。遵循先進先出(FIFO)的原則 佇列的頭部是佇列中時間最長的元素。佇列的尾部是佇列中時間最短的元素。它採用CAS演算法來實現同步,是個非阻塞的佇列。     底層連結串列由一個個Node結點組成,Node的定義如下:  
private static class Node<E> {
    volatile E item;    //存放資料
    volatile Node<E> next;    //指向下個結點

    //構造方法
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);    //unsafe操作賦值
    }


    //CAS方式嘗試更新資料
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    //CAS方式更新下個結點地址
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;    //item的記憶體地址偏移量
    private static final long nextOffset;    //next的記憶體地址偏移量

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

2. ConcurrentLinkedQueue的繼承關係     瞭解了底層的基本實現,再來看看 ConcurrentLinkedQueue的繼承關係,如下圖所示, ConcurrentLinkedQueue繼承了AbstractQueue即實現了Queue介面。     之前在ArrayList及LinkedList的學習時 Queue及 AbstractCollection都已學過,不在贅言,直接來看 AbstractQueue的原始碼:
 
public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {

    protected AbstractQueue() {
    }

    //向佇列末尾新增e元素
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    //刪除隊首元素,並將其返回
    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
    
    //獲取隊首元素,但不移除出佇列
    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    //清空佇列中的所有元素
    public void clear() {
        while (poll() != null)
            ;
    }

    //將集合c中的所有元素一次新增到佇列末尾
    public boolean addAll(Collection<? extends E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }

}

 

3. 重要屬性及構造方法     瞭解了 ConcurrentLinkedQueue的繼承關係,再來看構造方法和一些重要的屬性  
//底層連結串列的頭結點
private transient volatile Node<E> head;

//底層連結串列的尾結點
private transient volatile Node<E> tail;

//空構造,建立了一個空佇列
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

//以結合c中的元素建立一個佇列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

 

  4.入隊的實現     佇列中新增的方法有兩個,分別add和offer,效果沒有什麼區別,接下來看看實現的過程:  
//由原始碼可見,add的本質還是呼叫了offer方法
public boolean add(E e) {
    return offer(e);
}


public boolean offer(E e) {
    //判斷待新增的元素是否為null,說明ConcurrentLinkedQueue中不允許null元素
    checkNotNull(e);    
    final Node<E> newNode = new Node<E>(e);    //新建結點

    
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        //判斷p是否為尾結點
        if (q == null) {
            //CAS方式嘗試更新p結點的next結點為newNode結點,失敗的話繼續迴圈嘗試
            if (p.casNext(null, newNode)) {
                //p的next結點更新成功,說明佇列尾結點改變了就繼續嘗試更新tail的值
                //這裡判斷p!=t,說明tail不是實際的尾結點,應該要更新了,但並不強制
                //要求一定要更新成功,即不要求tail一定要指向佇列的尾結點,允許tail滯後
                //於真正的尾結點
                if (p != t) 
                    casTail(t, newNode);  //更新tail,失敗也沒關係
                return true;
            }
        }
        else if (p == q)

            /** 
            * p == q說明當前p結點已經被移除出隊了,需要重新獲取head來進行入隊操作
            * 
            * 對於已經移除出隊的元素,會將next置為本身,
            * 用於判斷當前元素已經出隊,接著從head繼續遍歷。 
            * 
            * 在整個offer方法的執行過程中,p一定是等於t或者在t的後面的, 
            * 因此如果p已經不在佇列中的話,t也一定不在佇列中了(FIFO)。 
            * 
            * 所以重新讀取一次tail到快照t, 
            * 如果t未發生變化,就從head開始繼續下去。 
            * 否則讓p從新的t開始繼續嘗試入隊是一個更好的選擇(此時新的t很可能在head後面) 
            */
            p = (t != (t = tail)) ? t : head;
        else
            /** 
            * 若p與t相等,則讓p指向next結點。 
            * 若p和t不相等,則說明已經經歷多次入隊失敗了(可能被插隊了), 
            * 則重新讀取一次tail到t,如果t發生了變化(確實被插隊了),則從t開始再次嘗試入隊。 
            */
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

 

  5.出隊的過程  
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            //判斷item是否為null,即判斷p結點是否要被移除出隊
            //若item不為null,則嘗試更新item為null,
            //因為item若為null表示結點標記為要被移除
            if (item != null && p.casItem(item, null)) {
                //判斷p與h是否還相同
                //p與h不相同,說明head可能滯後,即head可能已經不是指向隊首結點,
                //嘗試更新head為p.next(p.next若為null,則說明p為隊尾了,head只能更新為p)
                //p與h相同則直接返回
                if (p != h) 
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }

            //判斷p是否為隊尾,也就是佇列是否已經空了
            //若佇列已經空了,則嘗試更新head為p
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //p的next結點若是存在,還需要判斷是否在佇列中
            //若p==q,說明p已經不再佇列中了,此時需要重新獲取head
            //的快照h,並讓p=h,嘗試移除結點
            else if (p == q)
                continue restartFromHead;
            else
                p = q;    // 繼續向後走一個節點嘗試移除結點
        }
    }
}


final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);    //h的next結點設定為自身
}

 

  6.出入隊的過程     ConcurrentLinkedQueue的出入隊操作並不是使用加鎖的方式實現的執行緒安全,而是通過無鎖的CAS演算法實現的,這就使得其程式碼實現雖然簡單,但理解起來晦澀難懂。 ConcurrentLinkedQueue是不允許入隊元素時null值的(結點的item不能為null),因為 ConcurrentLinkedQueue對已出隊的結點會將item賦上null值。也就是說某個結點的item若為null,則說明該結點是要被刪除的結,那麼久可以將其重佇列中移除了。 ConcurrentLinkedQueue除了對結點有以上要求外,其自身則有如下特點:     1.佇列中的所有結點在任意時刻只有最後一個結點的next是為null的。     2.要求head和tail屬性不能是null(可以是空結點,即item和next為null)。     3.head和tail具有滯後性,head指向的不一定是隊首結點,tail指向的也不一定是隊尾結點。       下面以圖解的形式先演示入隊的過程:     初始狀態,佇列中沒有結點,此時head==tail,指向一個空結點。         有第一個結點要入隊,通過自旋嘗試入隊,此時q為p.next,即為null,那麼就嘗試更新p.next為要新增的結點,如果p.next更新成功,入隊成功,再判斷p與t是否相同,即是否需要嘗試更新tail(p!=t說明tail沒有指向隊尾),然後結束入隊操作;更新p.next失敗則繼續嘗試,直到成功為止(如上右圖所示)。     再有第二個結點入隊,得到如上圖所示,此時q==node1不為null,且p!=q,令p=q指向下個結點重新嘗試入隊。        此時q==null,嘗試更新p.next為要新增的結點,如果p.next更新成功,入隊成功(如上右圖所示);失敗則繼續嘗試。     此時判斷p!=t,說明tail的指向已經滯後了,沒有指向隊尾結點,可以嘗試更新了,更新成不成功都沒有關係,因為不成功也沒事,不成功說明有其他執行緒已經搶先更新過了。成功則tail指向新增結點2.     再接下來,入隊結點3,此時p=t=tail,q為null,與加入第一個結點過程相同,嘗試更新p.next為要新增的結點,成功則結束入隊操作;失敗則迴圈繼續嘗試。     繼續入隊結點4,此時q==node3不為null,且p!=q,令p=q重新嘗試入隊。     此時q==null,嘗試更新p.next為要新增的結點,這裡假定更新失敗,即有其他執行緒搶先入隊了結點x,且tail也被更新。     此時p與t不相同,且t與tail也不相同,即tail已經改變,此時結點4要入隊只能在新的tail之後去嘗試入隊,因此直接令p=tail去繼續嘗試入隊。     到此在重複前面的入隊步驟,q==null,嘗試更新p.next為node4.成功則結束。       出隊的過程,以上面入完5個結點開始出隊過程的分析:     結點1開始出隊,此時p=h=head,p.item==null,q=p.next;則可知head結點現在是滯後狀態,指向的並不是隊首結點,需要查詢隊首結點,令p=q。     這時p.item!=null,嘗試將p.item的值更新為null,因為head之後第一個item不為null的結點即是隊首結點,也就是要移除出隊的結點,而要被移除的結點的item要被標記成null值,標記成功說明該結點可以刪除出隊了;若嘗試更新失敗,說明被其他執行緒搶先出隊,那麼就重複上一步繼續查詢新隊首,再嘗試出隊操作。     若p.item更新成功則判斷此時p與h是否相同,若是相同則直接返回item;若是不相同,說明head此時已經滯後了,那麼可以嘗試更新head(head若是更新成功則h結點的next指向h自身,說明該結點已不再佇列中)。       到此,第一個結點的移除就結束了。     此時,再繼續移除隊首結點2,如上圖所示,有p=h=head,p.item為null,q=p.next且不為null(有後繼結點),令p=q往後繼續查詢隊首。     此時p.item=2,不為null,說明找到隊首,可以嘗試更新結點2的item值,假定此時更新失敗,則說明結點2被其他執行緒搶先移除出隊了,那麼此時需要繼續查詢佇列中第一個item不為null的結點來出隊。     到此則有p指向node3,此時p.item依舊不為null,則可以執行更新結點3的item,若是更新成功,且head更新失敗,則可得到如下圖所示結果。     若是繼續移除結點x,那麼就需要重head開始,遍歷到結點x出才可能執行移除出隊操作,我們假定在遍歷時(在p=h=head之後,結點x正好被移除),有其它執行緒搶先移除了結點x,並且更新了head的位置,且原本的h的nexr指向h自身。     此時有p.item==null,且p==q且不為null,那麼需要重新獲取p=h=head,得到如下圖所示結果。     到此,又回到移除隊首的初始狀態,此時p.item==null,令p=q獲取下個結點。     此時,p.item不為null,那麼久要嘗試更新p.item,假設更新失敗,那麼此時q=p.next為null,即node4為隊尾結點且已經被其他執行緒搶先移除出隊了,那麼能做的只剩嘗試更新head結點,並且返回null了(佇列中沒有結點可以移除了,只能返回null)。     從這裡還可以看出在 ConcurrentLinkedQueue中head是可以在tail的後面的,這是由於head和tail的滯後性帶來的影響。   7.其他的方法     
//peek的原理與poll差不多,只是peek中獲取到隊首後,不去進行CAS的更新item操作
//只將item值返回即可
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            //判斷是不是隊首結點,即通過item是否為null,判斷當前結點
            //是否已被移除出隊
            //判斷p.next是否為null,則是為判斷佇列是否是空佇列
            //若是空佇列也可結束了
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);    //嘗試更新滯後的head
                return item;
            }
            //判斷結點是否已經被移除出隊,是的話要重新獲取head來查詢隊首
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

//統計佇列中的元素個數,瞬時值,不能太過依賴
public int size() {
    int count = 0;
    //獲取隊首結點,然後遍歷佇列挨個統計
    for (Node<E> p = first(); p != null; p = succ(p))
        //判斷結點是不是要被移除,或已被移除(item為null,說明是被遺棄的結點,不需要統計)
        if (p.item != null)    
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

//獲取隊首元素,與peek基本一致,只不過返回的是結點,而peek返回的是item
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

//獲取後繼結點,若是後繼結點是自身(已被移除),那麼返回head
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

二、ConcurrentLinkedDueue併發容器

1.ConcurrentLinkedDueue

    ConcurrentLinkedDueue的底層資料實現與ConcurrentLinkedQueue類似,都是連結串列,不同的是ConcurrentLinkedDueue是雙向連結串列,因此ConcurrentLinkedDueue既可以當做佇列也可當做棧來使用。並且ConcurrentLinkedDueue實現執行緒安全的,非阻塞的方式與ConcurrentLinkedQueue一樣都是採用CAS演算法。若ConcurrentLinkedDueue當做佇列使用那麼與ConcurrentLinkedQueue沒有區別,效率也相同,原始碼也十分類似,這裡就不做過多分析。