1. 程式人生 > >Java 集合框架分析:執行緒安全的集合

Java 集合框架分析:執行緒安全的集合

說明:同步集合是說它的操作是同步的,(mutative operations :add, set, and so on),但是它的組合操作的同步性要自己控制。

執行緒安全的集合

什麼是執行緒安全

在多執行緒環境下,不會產生不一致的行為(執行緒安全:有一定的標準,能夠斷定發生的先後順序,如先獲得鎖的會執行,然後….)

分類

1.Copy*
CopyOnWriteArraySet,CopyOnWriteArrayList

2.Blocking*
ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue

3.Concurent*
ConcurentHashMap, ConcurrentSkipListMap,ConcurrentSkipListSet

CopyOnWriteArrayList

實現:同步的方式是通過在需要修改集合時,通過copy底層的集合資料,在這之上操作。具體表現為:
1.The “snapshot” style iterator method uses a reference to the state of the array at the point that the iterator was created,迭代器在迭代器被建立時的集合快照上迭代,. This array never changes during the lifetime of the iterator, so interference is impossible and the iterator is guaranteed not to throw ConcurrentModificationException。
2.The iterator will not reflect additions, removals, or changes to the list since the iterator was created. Element-changing operations on iterators themselves (remove, set, and add) are not supported. These methods throw UnsupportedOperationException。當迭代器建立後,集合上的任何變化都不會反映到迭代器上,通過迭代器進行修改集合會丟擲不支援異常。
簡單來說:迭代器在建立後就不變(所引用的集合元素是不變的),集合元素在迭代過程中是可變的。另外,因為迭代器在建立後所引用的集合元素不可變,即通過迭代器進行修改集合的操作都會丟擲UnsupportedOperationException異常。
這裡寫圖片描述

優缺點:
1.缺點:copy代價高
2.優點:迭代快

方法舉例:

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1
); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); }

ArrayBlockingQueue

同步通過可重入鎖保證

  /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

方法舉例

    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

注意:Blocking*系列的提供有多種類似的方法,不同的方法的處理有點不一樣。
這裡寫圖片描述

這些方法的核心主體還是一樣的,但是有的會根據具體的結果會進一步的處理,如:add方法,最終還是呼叫的offer方法,但是根據offer的返回值(插入成功還是不成功進一步的處理,看是否需要丟擲異常)

   public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

其中offer和put的區別在offer是用lock.lock(),而put是用的lock.lockInterruptibly()方法進行加鎖的,即put方法在等待鎖時,你可以中斷等待,使執行緒丟擲InterruptedException 異常。(這兩個的區別見:http://blog.csdn.net/youyou1543724847/article/details/52174968

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

ConcurentHashMap

這個最令人頭疼了,看起原始碼來。裡面的同步主要是通過CAS操作來保證的。鎖的粒度為桶級別,即同一個hashcode的鏈上的資料會被鎖住。

>get操作沒有加鎖,是通過entry的volatile屬性保證可見性。

 public V get(Object key) 
    {
        Node<K,V>[] tab;
        Node<K,V> e, p; 
        int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) 
        {
            if ((eh = e.hash) == h) 
            {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)// 樹,或是 forwardnode等
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) // 連結串列  
            {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) 
    {
        if (key == null || value == null) throw new NullPointerException();

        int hash = spread(key.hashCode());

        int binCount = 0;
        for (Node<K,V>[] tab = table;;) 
        {
            Node<K,V> f; 
            int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();// table為空,初始化table  
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) 
            {//如果當前的bin是空的,則通過cas插入,不用加鎖
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//檢測到正在擴容,則幫助其擴容 
                tab = helpTransfer(tab, f);
            else 
            {
                V oldVal = null;
                synchronized (f) //對相應的bin進行上鎖
                {
                    if (tabAt(tab, i) == f) //檢查鎖的有效性,即之前鎖住的物件還是bin的頭結點,如果無效,則重試(重新走一邊for迴圈)
                    {
                        if (fh >= 0) //當前的鏈還是list結構
                        {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) //// 樹節點 
                        {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }//插入節點
        }
        addCount(1L, binCount);
        return null;
    }

總結

在需要使用執行緒安全的集合時,選擇適合的容器非常重要。可通過併發度、效能來考量
另外,有時候,當資料量較大時,可能這些容器其實已經不太適合,你需要通過別的方法來保證安全,如直接將資料放入到資料庫中(或是記憶體NOSQL系統中)