同步容器和並發容器
一、同步容器
在Java中,同步容器包括兩個部分,一個是vector和HashTable,查看vector、HashTable的實現代碼,可以看到這些容器實現線程安全的方式就是將它們的狀態封裝起來,並在需要同步的方法上加上關鍵字synchornized。
另一個是Collections類中提供的靜態工廠方法創建的同步包裝類。
同步容器都是線程安全的。但是對於復合操作(叠代、缺少即加入、導航:根據一定的順序尋找下一個元素),有時可能需要使用額外的客戶端加鎖進行保護。在一個同步容器中,復合操作是安全的。但是當其他線程能夠並發修改容器的時候,它們就可能不會按照期望工作了。
有一些原因造成我們不願意在叠代期間對容器加鎖,當其它線程需要訪問容器時,必須等待,直到叠代結束,如果容器很大,或者對每一個元素執行的任務耗時比較長,它們可能需要等待很長一段時間。另外,如果對元素的操作還要持有另一個鎖,這是一個產生死鎖風險的因素。在叠代期間,對容器加鎖的一個替代方法是復制容器,因為復制是線程限制的,沒有其他的線程能夠在叠代期間對其進行修改,這樣就消除了ConcurrentModificationException發生的可能性。
二、並發容器
同步容器通過對容器的所有狀態進行串行訪問,從而實現它們的線程安全。這樣做的代價是削弱了並發性,當多個線程共同競爭容器級的鎖時,吞吐量就會降低。並發容器是為多線程並發訪問而設計的。Java 5.0中ConcurrentHashMap,來替代同步的哈希Map實現。Queue用來臨時保存正在等待被進一步處理的一系列元素,JDK提供了幾種實現,包括一個傳統的FIFO隊列ConcurrentLinkedQueue,一個具有優先級順序的隊列PriorityQueue。Queue的操作不會阻塞,如果隊列是空的,那麽從隊列中獲取元素的操作會返回空值。BlockingQueue擴展了Queue,增加了可阻塞的插入和獲取操作。如果隊列是空的,一個獲取操作就會一直阻塞直到隊列中存在可用元素,如果隊列是滿的,插入操作就會一直阻塞到隊列中存在可用空間。阻塞隊列在生產者和消費者設計中非常有用。
ConcurrentHashMap
ConcurrentHashMap與HashMap一樣是一個哈希表,但是它使用完全不同的鎖策略,可以提供更好的並發性和可伸縮性。在ConcurrentHashMap以前,程序使用一個公共鎖同步一個方法,並嚴格地控制只能在一個線程中可以同時訪問容器,而ConcurrentHashMap使用一個更為細化的鎖機制,名叫分離鎖。這個機制允許任意數量的讀線程可以並發訪問Map,讀者和寫者也可以並發訪問Map,並且有限數量的寫進程還可以並發修改Map,結果是為並發訪問帶來更高的吞吐量,同時幾乎沒有損失單個線程訪問的性能。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHshMap裏扮演鎖的角色,HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組,Segment的結構和HashMap類似,是一種數組和鏈表結構,一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每個Segment守護著HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment的鎖。
HashEntry類用來封裝散列表中的鍵值對。類定義如下:
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
在JDK1.6中,HashEntry中的next域定義為final類型,新節點只能在鏈表的表頭插入。而且在每次刪除節點之前的所有節點拷貝一份組成一個新的鏈,而將當前節點的next指向當前節點的下一個節點,從而在刪除之後有兩條鏈存在,因而可以保證即使在同一條鏈中,有一個線程在刪除,而另一個線程在遍歷,它們都能工作良好。如果遍歷線程在刪除線程結束後開始,則它能看到刪除後的變化,如果它發生在刪除線程正在執行中間,則它會使用原有的鏈,而不會等到刪除線程結束後再執行,即看不到刪除線程的影響。
HashEntry類的value域被聲明為Volatile類型,Java的內存模型可以保證:某個寫線程對value域的寫入可以馬上被後續的某個讀線程“看”到。
Segment類繼承於ReentrantLock,從而使得Segment對象能充當鎖的角色。
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; transient volatile HashEntry<K,V>[] table; transient int count; //在本segment範圍內,包含的HashEntry的個數 transient int modCount; //table被更新的次數 transient int threshold; //當table中包含的HashEntry元素超過threshold時,觸發table再散列 /* * table是由HashEntry對象組成的數組 * 如果散列時發生碰撞,碰撞的HashEntry對象就以鏈表的形式鏈接成一個鏈表 * table數組的數組成員代表散列映射表的一個桶 * 每一個table守護整個ConcurrentHashMap包含桶總數的一部分 * 如果並發級別為16,table則守護ConcurrentHashMap包含桶總數的1/16 */ final float loadFactor; //裝填因子 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } ... }
每個Segment對象用來守護其(成員對象table中)包含的若幹個桶。table是一個由HashEntry對象組成的數組,table數組的每一個數組成員就是散列表的一個桶。count變量是一個計數器,它表示每個Segment對象管理的table數組包含的HashEntry對象的個數。每一個Segment對象都有一個count對象來表示本Segment中包含的HashEntry對象的總數。在ConcurrentHashMap中,每一個Segment對象都有一個count對象來表示本Segment中包含的HashEntry對象的個數,這樣當需要更新計數器時,不用鎖定整個ConcurrentHashMap。
ConcurrentHashMap類在默認並發級別會創建包含16個Segment對象的數組 。每個 Segment 的成員對象 table 包含若幹個散列表的桶。每個桶是由 HashEntry 鏈接起來的一個鏈表。如果鍵能均勻散列,每個 Segment 大約守護整個散列表中桶總數的 1/16。
類定義如下:
//創建一個帶有指定初始容量、加載因子和並發級別的空映射 public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; //段偏移量 this.segmentMask = ssize - 1; //Segment的掩碼值,key的散列碼的高位來選擇具體的Segment if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; //創建 segments和segments[0] Segment<K,V> s0 =new Segment<K,V>(loadFactor, (int)(cap * loadFactor),(HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
在ConcurrentHashMap中,線程對映射表做讀操作,一般情況下不要加鎖就可以完成,對容器做結構性修改的操作才需要加鎖。
以put操作為例說明對ConcurrentHashMap做結構性修改的過程:
public V put(K key, V value) { Segment<K,V> s; if (value == null) //ConcurrentHashMap 中不允許用 null作為映射值,當讀到null時,便知道產生了沖突-發生了重排序現象,需加鎖後重新讀入這個value值 throw new NullPointerException(); int hash = hash(key); //根據hash值找到相應的Segment int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
首先,根據key計算出對應的hash值,然後根據hash值找到對應的Segment對象:
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
最後,在這個Segment中執行具體的put操作:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);//加鎖 V oldValue; try { HashEntry<K,V>[] tab = table; /* * 把hash值與table數組的長度減1的值相與 * 得到該hash值對應的table數組的下標值 */ int index = (tab.length - 1) & hash; //找到hash值對應的具體的那個桶 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) //如果鍵值對已經存在 { K k; if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else //鍵值對不存在 { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first);//創建新的節點到鏈表中 int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); //解鎖 } return oldValue; }
這裏的加鎖操作是針對(鍵的hash值對應的)某個具體的Segment,鎖定的是該Segment而不是整個的ConcurrentHashMap,其他寫線程對另外的15個Segment的加鎖並不會因為當前線程對這個Segment的加鎖而阻塞。同時,所有讀線程幾乎不會因本線程的加鎖而阻塞(除非讀線程剛好讀到這個Segment中某個HashEntry的value域值為null,此時需要加鎖後重新讀取該值)。
相較於HashTable和由同步包裝器包裝的HashMap每次只能有一個線程執行讀或寫操作,ConcurrentHashMap在並發訪問性能上有了質的提高。在理想狀態下,ConcurrentHashMap可以支持16個線程執行並發寫操作(如果並發級別設置為16),及任意數量的讀操作。
線程寫入有兩種情形:對散列表做非結構性修改的操作和對散列表做結構性修改的操作。
非結構性修改操作只是更改某個HashEntry的value域的值。由於對 Volatile 變量的寫入操作將與隨後對這個變量的讀操作進行同步。當一個寫線程修改了某個 HashEntry 的 value 域後,另一個讀線程讀這個值域,Java 內存模型能夠保證讀線程讀取的一定是更新後的值。所以,寫線程對鏈表的非結構性修改能夠被後續不加鎖的讀線程“看到”。
對 ConcurrentHashMap 做結構性修改,實質上是對某個桶指向的鏈表做結構性修改。如果能夠確保:在讀線程遍歷一個鏈表期間,寫線程對這個鏈表所做的結構性修改不影響讀線程繼續正常遍歷這個鏈表。那麽讀 / 寫線程之間就可以安全並發訪問這個 ConcurrentHashMap。結構性修改操作包括 put,remove,clear。
下面來看一下remove的操作:
public V remove(Object key) { int hash = hash(key); Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); }
首先根據key來計算對應的hash值,然後根據hash值找到對應的Segment對象,下面是真正的remove操作:
final V remove(Object key, int hash, Object value) { if (!tryLock()) //加鎖 scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; //根據hash值找到table的下標值 int index = (tab.length - 1) & hash; //找到hash對應的那個桶 HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) //找到要刪除的節點 { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); //解鎖 } return oldValue; }
clear操作的源碼如下,對每個段Segment進行遍歷,然後進行clear操作:
public void clear() { final Segment<K,V>[] segments = this.segments; for (int j = 0; j < segments.length; ++j) { Segment<K,V> s = segmentAt(segments, j); if (s != null) s.clear(); } }
final void clear() { lock(); try { HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) setEntryAt(tab, i, null); ++modCount; count = 0; } finally { unlock(); } }
三、總結
ConcurrentHashMap 是一個並發散列映射表的實現,它允許完全並發的讀取,並且支持給定數量的並發更新。相比於 HashTable 和用同步包裝器包裝的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 擁有更高的並發性。在 HashTable 和由同步包裝器包裝的 HashMap 中,使用一個全局的鎖來同步不同線程間的並發訪問。同一時間點,只能有一個線程持有鎖,也就是說在同一時間點,只能有一個線程能訪問容器。
ConcurrentHashMap 的高並發性主要來自於三個方面:
- 用分離鎖實現多個線程間的更深層次的共享訪問。
- 用 HashEntery 對象的不變性來降低執行讀操作的線程在遍歷鏈表期間對加鎖的需求。
- 通過對同一個 Volatile 變量的寫 / 讀訪問,協調不同線程間讀 / 寫操作的內存可見性。
在實際的應用中,散列表一般的應用場景是:除了少數插入操作和刪除操作外,絕大多數都是讀取操作,而且讀操作在大多數時候都是成功的。正是基於這個前提,ConcurrentHashMap 針對讀操作做了大量的優化。通過 HashEntry 對象的不變性和用 volatile 型變量協調線程間的內存可見性,使得 大多數時候,讀操作不需要加鎖就可以正確獲得值。
######
同步容器和並發容器