1. 程式人生 > >基礎構建模塊

基礎構建模塊

part 繼續 latch 一定的 2.3 利用 方案 方式 系統

5 基礎構建模塊

Java平臺類庫包含了豐富的並發基礎構建模塊,例如線程安全的容器類以及各種用於協調多個相互協作的線程控制流的同步工具類(Synchronizer)。本章將介紹其中一些最有用的並發構建模塊。

5.1同步容器類

同步容器類包括Vector和Hashtable,二者是早期JDK的一部分,此外還包括在JDK 1.2

中添加的一些功能相似的類,這些同步的封裝器類是由Collections.synchronizedXxx等工廠方

法創建的。這些類實現線程安全的方式是:將它們的狀態封裝起來,並對每個公有方法都進行同步,使得每次只有一個線程能訪問容器的狀態。

5.1.1同步容器類的問題

技術分享圖片

所以需要客戶端加鎖,而同步容器正好是支持的。例如下操作:

技術分享圖片

2 : 在調用size和相應的get之間,Vector的長度可能會發生變化。

技術分享圖片

我們可以通過在客戶端加鎖來解決不可靠叠代的問題,但要犧牲一些伸縮性:

技術分享圖片

5.1.2 叠代器與ConcurrentModificationException

Java 5.0人的for-each循環語法中,對容器類進行叠代的標準方式都是使用Iterator,也無法避免在叠代期間對容器加鎖。因為它們表現出的行為是"及時失敗"( fail-fast)的。它們采用的實現方式是,將計數器的變化與容器關聯起來。如果在叠代期間計數器被修改,那麽hasNext或next將拋出ConcurrentModificationException。

技術分享圖片

5.1.3 隱藏叠代器

技術分享圖片

技術分享圖片

在System.out.println()中叠代set的時候,可能會拋出ConcurrentModificationException。這裏得到的教訓是,如果狀態與保護它的同步代碼之間相隔越遠,那麽開發人員就越容易忘記在訪問狀態時使用正確的同步。如果Hiddenlterator用synchronizedSet來包裝HashSet, 並且對同步代碼進行封裝,那麽就不會發生這種錯誤。容器的hashCode和equals等方法也會間接地執行叠代操作

正如封裝對象的狀態有助於雄持不變性條件一樣,封裝對_象的同步機制同_樣有助於確保實施同步策略。

5.2並發容器

同步容器將所有對容器狀態的訪問都串行化,以實現它們的線程安全性。這種方法的代價是嚴重降低並發性,Java 5.0提供了多種並發容器類來改進同步容器的性能。

通過LinkedList來實現Queue。Queue上的操作不會阻塞,如果隊列為空,那麽獲取元素的操作將返回空值。BlockingQueu。擴展了Queue,增加了可阻塞的插入和獲取等操作。

在Java 5.0中增加了Concurrent HashMap,用來替代同步且基於散列的Map。以及CopyOnWriteArrayList。

Java 6也引人了Concurrent SkipListMap和ConcurrentSkipListSet,分別作為同步的SortedMap和S ortedS et的並發替代品。(以前用synchronizedMap包裝TreeMap或TreeSet)

5.2.1 ConcurrentHashMap

采用分段鎖。ConcurrentHashMap與其他並發容器一起增強了同步容器類:它們提供的叠代器不會拋出ConcurrentModificationException。

盡管有這些改進,但仍然有一些需要權衡的因素。對於一些需要在整個Map上進行計算的方法,例如size和isEmpty,這些方法的語義被略微減弱了以反映容器的並發特性。由於size返回的結果在計算時可能已經過期了,它實際上只是一個估計值,因此允許size返回一個近似值而不是一個精確值。雖然這看上去有些令人不安,但事實上size和isEmpty這樣的方法在並發環境下的用處很小,因為它們的返回值總在不斷變化。因此,這些操作的需求被弱化了,以換取對其他更重要操作的性能優化,包括get, put, containsKey和remove等。

只有當應用程序需要加鎖Map以進行獨占訪問.時,才應該放棄使用ConcurrentHashhiap 。

5.2.2 額外的原子Map操作

由於ConcurrentHashMap不能被加鎖來執行獨占訪問,因此我們無法使用客戶端加鎖來創建新的原子操作,例如4.4.1節中對Vector增加原子操作"若沒有則添加"。但是,一些常見的復合操作,例如"若沒有則添加"、"若相等則移除(Remove-lf-Equal)"和"若相等則替換

( Replace-If-Equal)"等,都已經實現為原子操作並且在ConcurrentMap的接口中聲明,如程序清單5-7所示。如果你需要在現有的同步Map中添加這樣的功能,那麽很可能就意味著應該考慮使用ConcurrentMap了。

技術分享圖片

5.2.3寫入時復制容器:CopyOnWrite

CopyOnWriteArrayList用於替代同步List,類似地,CopyOnWriteArraySet替代同步Set。

只要正確地發布一個事實不可變的對象,那麽在訪問該對象時就不再需要進一步的同步。

"寫入時復制"容器的叠代器保留一個指向底層基礎數組的引用(所以不會被修改因此在對其進行同步時只需確保數組內容的可見性),這個數組當前位於叠代器的起始位置。當多個線程可以同時對這個容器進行叠代,不會彼此幹擾(各自遍歷各自的引用),叠代器不會拋出ConcurrentModificationException(引用不會被修改)。也不會與修改容器的線程相互幹擾。返回的並且返回的元素與叠代器創建時的元素完全一致。在每次修改時,都會創建並重新發布一個新的容器副本,從而實現可變性(因為發布所以可見)。

每當修改容器時都會復制底層數組,這需要一定的開銷,特別是當容器的規模較大

時。僅當叠代操作遠遠多於修改操作時,才應該使用"寫入時復制"容器。(事件通知系統)

5.3 阻塞隊列和生產者——消費者模式

阻塞隊列提供了可阻塞的put和take方法,以及支持定時的。ffer和poll方法。BlockingQueue簡化了生產者一消費者設計的實現過程,它支持任意數量的生產者和消費者。最常見的就是線程池與隊列的組合。

在構建高可靠的應用程序時,有界隊列是一種強有力的資添管理工具,一倉們能抑制並防止產生過多的工作項,使程序更加健壯。

5.3.2 串行線程封閉

在java.util.coricurrent中實現的各種阻塞隊列都包含了足夠的內部同步機制,從而安全地將對象從生產者線程發布到消費者線程。這種安全的發布確保了對象狀態對於新的所有者來說是可見的,並且由於最初的所有者不會再訪問它,因此對象將被封閉在新的線程中。新的所有者線程可以有獨占的訪問權。

對象池利用了串行線程封閉,將對象"借給"一個請求線程。只要對象池包含足夠的內部同步來安全地發布池中的對象,並且只要客戶代碼本身不會發布池中的對象,或者在將對象返回給對象池後就不再使用它(總之不會再讓第三個人來操作此對象),那麽就可以安全地在線程之間傳遞所有權。

我們也可以使用其他發布機制來傳遞可變對象的所有權,但必須確保只有一個線程能接受被轉移的對象。阻塞隊列簡化了這項工作。除此之外,還可以通過ConcurrentMap的原子方法remove或者AtomicReference的原子方法compareAndSet來完成這項工作。

5.3.3 雙端隊列與工作密取

工作密取非常適用於既是消費者也是生產者的場景。

Java 6增加了兩種容器類型,Deque(發音為"deck")和BfockingDeque,它們分別對Queue和BlockingQueue進行了擴展。Deque是一個雙端隊列,實現了在隊列頭和隊列尾的高效插人和移除。具體實現包括ArrayDeque和LinkedBlockingDeque。

工作密取設計中,每個消費者都有各自的雙端隊列。如果一個消費者完成了自己雙端隊列中的全部工作,那麽它可以從其他消費者雙端隊列末尾秘密地獲取工作。在大多數時候,它們都只是訪問自己的雙端隊列,從而極大地減少了競爭。它會從隊列的尾部而不是從頭部獲取工作,因此進一步降低了隊列上的競爭程度。

5.4阻塞方法與中斷方法

Thread提供了interrupt方法,用於中斷線程或者查詢線程是否已經被中斷。每個線程都有一個布爾類型的屬性,表示線程的中斷狀態,當中斷線程時將設置這個狀態。

當在代碼中調用了一個將拋出InterruptedException異常的方法時,你自己的方法也就變成了一個阻塞方法,並且必須要處理對中斷的響應。對於庫代碼來說,有兩種基本選擇:

傳遞InterruptedException 。避開這個異常通常是最明智的策略。只需把

InterruptedException傳遞給方法的調用者。傳遞InterruptedException的方法包括,根本不捕獲該異常,或者捕獲該異常,然後在執行某種簡單的清理工作後再次拋出這個異常。

恢復中斷。有時候不能拋出InterruptedException,例如當代碼是Runnable的一部分時。在這些情況下,必須捕獲InterruptedException,並通過調用當前線程上的interrupt方法恢復中斷狀態,這樣在調用棧中更高層的代碼將看到引發了一個中斷,如程序清單5-10所示。

技術分享圖片

在出現InterruptedException時不應該做的事情是,捕獲它但不做出任何響應。這將使調用棧上更高層的代碼無法對中斷采取處理措施,因為線程被中斷的證據已經丟失。

5.5同步工具類

同步工具類可以是任何一個對象,只要它根據其自身的狀態來協調線程的控制流。阻塞隊列可以作為同步工具類,其他類型的同步工具類還包括信號量( Semaphore )、柵欄(Barrier )以及閉鎖(Latch )。

所有的同步工具類都包含一些特定的結構化屬性:它們封裝了一些狀態,這些狀態將決定執行同步工具類的線程是繼續執行還是等待,此外還提供了一些方法對狀態進行操作,以及另一些方法用於高效地等待同步工具類進入到預期狀態。

5.5.1閉鎖

閉鎖是一種同步工具類,可以延遲線程的進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何線程能通過,當到達結束狀態時,這扇門會打開並允許所有的線程通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠保持打開狀態。閉鎖可以用來確保某些活動直到其他活動都完成後才繼續執行,例如:

確保某個計算在其需要的所有資源都被初始化之後才繼續執行。

確保某個服務在其依賴的所有其他服務都已經啟動之後才啟動。

等待直到某個操作的所有參與者(在多玩家遊戲中的所有玩家)都就緒再繼續執行)

CountDownLatch是一種靈活的閉鎖實現,它可以使一個或多個線程等待一組事件發生。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown方法遞減計數器,表示有一個事件已經發生了,而await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那麽await會一直阻塞直到計數器為零,或者等待中的線程中斷,或者等待超時。

在程序清單5-11的TestHarness中給出了閉鎖的兩種常見用法。TestHarness創建一定數

量的線程,利用它們並發地執行指定的任務。一它使用兩個閉鎖,分別表示"起始門(Starting

Gate )‘,和"結束門(Ending Gate ) "。起始門計數器的初始值為1,而結束門計數器的初始值為工作線程的數量。每個工作線程首先要做的值就是在啟動門上等待,從而確保所有線程都就緒後才開始執行。而每個線程要做的最後一件事情是將調用結束門的countDown方法減1,這能使主線程高效地等待直到所有工作線程都執行完成,因此可以統計所消耗的時間。

技術分享圖片

技術分享圖片

如果在創建線程後立即啟動它們,那麽先啟動的線程將"領先"後啟動的線程,並且活躍線程數量會隨著時間的推移而增加或減少,競爭程度也在不斷發生變化。啟動門將使得主線程能夠問時釋放所有工作線程,而結束門則使主線程能夠等待最後一個線程執行完成,而不是順序地等待每個線程執行完成。

5.5.2 FutureTask

FutureTask也可以用做閉鎖。( FutureTask實現了Future語義,表示一種抽象的可生成結果的計算[CPJ 4.3.3])o FutureTask表示的計算是通過Callable來實現的,相當於一種可生成結果的Runnable,並且可以處於以下3種狀態:等待運行(Waiting to run ),正在運行< Running)和運行完成(Completed) o"執行完成"表示計算的所有可能結束方式,包括正常結束、由於取消而結束和由於異常而結束等。當FutureTask進人完成狀態後,它會永遠停止在這個狀態上。

Future.get的行為取決於任務的狀態。如果任務已經完成,那麽get會立即返回結果,否則get將阻塞直到任務進人完成狀態,然後返回結果或者拋出異常。FutureTask將計算結果從執行計算的線程傳遞到獲取這個結果的線程,而FutureTask的規範確保了這種傳遞過程能實現結果的安全發布。

FutureTask在Executo:框架中表示異步任務,此外還可以用來表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動。程序清單5-12中的Preloader就使用了FutureTask來執行一個高開銷的計算,並且計算結果將在稍後使用。通過提前啟動計算,可以減少在等待結果時需要的時間。

技術分享圖片

由於在構造函數或靜態初始化方法中啟動線程並不是一種好方法,因此提供了一

個start方法來啟動線程。當程序隨後需要ProductInfo時,可以調用get方法,如果數據已經加載,那麽將返回這些數據,否則將等待加載完成後再返回。

Callable表示的任務可以拋出受檢查的或未受檢查的異常,並且任何代碼都可能拋出一個Error。無論任務代碼拋出什麽異常,都會被封裝到一個ExecutionException中,並在

Future.get中被重新拋出。這將使調用get的代碼變得復雜,因為它不僅需要處理可能出現的ExecutionException(以及未檢查的CancellationException ),而且還由於ExecutionException是作為一個Throwable類返回的,因此處理起來並不容易。在Preloader中,當get方法拋出ExecutionException時,可能是以下三種情況之一:Callable拋出的受檢查異常,RuntimeException,以及Error。我們必須對每種情況進行單獨處理,但我們將使用程序清單5-13中的launderThrowable輔助方法來封裝一些復雜的異常處理邏輯。在調用launderThrowable之前,Preloader會首先檢查已知的受檢查異常,並重新拋出它們。剩下的是未檢查異常,Preloader將調用launderThrowabl。並拋出結果。如果Throwable傳遞給launderThrowable的是一個Error,那麽launderThrowable將直接再次拋出它;如果不是RuntimeException,那麽將拋出一個I11ega1StateException表示這是一個邏輯錯誤。乘下的

RuntimeException, launderThrowable將把它們返回給調用者,而調用者通常會重新拋出

它們。

技術分享圖片

5.5.3 信號量

計數信號量(Counting Semaphore)用來控制同時訪問某個特定資源的操作數量,數信號量還可以用來實現某種資源池,或者對容器施加邊界。Semaphore中管理著一組虛擬的許可(permit),許可的初始數量可通過構造函數來指定。在執行操作時可以首先獲得許可(只要還有剩余的許可),並在使用以後釋放許可。如果沒有許可,那麽acquire將阻塞直到有許可(或者直到被中斷或者操作超時)。release方法將返回一個許可給信號量。

計算信號量的一種簡化形式是二值信號量,即初始值為1的Semaphore。二值信號量可以用做互斥體(mutex),並具備不可重人的加鎖語義:誰擁有這個唯一的許可,誰就擁有了互斥鎖。

你可以使用Semaphore將任何一種容器變成有界阻塞容器。我們可以構造一個固定長度的資源池,當池為空時,請求資源將會失敗,但你真正希望看到的行為是阻塞而不是失敗,並且當池非空時解除阻塞。如果將Semaphore的計數值初始化為池的大小,並在從池中獲取一個資源之前首先調用acquire方法獲取一個許可,在將資源返回給池之後調用release釋放許可,那麽acquire將一直阻塞直到資源池不為空。

技術分享圖片

5.5.4柵欄

柵欄(B arrier)類似於閉鎖,它能阻塞一組線程直到某個事件發生【CPJ 4,4.3]。柵欄與閉鎖的關鍵區別在於,所有線程必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其他線程。柵欄用於實現一些協議,例如幾個家庭決定在某個地方集合:"所有人6:00在麥當勞碰頭,到了以後要等其他人,之後再討論下一步要做的事情。

CyclicBarrier可以使一定數量的參與方反復地在柵欄位置匯集,它在並行叠代算法中非常有用:這種算法通常將一個問題拆分成一系列相互獨立的子問題。當線程到達柵欄位置時將調用await方法,這個方法將阻塞直到所有線程都到達柵欄位置。如果所有線程都到達了柵欄位置,那麽柵欄將打開,此時所有線程都被釋放,而柵欄將被重置以便下次使用。如果對await的調用超時,或者await阻塞的線程被中斷,那麽柵欄就被認為是打破了,所有阻塞的await調。用都將終止並拋出BrokenBarrierException。如果成功地通過柵欄,那麽await將為每個線程返回一個唯一的到達索引號,我們可以利用這些索引來"選舉"產生一個領導線程,並在下一次叠代中由該領導線程執行一些特殊的工作。CyclicBarrier還可以使你將一個柵欄操作傳遞給構造函數,這是一個Runnable,當成功通過柵欄時會(在一個子任務線程中)執行它,但在阻塞線程被釋放之前是不能執行的。

在程序清單5-15的CellularAutomata中給出了如何通過柵欄來計算細胞的自動化模擬,例如Conway的生命遊戲。在把模擬過程並行化時,為每個元素(在這個示例中相當於一個細胞)分配一個獨立的線程是不現實的,因為這將產生過多的線程,而在協調這些線程上導致的開銷將降低計算性能。合理的做法是,將問題分解成一定數量的子問題,為每個子問題分配一個線程來進行求解,之後再將所有的結果合並起來。CellularAutomata將問題分解為Ncpu個子問題,其中\P}:等於可用CPU的數量,並將每個子問題分配給一個線程。在每個步驟中,工作線程都為各自子問題中的所有細胞計算新值。當所有工作線程都到達柵欄時,柵欄會把這些新值提交給數據模型。在柵欄的操作執行完以後,工作線程將開始下一步的計算,包括調用isDone方法來判斷是否需要進行下一次叠代。

技術分享圖片

技術分享圖片

另一種形式的柵欄是Exchanger,它是一種兩方(Two-Party )柵欄,各方在柵欄位置上交

換數據[CPJ 3.4.3]。當兩方執行不對稱的操作時,Exchanger會非常有用,例如當一個線程向緩沖區寫人數據,而另一個線程從緩沖區中讀取數據。這些線程可以使用Exchangez來匯合,並將滿的緩沖區與空的緩沖區交換。當兩個線程通過Exchanger交換對象時,這種交換就把這兩個對象安全地發布給另一方。

數據交換的時機取決於應用程序的響應需求。最簡草的方案是,當緩沖區被填滿時,

由填充任務進行交換,當緩沖區為空時,由清空任務進行交換。這樣會把需要交換的次數

降至最低,但如果新數據的到達率不可預測,那麽一些數據的處理過程就將延遲。另一個

方法是,不僅當緩沖被填滿時進行交換,並且當緩沖被填充到一定程度並保持一定時間後,

也進行交換。

5.6 構建高效且可伸縮的結果緩存

我們將開發一個高效且可伸縮的緩存。我們首先從簡單的HashMap開始,然後分析它的並發性缺陷,並討論如何修復它們。

技術分享圖片

技術分享圖片

由於ConcurrentHashMap是線程安全的,因此在訪問底層Map時就不需要進行同步。但它仍然存在一些不足。當兩個線程同時調用compute時存在一個漏洞,如果某個線程啟動了一個開銷很大的計算,而其他線程並不知道這個計算正在進行,那麽很可能會重復這個計算。因此在計算之間最好先判斷是否有其他線程正在進行此計算。FutureTask類能實現這個功能。FutureTasko FutureTask表示一個計算的過程,這個過程可能已經計算完成,也可能正在進行。如果有結果可用,那麽FutureTask.get將立即返回結果,否則它會一直阻塞,直到結果計算出來再將其返回。

用ConcurrentHashMap<A,Future<V>>,替換原來的ConcurrentHashMap<A, V>。首先檢查某個相應的計算是否已經開始,如果還沒有啟動,那麽就創建一個FutureTask,並註冊到Map中,然後啟動計算;如果已經啟動,那麽等待現有計算的結果。結果可能很快會得到,也可能還在運算過程中,但這對於Future.get的調用者來說是透明的。

技術分享圖片

ConcurrentMap中的原子方法putIfAbsent,避免了compute方法中的if代碼塊仍然是非原子(nonatomic)的"先檢查再執行"操作。 (2個線程仍有可能在同一時間內調用compute來計算相同的值,使用put方法則不具備原子性,不安全)。

基礎構建模塊