並發容器(一)同步容器 與 並發容器
一、同步容器
同步容器包括兩類:
- Vector、Hashtable、Stack
- 同步的封裝器類由 Collections.synchronizedXXX 等工廠方法創建的。(JDK1.2加入)
??這些類實現線程安全的方式是:將他們的狀態封裝起來,並對每個公有方法都進行同步,使得每一次只有一個線程能訪問容器的狀態。 同步容器類的出現是為了解決 Collection、Map 不能同步,線程不安全的問題。
同步容器類的問題
??同步容器類都是線程安全的,但不是絕對的線程安全 (所謂線程安全僅僅是在每一個方法上加鎖,保持原子)。在某些情況下,需要額外加鎖來保護復合操作。復合類操作如:叠代(反復訪問元素,遍歷完容器中的所有元素)、跳轉
看下面三種“意外”情況:
1. 獲取與刪除的復合操作
??下面的代碼看起來沒什麽問題,但如果一旦出現:線程A執行 getLast() 方法,線程B執行 deleteLast() 方法;線程A,線程B 交替執行,getLast() 方法就可能會拋出 ArrayIndexOutOfBoundsException(數組下標越界)。
public static Object getLast(Vector list) {
int lastIndex = list.size () - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
??為防止這種情況出現,就要額外加鎖,使 getLast()、deleteLast() 方法成為原子性操作。正確的寫法如下:
public static Object getLast(Vector list){
synchronized (this){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list){
synchronized(this){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
2. 普通叠代
for(int i = 0 ;i < vector.size(); i++){
doSome(vector.get(i));
}
??這種叠代方法的正確性完全依賴於運氣:我們無法保證在調用size與get直接按有沒有其他線程對所操作的這個Vector進行了修改。但是這並不代表Vector就不是線程安全的。Vector仍然是線程安全的,而拋出的異常也與其規範保持一致。和 getLast()的例子一樣,如果 遍歷列表的線程 與 刪除的線程 交替執行,同樣也會拋出 ArrayIndexOutOfBoundsException。
改進的寫法:
synchronized(vector){
for(int i = 0 ;i < vector.size(); i++){
doSome(vector.get(i));
}
}
3. 叠代器 Iterator 與 ConcurrentModificationException
??對容器的標準叠代方式是使用 Iterator。然而,在叠代的期間,如果有線程並發地修改同步容器的,那麽即使使用 Iterator 也無法避免對容器進行加鎖。這是由於早期叠代器設計的時候並沒有考慮並發修改的問題。而且,一旦失敗,將會拋出 ConcurrentModificationException .
Collection c = Collections.synchronizedCollection(myCollection);
...
synchronized(c) {
Iterator i = c.iterator(); // Must be in the synchronized block
while (i.hasNext())
foo(i.next());
}
??有時候程序員並不希望在叠代期間對容器加鎖。特別是容器叠代的規模大的時候,就可能需要長時間加鎖,會造成鎖的競爭激烈,降低程序的伸縮性。替代的方法是,克隆出一個副本,在副本上叠代。但也要進行權衡,因為克隆復制也需要額外的開銷。
隱藏的叠代器
??容器的有些方法是進行叠代的,這些方法也要記得對其加鎖。我們看看下面這個程序:
public class HiddenIterator{
@GuardedBy(this)
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i ){ set.add(i); }
public synchronized void remove(Integer i ){ set.remove(i); }
public void addTenThings(){
Random r = new Random();
for(int i = 0 ; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG : added ten elements to" + set);
}
}
??上面的程序看起來好像也沒問題,add、remove都加鎖了。然而 addTenThings() 的最一行輸出中,調用了 Set.toString()方法,toString()方法又是對容器進行了叠代,也可能拋出 ConcurrentModificationException 。所以,也要對 addTenThings()方法加鎖。
??隱藏了叠代容器操作的方法: toString()、hashcode()、equals()、containsAll()、removeAll()、retainAll()。還有 forEach 的寫法。
二、並發容器類
??同步類容器的狀態都是串行化的(使用 synchronized 加鎖的,同一時間只能一個線程訪問容器,一個個排隊訪問,這就是串行化)。他們雖然實現了線程安全,但是嚴重降低了並發性,在多線程環境時,嚴重降低了應用的吞吐量。
看一下源代碼,更加直觀:
下面是 Collections.SynchronizedCollection() 方法的源代碼。
public static <T> Collection<T> synchronizedCollection(Collection<T> c) {
return new SynchronizedCollection<>(c);
}
synchronizedCollection()方法是直接創建並返回一個 SynchronizedCollection 類的對象,這個類是 Collections 的靜態內部類,繼續跟蹤。
static class SynchronizedCollection<E> implements Collection<E>, Serializable {
private static final long serialVersionUID = 3053995032091335093L;
final Collection<E> c; // 非線程安全的 Collection
final Object mutex; // Object on which to synchronize
SynchronizedCollection(Collection<E> c) {
//判斷集合c是否是為null,為null就拋異常
this.c = Objects.requireNonNull(c);
mutex = this;
}
SynchronizedCollection(Collection<E> c, Object mutex) {
//判斷集合c是否是為null,為null就拋異常
this.c = Objects.requireNonNull(c);
this.mutex = Objects.requireNonNull(mutex);
}
//封裝集合c的size()方法,synchronized修飾,變成同步方法
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return c.isEmpty();}
}
public boolean contains(Object o) {
synchronized (mutex) {return c.contains(o);}
}
public Object[] toArray() {
synchronized (mutex) {return c.toArray();}
}
//........
??可以看出,Collections.SynchronizedCollection類其實就是封裝了 非線程安全的Collection 類對象,在 Collection 的每個方法上加上 synchronized。
再看一下 Vector add()方法的源代碼:
public synchronized boolean add(E e) {
modCount++;
ensureCapacityHelper(elementCount + 1);
elementData[elementCount++] = e;
return true;
}
??從上面的源碼可以得知:同步容器的線程安全都是用 synchronized 來實現的,而且鎖住整個方法區,即方法區的所有代碼都是臨界區,這就導致了同一時刻,只能有一個線程訪問容器。換句話說,只能同步地訪問容器,無法並發地訪問容器,在高並發的情況下,將會非常地糟糕。
這時候,高性能的並發容器出現了
??java5.0之後提供了多種並發容器來改善同步容器的性能,如 ConcurrentHashMap、CopyOnWriteArrayList、CopyOnWriteArraySet、ConcurrentSkipListMap、ConcurrentSkipListSet、ConcurrentLinkedQueue;其中 ConcurrentHashMap 用來替代 Hashtable ,CopyOnWriteArrayList 用來替代 Vector;
??並發容器類采用各種優化手段,盡可能讓多線程並發訪問容器:ConcurrentHashMap 的分段鎖、ConcurrentLinkedQueue 的非阻塞的CAS算法、鎖的粒度更細、以及針對多讀少寫的情況下的 “寫時復制”。
下面重點說一下 ConcurrentHashMap
??ConcurrentHashMap 采用分段鎖技術 ,同步容器中,是一個容器一個鎖,但在ConcurrentHashMap中,會將hash表的數據分成若幹段,每段維護一個鎖,以達到高效的並發訪問;
??ConcurrentHashMap 與 其他並發容器一樣,在叠代的過程不需要加鎖,叠代器具有弱一致性,叠代期間不會拋出ConcurrentModificationException異常,並非“立即失敗”;所謂 弱一致性 ,就是返回的元素將反映叠代器創建時或創建後某一時刻的映射狀態。同時,需要在整個Map上進行計算的方法,如 size()、isEmpty(),這些方法的語義被略微減弱,以反映並發的特性,換句話說,這些方法的值是一個估計值,並不是很精確。事實上,這些方法在並發環境下用處很小,因為在並發的情況下,它們的返回值總是在變化。如果需要強一致性,那麽就得考慮加鎖。同步容器類便是強一致性的。
??由於 ConcurrentHashMap 不能被加鎖來執行獨占訪問,因此無法通過加鎖來創建新的原子操作。不過,ConcurrentHashMap 提供了以下幾個原子操作(由其父接口 ConcurrentMap 提供),基本滿足需求了:
//如果指定鍵已經不再與某個值相關聯,則將它與給定值關聯。
V putIfAbsent(K key, V value);
//只有目前將鍵的條目映射到給定值時,才移除該鍵的條目。
boolean remove(Object key, Object value);
//只有目前將鍵的條目映射到某一值時,才替換該鍵的條目。
V replace(K key, V value);
//只有目前將鍵的條目映射到給定值時,才替換該鍵的條目。
boolean replace(K key,V oldValue, V newValue);
JDK 提供的並發容器還包括以下7個阻塞隊列,如下:
- ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
- PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
補充說明:上面的 ConcurrentHashMap 的介紹是基於 JDK1.6 版本的,JDK1.8 有所修改,可參考後續文章。
參考文獻:
- 《並發編程的藝術》
- 《並發編程實戰》
並發容器(一)同步容器 與 並發容器