1. 程式人生 > >Java同步佇列(非阻塞佇列與阻塞佇列)——java併發容器

Java同步佇列(非阻塞佇列與阻塞佇列)——java併發容器

在併發程式設計中,有時候需要使用執行緒安全的佇列。如果要實現一個執行緒安全的佇列有兩種方式:一種是使用阻塞演算法,另一種是使用非阻塞演算法。

使用阻塞演算法的佇列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現。

非阻塞的實現方式則可以使用迴圈CAS的方式來實現。

ConcurrentLinkedQueue

我們一起來研究一下如何使用非阻的方式來實現執行緒安全佇列ConcurrentLinkedQueue的
ConcurrentLinkedQueue是一個基於連結節點的無界執行緒安全佇列,它採用先進先出的規則對節點進行排序,當我們新增一個元素的時候,它會新增到佇列的尾部;當我們獲取一個元

素時,它會返回佇列頭部的元素。

ConcurrentLinkedQueue結構

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private transient volatile Node<E> head;//頭指標
    private transient volatile Node<E> tail;//尾指標
    public ConcurrentLinkedQueue() {//初始化,head=tail=(一個空的頭結點)
        head = tail = new Node<E>(null);
    }
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;//內部是使用單向連結串列實現
        ......
    }
    ......
}

入隊

    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);//入隊前,建立一個新節點

        for (Node<E> t = tail, p = t;;) {//除非插入成功並返回,否則反覆迴圈
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {//利用CAS操作,將p的next指標從舊值null更新為newNode 
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.利用CAS操作更新tail,如果失敗說明其他執行緒添加了元素,由其他執行緒負責更新tail
                    return true;
                }
                // Lost CAS race to another thread; re-read next 如果新增元素失敗,說明其他執行緒添加了元素,p後移,並繼續嘗試
            }
            else if (p == q) //如果p被移除出連結串列,我們需要調整指標重新指向head,否則我們指向新的tail
                p = (t != (t = tail)) ? t : head;
            else
                //p指向tail或者q
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

casTail(cmp,value)方法用於更新tail節點。tail被設定為volatile保證可見性。

p.casNext(cmp,value)方法用於將入隊節點設定為當前佇列尾節點的next節點。value也被設定為volatile。

對於出隊操作,也是使用CAS的方式迴圈嘗試將元素從頭部移除。

因為採用CAS操作,允許多個執行緒併發執行,並且不會因為加鎖而阻塞執行緒,使得併發效能更好。

Java中的阻塞佇列

本節將介紹什麼是阻塞佇列,以及Java中阻塞佇列的4種處理方式,並介紹Java 7中提供的7種阻塞佇列,最後分析阻塞佇列的一種實現方式。

什麼是阻塞佇列

阻塞佇列(BlockingQueue)是一個支援兩個附加操作的佇列。這兩個附加的操作支援阻塞的插入和移除方法。
1)支援阻塞的插入方法:意思是當佇列滿時,佇列會阻塞插入元素的執行緒,直到佇列不滿。
2)支援阻塞的移除方法:意思是在佇列為空時,獲取元素的執行緒會等待佇列變為非空。
阻塞佇列常用於生產者和消費者的場景,生產者是向佇列裡新增元素的執行緒,消費者是從佇列裡取元素的執行緒。阻塞佇列就是生產者用來存放元素、消費者用來獲取元素的容器。

JDK 7提供了7個阻塞佇列,如下。
·ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。
·LinkedBlockingQueue:一個由連結串列結構組成的有界阻塞佇列。
·PriorityBlockingQueue:一個支援優先順序排序的無界阻塞佇列。
·DelayQueue:一個使用優先順序佇列實現的無界阻塞佇列。
·SynchronousQueue:一個不儲存元素的阻塞佇列。
·LinkedTransferQueue:一個由連結串列結構組成的無界阻塞佇列。
·LinkedBlockingDeque:一個由連結串列結構組成的雙向阻塞佇列。

內容來自:

《併發程式設計的藝術》