1. 程式人生 > >java併發包詳解-3

java併發包詳解-3

 一 JDK 提供的併發容器總結

二 ConcurrentHashMap

三 CopyOnWriteArrayList

3.1 CopyOnWriteArrayList 簡介

3.2 CopyOnWriteArrayList 是如何做到的?

3.3 CopyOnWriteArrayList 讀取和寫入原始碼簡單分析

3.3.1 CopyOnWriteArrayList 讀取操作的實現

3.3.2 CopyOnWriteArrayList 寫入操作的實現

四 ConcurrentLinkedQueue

五 BlockingQueue

5.1 BlockingQueue 簡單介紹

5.2 ArrayBlockingQueue

5.3 LinkedBlockingQueue

5.4 PriorityBlockingQueue

 

 

  • ConcurrentHashMap: 執行緒安全的HashMap

  • CopyOnWriteArrayList: 執行緒安全的List,在讀多寫少的場合效能非常好,遠遠好於Vector.

  • ConcurrentLinkedQueue:高效的併發佇列,使用連結串列實現。可以看做一個執行緒安全的 LinkedList,這是一個非阻塞佇列。

  • BlockingQueue:

     這是一個介面,JDK內部通過連結串列、陣列等方式實現了這個介面。表示阻塞佇列,非常適合用於作為資料共享的通道。

JDK1.7

 

jdk 1.8示例圖:

 

 

ConcurrentHashMap取消了Segment分段鎖,採用CAS和synchronized來保證併發安全。資料結構跟HashMap1.8的結構類似,陣列+連結串列/紅黑二叉樹。

synchronized只鎖定當前連結串列或紅黑二叉樹的首節點,這樣只要hash不衝突,就不會產生併發,效率又提升N倍。

3.1 CopyOnWriteArrayList 簡介

public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable

在很多應用場景中,讀操作可能會遠遠大於寫操作。由於讀操作根本不會修改原有的資料,因此對於每次讀取都進行加鎖其實是一種資源浪費。我們應該允許多個執行緒同時訪問List的內部資料,畢竟讀取操作是安全的。

這和我們之前在多執行緒章節講過 ReentrantReadWriteLock 讀寫鎖的思想非常類似,也就是讀讀共享、寫寫互斥、讀寫互斥、寫讀互斥。JDK中提供了 CopyOnWriteArravList 類比相比於在讀寫鎖的思想又更進一步。為了將讀取的效能發揮到極致,CopyOnWriteArravList 讀取是完全不用加鎖的,並且更厲害的是:寫入也不會阻塞讀取操作。只有寫入和寫入之間需要進行同步等待。這樣一來,讀操作的效能就會大幅度提升。那它是怎麼做的呢?

3.2 CopyOnWriteArravList 是如何做到的?

CopyOnWriteArravList 類的所有可變操作(add,set等等)都是通過建立底層陣列的新副本來實現的。當 List 需要被修改的時候,我並不修改原有內容,而是對原有資料進行一次複製,將修改的內容寫入副本。寫完之後,再將修改完的副本替換原來的資料,這樣就可以保證寫操作不會影響讀操作了。

從 CopyOnWriteArravList 的名字就能看出CopyOnWriteArravList 是滿足CopyOnWrite 的ArrayList,所謂CopyOnWrite 也就是說:在計算機,如果你想要對一塊記憶體進行修改時,我們不在原有記憶體塊中進行寫操作,而是將記憶體拷貝一份,在新的記憶體中進行寫操作,寫完之後呢,就將指向原來記憶體指標指向新的記憶體,原來的記憶體就可以被回收掉了。

 

3.3 CopyOnWriteArrayList 讀取和寫入原始碼簡單分析

3.3.1 CopyOnWriteArravList 讀取操作的實現

讀取操作沒有任何同步控制和鎖操作,理由就是內部陣列 array 不會發生修改,只會被另外一個 array 替換,因此可以保證資料安全。

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;
    public E get(int index) {
        return get(getArray(), index);
    }
    @SuppressWarnings("unchecked")
    private E get(Object[] a, int index) {
        return (E) a[index];
    }
    final Object[] getArray() {
        return array;
    }

3.3.2 CopyOnWriteArravList 寫入操作的實現

CopyOnWriteArravList 寫入操作 add() 方法在新增元素的時候加了鎖,保證了同步,避免了多執行緒寫的時候會 copy 出多個副本出來。

    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();//加鎖
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);//拷貝新陣列
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();//釋放鎖
        }
    }

四 ConcurrentLinkedQueue

Java提供的執行緒安全的 Queue 可以分為阻塞佇列非阻塞佇列,其中阻塞佇列的典型例子是 BlockingQueue,非阻塞佇列的典型例子是ConcurrentLinkedQueue,在實際應用中要根據實際需要選用阻塞佇列或者非阻塞佇列。 阻塞佇列可以通過加鎖來實現,非阻塞佇列可以通過 CAS 操作實現。

從名字可以看出,ConcurrentLinkedQueue這個佇列使用連結串列作為其資料結構.ConcurrentLinkedQueue 應該算是在高併發環境中效能最好的隊列了。它之所有能有很好的效能,是因為其內部複雜的實現。

ConcurrentLinkedQueue 內部程式碼我們就不分析了,大家知道ConcurrentLinkedQueue 主要使用 CAS 非阻塞演算法來實現執行緒安全就好了。

ConcurrentLinkedQueue 適合在對效能要求相對較高,同時對佇列的讀寫存在多個執行緒同時進行的場景。

五 BlockingQueue

5.1 BlockingQueue 簡單介紹

上面我們己經提到了 ConcurrentLinkedQueue 作為高效能的非阻塞佇列。下面我們要講到的是阻塞佇列——BlockingQueue。阻塞佇列(BlockingQueue)被廣泛使用在“生產者-消費者”問題中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。當佇列容器已滿,生產者執行緒會被阻塞,直到佇列未滿;當佇列容器為空時,消費者執行緒會被阻塞,直至佇列非空時為止。

BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。下面是 BlockingQueue 的相關實現類:

BlockingQueue 的實現類

下面主要介紹一下:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue,這三個 BlockingQueue 的實現類。

5.2 ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 介面的有界佇列實現類,底層採用陣列來實現。ArrayBlockingQueue一旦建立,容量不能改變。其併發控制採用可重入鎖來控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進行操作。當佇列容量滿時,嘗試將元素放入佇列將導致操作阻塞;嘗試從一個空佇列中取一個元素也會同樣阻塞。

ArrayBlockingQueue 預設情況下不能保證執行緒訪問佇列的公平性,所謂公平性是指嚴格按照執行緒等待的絕對時間順序,即最先等待的執行緒能夠最先訪問到 ArrayBlockingQueue。而非公平性則是指訪問 ArrayBlockingQueue 的順序不是遵守嚴格的時間順序,有可能存在,當 ArrayBlockingQueue 可以被訪問時,長時間阻塞的執行緒依然無法訪問到 ArrayBlockingQueue。如果保證公平性,通常會降低吞吐量。如果需要獲得公平性的 ArrayBlockingQueue,可採用如下程式碼:

private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);

5.3 LinkedBlockingQueue

LinkedBlockingQueue 底層基於單向連結串列實現的阻塞佇列,可以當做無界佇列也可以當做有界佇列來使用,同樣滿足FIFO的特性,與ArrayBlockingQueue 相比起來具有更高的吞吐量,為了防止 LinkedBlockingQueue 容量迅速增,損耗大量記憶體。通常在建立LinkedBlockingQueue 物件時,會指定其大小,如果未指定,容量等於Integer.MAX_VALUE。

相關構造方法:

    /**
     *某種意義上的無界佇列
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     *有界佇列
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

5.4 PriorityBlockingQueue

PriorityBlockingQueue 是一個支援優先順序的無界阻塞佇列。預設情況下元素採用自然順序進行排序,也可以通過自定義類實現 compareTo() 方法來指定元素排序規則,或者初始化時通過構造器引數 Comparator 來指定排序規則。

PriorityBlockingQueue 併發控制採用的是 ReentrantLock,佇列為無界佇列(ArrayBlockingQueue 是有界佇列,LinkedBlockingQueue 也可以通過在建構函式中傳入 capacity 指定佇列最大的容量,但是 PriorityBlockingQueue 只能指定初始的佇列大小,後面插入元素的時候,如果空間不夠的話會自動擴容)。

簡單地說,它就是 PriorityQueue 的執行緒安全版本。不可以插入 null 值,同時,插入佇列的物件必須是可比較大小的(comparable),否則報 ClassCastException 異常。它的插入操作 put 方法不會 block,因為它是無界佇列(take 方法在佇列為空的時候會阻塞)。