java多線程4.構建並發模塊
委托是創建線程安全類的一個最有效策略:只需讓現有的線程安全類管理所有的狀態即可
Java類庫包含豐富的並發基礎構建模塊,如線程安全的容器以及各種用於協調多個相互協作的線程控制流的同步工具類
- 1. 同步容器類
這些類的方式是:將它們的狀態封裝起來,並對每個公有方法都進行同步,使得每次只有一個線程能訪問容器的狀態。
同步容器類都是線程安全的,但在某些情況下可能需要額外的客戶端加鎖類保護復合操作如:叠代或者條件運算(若沒有則添加)。
不過同步容器類是通過其自身的鎖來保護它的每個方法,因此通過獲得容器類的鎖,可以在客戶端構造原子操作。
public static Object getLast(Vector list){synchronized(list){ int lastIndex = list.size() - 1; return list.get(lastIndex); } }
通過在客戶端加鎖可以解決不可靠的叠代問題,但要犧牲一些伸縮性。在叠代期間導致其他線程無法訪問,降低了並發性
//在調用size和get之間存在並發訪問 synchronized(vector){ for(int i=0; i<vector.size(); i++){ doSomething(vector.get(i)); } }
同步容器類的叠代器並沒有考慮並發修改的問題,且它們表現出的行為是‘及時失敗’,即當它們發現容器在叠代過程中被修改時會拋出ConcurrentModificationException。
要想避免ConcurrentModificationException,就必須在叠代過程中持有容器的鎖。
如果不想在叠代期間對容器加鎖,一種替代的方法就是克隆容器(克隆過程仍然要對容器加鎖),在副本上進行叠代。
List<String> list = Collections.synchronizedList(new ArrayList<String>());//可能拋出ConcurrentModificationException for(String s : list){ doSomething(s); }
調用容器的toString,hsahVode,equals,containsAll,removeAll,retainAll等方法時,以及把容器作為參數的構造函數,都會間接的對容器進行叠代。
所有這些叠代操作都可能拋出ConcurrentModificationException。
- 2. 並發容器
同步容器將所有對容器狀態的訪問都串行化,以實現他們的線程安全性,這種方法的代價是驗證降低並發性。
Java 5.0提供了多種並發容器來改進同步容器的性能。
- 2.1 Queue用來臨時保存一組等待處理的元素。它提供幾種實現包括:ConcurentLinkedQueue,傳統的先進先出隊列
BlockingQueue擴展了Queue,增加了可阻塞的插入和獲取等操作。即如果隊列為空,那麽獲取元素的操作將一直阻塞,直到隊列中出現一個可用的元素;如果隊列已滿,那麽插入元素的操作將一直阻塞,直到隊列中出現可用的空間。在“生產者-消費者”模式中,阻塞隊列是常用的方式。
- 2.2 ConcurrentHashMap
同步容器類在執行每個操作期間都持有一個鎖。在一些操作中,例如:HashMap.get或List.contains可能包含大量的工作,當遍歷散列桶或鏈表來查找某個特定的對象時,必須在許多元素上調用equals(equals本身包含一定的計算量)。在基於散列的容器中,如果hashCode不能很均勻地分布散列值,那麽容器中的元素就不會均勻地分布在整個容器中。某些情況下,某個糟糕的散列函數還會把一個散列表變成線性鏈表。當遍歷很長的鏈表並且在某些或者全部元素上調用equals方法時,會花費相當長的時間,這段時間內其他線程都不能訪問該容器。
與HashMap一樣,ConcurrentHashMap也是基於散列的Map,但它使用了一種完全不同的加鎖策略來提供更高的並發性和伸縮性。ConcurrentHashMap並不是將每個方法都在同一個鎖上同步使得每次只能有一個線程訪問容器,而是用一種粒度更細的加鎖機制來實現更大程度的共享,這種機制稱為分段鎖。在這種機制中,任意數量的讀取線程可以並發的訪問Map,執行讀取操作的線程和執行寫入操作的線程可以並發的訪問Map,並且一定數量的寫入線程可以並發地修改Map。ConcurrentHashMap帶來的結果是,在並發訪問環境下將實現更高的吞吐量,而在單線程環境中只損失非常小的性能。
ConcurrentHashMap與其他並發容器一起增強了同步容器類:他們提供的叠代器不會拋出ConcurrentModificationException,因此不需要再叠代過程中對容器加鎖。ConcurrentHashMap返回的叠代器具有弱一致性,弱一致性的叠代器可以容忍並發的修改,當創建叠代器時會遍歷已有的元素,並可以(但不保證)在叠代器被構造後將修改操作反映給容器。
盡管有這些改進,但仍然有一些需要權衡的因素。對於一些需要在整個Map上進行計算的方法,例如:size和isEmpty,這些方法的語義被略微減弱了以反映容器的並發特性,由於size返回的結果在計算時可能已經過期了,它實際只是一個近似值,但這是允許的,雖然看上去令人有些不安,但事實上size和isEmpty這樣的方法在並發環境下的用處很小,因為它們的返回值總在不斷變化。從而這些操作的需求被弱化了,以換取對其他更重要操作的性能優化,如get,put,containsKey,remove等。
由於ConcurrentHashMap不能被加鎖來執行獨占訪問,因此無法使用客戶端加鎖來創建新的原子操作,但一些常見的復合操作,若沒有則添加putIfAbsent,若相等則去除,若相等則替換等,都已經實現為原子操作並在ConcurrentMap中聲明。
- 2.3 CopyOnWriteArrayList
CopyOnWriteArrayList(CopyOnWriteArraySet類似) 用於替代同步List,在某些情況下提供更好的並發性能,並且在叠代期間不需要對容器進行加鎖和復制。
寫入時復制容器的線程安全性在於,只要正確地發布一個事實不可變對象,那麽在訪問該對象時就不再需要進一步的同步。在每次修改時,都會創建並重新發布一個新的容器副本,從而實現可變性。容器的叠代器保留一個指向底層基礎數組的引用,這個數組當前位於數組的起始位置,由於它不會被修改,因此在對其進行同步時只需確保數組內容的可見性。因此,多個線程可以同時對這個容器進行叠代,而不會彼此幹擾或者與修改容器的線程相互幹擾。容器返回的叠代器不會拋出ConCurrentModificationException,並且返回的元素與叠代器創建時的元素完全一致,而不必考慮之後修改所帶來的影響。
顯然,每當修改容器時都會復制底層數組,這需要一定的開銷,尤其當容器的規模較大時。因此,僅當叠代操作遠遠多於修改操作時,才應該使用“寫入時復制”容器。
- 2.4 阻塞隊列、生產者-消費者模式
阻塞隊列提供可阻塞的put和take方法,以及定時的offer和poll方法。如果隊列已經滿了,那麽put方法阻塞直到有空間可用;如果隊列為空,那麽take方法將會阻塞直到 有元素可用。隊列可用有界也可用無界,無界隊列永遠都不會充滿,因此無界隊列的put方法永遠也不會阻塞。
阻塞隊列支持生產者-消費者模式。該模式將要完成的工作與執行工作兩個過程分離開來,這樣簡化了開發過程,消除了生產者類和消費者類之間的代碼依賴性,此外,該模式還將生產數據的過程與使用數據的過程解耦開來以簡化工作負載的管理,因為這兩個過程在處理數據的速率上有所不同。
一種最常見的生產者-消費者模式就是線程池與工作隊列的組合,在Executor任務執行框架中體現了這種模式。
BlockingQueue的幾種實現:
LinkedBlockigQueue和ArrayBlockingQueue是FIFO隊列,二者區別分別與LinkedList與ArrayList類似,但比同步List擁有更好的並發性。
PriorityBlockingQueue是一個按優先級排序的隊列,既可以根據元素的自然順序來比較元素(元素實現了Comparable方法),也可以使用Comparator來比較。
SynchronousQueue 並不是一個真正的隊列,因為它不會為隊列中元素維護存儲空間。與其他隊列不同的是,它維護一組線程,這些線程在等待著把元素加入或移出隊列。這種區別就好像將文件直接交給同事還是將文件放到他的郵箱中希望他能盡快拿到文件。因為沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經準備好參與到交付過程中。僅當有足夠多的消費者,並且總是有一個消費者準備好獲取交付的工作時,才適合使用同步隊列。
- 示例:桌面搜索
/** * 在某個文件層次結構中搜索符合索引標準的文件,並將它們的名稱放入工作隊列。 * 在Indexer中給出了一個消費者任務,即從隊列中取出文件名稱並對它們建立索引。 * 將文件遍歷與建立索引分解為獨立操作,每個操作只需完成一個任務,並且阻塞隊列將負責所有的控制流,因此每個功能的代碼都更加簡單清晰。 * 生產者和消費者可以並發地執行。 * 如果一個是I/O密集型,另一個是CPU密集型,那麽並發執行的吞吐率要高於串行執行的吞吐率。 * 如果二者並行度不同,那麽耦合在一起的結果會把整體並行度降為二者中更小的並行度。 * 由於每個程序都在各自的線程中運行,消費者線程永遠不會退出,因而程序無法終止。 */ public class FileCrawler implements Runnable{ private final BlockingQueue<File> fileQueue; private final FileFilter fileFilter; private final File root; //... public void run() { try{ crawl(root); }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException{ File[] entries = root.listFiles(fileFilter); if(entries != null){ for(File entry : entries){ if(entry.isDirectory()){ crawl(entry); }else if(!alreadyIndexed(entry)){ fileQueue.put(entry); } } } } } class Indexer implements Runnable{ private final BlockingQueue<File> queue; public Indexer(BlockingQueue<File> queue){ this.queue = queue; } public void run(){ try{ while(true){ indexFile(queue.take()); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } }
#筆記內容來自《java並發編程實戰》
java多線程4.構建並發模塊