java多執行緒10.構建同步工具
建立狀態依賴類的最簡單方法通常是在類庫中現有狀態依賴類的基礎上進行構造。如果類庫中沒有提供你需要的功能,可以使用java語言和類庫提供的底層機制來構造自己的同步機制,包括內建的條件佇列、顯示地Condition物件以及AbstractQueuedSynchronizer框架。
在單執行緒程式中呼叫方法時,如果基於某個狀態的前提條件未得到滿足,那麼這個條件永遠無法成真。而在併發程式中,基於狀態的條件可能會由於其他執行緒的操作而改變。
可阻塞的狀態依賴操作
acquire lock on object state while(precondition does not hold){ release lock wait until precondition hold operation failif interrupted or timeout expires reacquire lock } perform action reacquire lock
這種加鎖模式有些不尋常,因為鎖是在操作的執行過程中被釋放與重新獲取的。
構成前提條件的狀態變數必須由物件的鎖來保護,從而使它們在測試前提條件地同時保持不變。如果前提條件尚未滿足,就必須釋放鎖,以便其他執行緒可以修改物件的狀態,否則前提條件永遠無法成真。而再次測試前提條件之前,必須重新獲得鎖。
- 提問:有界快取的實現
/** * BaseBoundedBuffer實現了一個基於陣列的迴圈快取,其快取變數buf,tail,head,count均由內建鎖來保護。 * 提供同步的doput和dotake方法,在其子類中來實現put和take方法。底層狀態對子類隱藏。 * *@param <V> */ public abstract class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head; private int count; protected BaseBoundedBuffer(int capacity){ this.buf = (V[]) new Object[capacity]; } protected synchronizedfinal void doput(V v){ buf[tail] = v; if(++tail == buf.length){ tail = 0; } ++count; } protected synchronized final V dotake(){ V v = buf[head]; buf[head] = null; if(++head == buf.length){ head = 0; } --count; return v; } public synchronized final boolean isFull(){ return count == buf.length; } public synchronized final boolean isEmpty(){ return count == 0; } }
將前提條件的失敗傳遞給呼叫者:
/** * 儘管實現很簡單,但是實現快取時得到的簡化並不能抵消在使用時的複雜性。呼叫者必須做好捕獲異常的準備,並且每次快取操作時都需要重試。 * * @param <V> */ public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V>{ protected GrumpyBoundedBuffer(int capacity) { super(capacity); } public synchronized void put(V v) throws BufferFullException{ if(isFull()){ throw new BufferFullException(); } doput(v); } public synchronized V take() throws BufferFullException{ if(isEmpty()){ throw new BufferFullException(); } return dotake(); } }
/** * 客戶端程式碼: * 呼叫者可以不進入休眠狀態,而直接呼叫take方法,這種方法被稱為忙等待或自旋等待。 * 如果快取狀態在很長一段時間內都不會發生變化,那麼這種方式將會消耗大量的CPU時間。 * 呼叫者也可以進入休眠狀態來避免消耗過多的CPU時間,但如果快取狀態在剛調完sleep就立即發生變化,那麼將不必要地休眠一段時間。 * 因此客戶端程式碼必須在二者之間進行選擇要麼容忍自旋導致的CPU時鐘浪費,要麼容忍由於休眠而導致的低響應性。 * 另一種選擇是Thread.yield,可能使整體的執行過程更快。 */ while(true){ try{ V item = buffer.take(); //... break; }catch(BufferEmptyException e){ Thread.sleep(SLEEP_TIME); } }
- 改進:通過輪詢與休眠來實現簡單的阻塞
/** * 呼叫者無須在每次呼叫時都實現重試邏輯,簡化了對快取的使用。 * 如果執行緒在休眠或者被阻塞時持有一個鎖,通常是不好的做法,因為只要執行緒不釋放這個鎖,有些條件(快取為滿/空)就永遠無法成真。 * SleepyBoundedBuffer要求呼叫者處理InterruptedException。當一個方法由於等待某個條件為真而阻塞時,需要提供一種取消機制。 * * @param <V> */ public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V>{ protected SleepyBoundedBuffer(int capacity) { super(capacity); } public void put(V v) throws InterruptedException{ while(true){ synchronized(this){ if(!isFull()){ doput(v); return; } Thread.sleep(SLEEP_TIME); } } } public V take() throws InterruptedException{ while(true){ synchronized(this){ if(!isEmpty()){ return dotake(); } Thread.sleep(SLEEP_TIME); } } } }
條件佇列
它使得一組執行緒能夠通過某種方法來等待特定的條件變成真。佇列中的元素是一個個正在等待相關條件地執行緒。
正如每個java物件都可以作為一個鎖,每個物件同樣可以作為一個條件佇列,並且Object中的wait,notify,notifyAll方法就構成了內部條件佇列的API。
Object.wait會自動釋放鎖,並請求作業系統掛起當前執行緒,從而使其他執行緒能夠獲得這個鎖並修改物件的狀態。當掛起的執行緒醒來時,他將在返回前重新獲得鎖。
- 改進:條件佇列實現
/** * 簡單易用,且實現了明晰的狀態依賴性管理 * * @param <V> */ public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { protected BoundedBuffer(int capacity) { super(capacity); } public synchronized void put(V v) throws InterruptedException{ while(isFull()){ wait(); } doput(v); notifyAll(); } public synchronized V take() throws InterruptedException{ while(isEmpty()){ wait(); } V v = dotake(); notifyAll(); return v; } }
使用條件佇列
在條件等待中存在一種重要的三元關係,包括加鎖、wait方法和一個條件謂詞。
在條件謂詞中包含多個狀態變數,而狀態變數由一個鎖來保護,因此在測試條件謂詞之前必須先持有這個鎖。
鎖物件與條件佇列物件(即呼叫wait和notify等方法所在的物件)必須是同一個物件。
synchronized(lock){ while(!condition){ lock.wait(); } dosomething(); }
- 改進:優化條件佇列實現
/** * 上面BoundedBuffer的put和take採用的通知機制是保守的,每當將一個物件放入快取或者從快取中移走一個物件時,就執行一次通知。 * 可以進行優化,僅當從空變為非空,或從滿變為非滿時才發出通知 * * @param v * @throws InterruptedException */ public synchronized void put(V v) throws InterruptedException{ while(isFull()){ wait(); } boolean isEmpty = isEmpty(); doput(v); if(isEmpty){ notifyAll(); } }
閥門類
/** * ThreadGate可以開啟和關閉閥門,並提供一個await方法,該方法能一直阻塞直到閥門開啟。 * 在await中使用的條件謂詞比測試isOpen複雜得多,這是必需的。 * 因為如果當閥門開啟時有N個執行緒正在等待它,那麼這些執行緒都應該允許被執行。 * 然而,如果閥門在開啟後又非常快速的關閉了,並且await方法只檢查isOpen,那麼所有執行緒都可能無法釋放。 * 因此,在ThreadGate中使用了一個更復雜的條件謂詞:每次閥門開啟時,遞增一個generation計數器, * 如果閥門現在是開啟的(isOpen=true),或者閥門自從該執行緒到達後就一直是開啟的,那麼執行緒都可以通過await * 即使開啟後又快速關閉的關閉了(arrivalGeneration != generation),在開啟之前被阻塞的執行緒也能通過。 */ public class ThreadGate { //條件謂詞 isOpen || generation > n private boolean isOpen; private int generation; public synchronized void close(){ isOpen = false; } public synchronized void open(){ ++generation; isOpen = true; notifyAll(); } public synchronized void await() throws InterruptedException{ int arrivalGeneration = generation; while(!isOpen && arrivalGeneration == generation){ wait(); } } }
顯示地Condition物件
public interface Condition{ void await() throws InterruptedException; boolean await(long time,TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeOut) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
內建條件佇列存在一些缺陷。每個內建鎖都只能有一個想關聯的條件佇列,因而在像BoundedBuffer這種類中,多個執行緒可能在同一個條件佇列上等待不同的條件謂詞,並且在最常見的加鎖模式下公開條件佇列物件。這些因素都使得無法滿足在使用notifyAll時所有等待執行緒為同一型別的需求。如果想編寫一個帶有多個條件謂詞的併發物件,或者想獲得除了條件佇列可見性之外的更多控制權,就可以使用顯示地Lock和Condition而不是內建鎖和條件佇列。
一個Condition和一個Lock關聯在一起,就像一個條件佇列和一個內建鎖相關聯一樣。
要建立一個Condition,可以再相關聯的Lock上呼叫Lock.newCondition方法。正如Lock比內建鎖提供了更為豐富的功能,Condition同樣比內建條件佇列提供了更豐富的功能:在每個鎖上可存在多個等待、條件等待可以是可中斷或不可中斷、基於時限的等待,以及公平的或非公平的佇列操作。每個Lock可以有任意數量的Condition物件,Condition物件繼承了Lock物件的公平性。
- 改進:使用顯示條件變數的有界快取
/** * 在分析使用多個Condition的類時,比分析一個使用單一內部佇列加多個條件謂詞的類簡單得多。 * 通過將兩個條件謂詞分開並放到兩個等待執行緒集中,Condition使其更容易滿足單詞通知的需求。 * signal比signalAll更高效,它能極大地減少在每次快取操作中發生的上下文切換與鎖請求的次數。 * 與內建鎖和條件佇列一樣,當使用顯示的Lock和Condition時,也必須滿足鎖、條件謂詞和條件變數之間的三元關係。 * 在條件謂詞中包含的變數必須由Lock來保護,並且在檢查條件謂詞以及呼叫await和signal時,必須持有Lock物件。 * * @param <T> */ public class ConditionBoundedBuffer<T> { protected final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final T[] items = (T[])new Object[10]; private int tail; private int head; private int count; //阻塞直到notFull public void put(T x) throws InterruptedException{ lock.lock(); try{ while(count == items.length){ notFull.wait(); } items[tail] = x; if(++tail == items.length){ tail = 0; } ++count; notEmpty.signal(); }finally{ lock.unlock(); } } //阻塞直到notEmpty public T take() throws InterruptedException{ lock.lock(); try{ while(count == 0){ notEmpty.await(); } T t = items[head]; items[head] = null; if(++head == items.length){ head = 0; } --count; notFull.signal(); return t; }finally{ lock.unlock(); } } }
Synchronizer
在ReentrantLock和Semaphore這兩個介面之間存在許多共同點,這兩個類都可以用做一個閥門,即每次只允許一定數量的執行緒通過,並當執行緒到達閥門時,可以通過或等待或取消。或許會認為Semaphore是基於ReentrantLock實現的,或者認為ReentrantLock實際上是帶有一個許可的Semaphore。這些實現方式都是可行的。
- 示例:通過鎖來實現計數訊號量
public class SemaphoreOnLock { private final Lock lock = new ReentrantLock(); private final Condition permitAvailable = lock.newCondition(); private int permits; SemaphoreOnLock(int permits){ lock.lock(); try{ this.permits = permits; }finally{ lock.unlock(); } } public void acquire() throws InterruptedException{ lock.lock(); try{ while(permits <= 0){ permitAvailable.await(); } --permits; }finally{ lock.unlock(); } } public void release(){ lock.lock(); try{ ++permits; permitAvailable.signal(); }finally{ lock.unlock(); } } }
事實上,它們的實現都使用了一個共同的基類,AbstractQueuedSynchronizer,這個類也是其他許多同步類的基類。AQS是一個用於構建鎖和同步器的框架,如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。
在SemaphoreOnLock中,獲取許可的操作可能在兩個時刻阻塞---當鎖保護訊號量狀態時,以及當許可不可用時。在基於AQS構建的同步器中,只可能在一個時刻發生阻塞,從而降低上下文切換的開銷,並提高吞吐量。AQS在設計時充分考慮了可伸縮性。
AbstractQueuedSynchronizer
在基於AQS構建的同步器類中,最基本的操作包括各種形式的獲取操作和釋放操作。獲取操作時一種依賴狀態的操作,並且通常會阻塞。
當使用鎖或訊號量時 ,獲取操作的含義就很直觀,即獲取的是鎖或者許可,並且呼叫者可能會一直等待直到同步器類處於可被獲取的狀態。
在使用CountDownLatch時,獲取操作意味著等待並直到閉鎖到達結束狀態,在使用FutureTask時,意味著等待並直到任務已經完成。釋放並不是一個可阻塞的操作,當執行釋放操作時,所有在請求時被阻塞的執行緒都會開始執行。
如果一個類想成為狀態依賴的類,那麼它必須擁有一些狀態。AQS負責管理同步器類中的狀態,它管理了一個整數狀態資訊,可以通過getState,setState以及compareAndSetState等protected型別方法來進行操作。這個整數可以用於表示任意狀態。
在ReentrantLock中,它用來表示所有者執行緒已經重複獲取該鎖的次數,Semaphore用它來表示剩餘的許可數量,FutureTask用它來表示任務的狀態。在同步器類中還可以自行管理一些額外的狀態變數,如ReentrantLock儲存了鎖的當前所有者資訊,這樣就能區分某個獲取操作是重入還是競爭的。
事實上,java.util.concurrent中的所有同步器類都沒有直接擴充套件AQS,而是都將它們的相應功能委託給私有的AQS子類來實現。
AQS中獲取操作和釋放操作的標準形式
boolean acquire() throws InterruptedException{ while(當前狀態不允許獲取操作){ if(需要阻塞獲取請求){ 如果當前執行緒不在佇列中,則將其插入佇列 阻塞當前執行緒 }else{ 返回失敗 } } 可能更新同步器的狀態 如果執行緒位於佇列中,則將其移出佇列 返回成功 } void release(){ 更新同步器的狀態 if(新的狀態允許某個被阻塞的執行緒獲取成功){ 解除佇列中一個或多個執行緒的阻塞狀態 } }
- AQS示例:一個簡單的閉鎖
/** * 在OneShotLatch中,AQS狀態用來表示閉鎖狀態---關閉(0)或者開啟(1)。 * await方法呼叫AQS的acquireSharedInterruptibly,然後接著呼叫OneShotLatch中的tryAcquireShared。 * 在tryAcquireShared的實現中必須返回一個值來表示該獲取操作能否執行。 * 如果之前已經打開了閉鎖,那麼tryAcquireShared將返回成功並允許執行緒通過,否則就會返回一個表示獲取操作失敗的值。 * acquireSharedInterruptibly處理失敗的方式,是把這個執行緒放入等待執行緒佇列中。 * 類似的,signal將呼叫releaseShared,接下來又會呼叫tryReleaseShared。 * 在tryReleaseShared中將無條件地把閉鎖的狀態設定為開啟,表示該同步類處於完全釋放的狀態。 */ public class OneShotLatch { private final Sync sync = new Sync(); public void signal(){ sync.release(0); } public void await() throws InterruptedException{ sync.acquireSharedInterruptibly(0); } private class Sync extends AbstractQueuedSynchronizer{ protected int tryAcquireShared(int ignored){ return (getState() == 1) ? 1 : -1; } protected boolean tryReleaseShared(int ignored){ setState(1); return true; } } }
- AQS示例:java.util.concurrent中的ReentrantLock
/** * ReentrantLock只支援獨佔方式的獲取操作,因此它實現了tryAcquire、tryRelease和isHeldExclusively。 * * ReentrantLock將同步狀態用於儲存鎖獲取操作的次數,並且維護一個owner變數來儲存當前所有者執行緒的識別符號, * 只有在當前執行緒剛剛獲取到鎖,或者正要釋放鎖的時候,才會修改這個變數。 * 在tryRelease中檢查owner域,從而確保當前執行緒在執行unlock操作之前已經獲取了鎖 * 在tryAcquire中將使用這個域來區分獲取操作是重入還是競爭的。 * * 當一個執行緒嘗試獲取鎖時,tryAcquire將首先檢查鎖的狀態。如果鎖未被持有,那麼它將嘗試更新鎖的狀態以表示鎖已經被持有。 * 由於狀態可能在檢查後被立即修改,因此tryAcquire使用compareAndSetState來更新狀態。 * 如果鎖狀態表明它已經被持有,並且如果當前執行緒是鎖的持有者,那麼獲取計數會遞增 * 如果當前執行緒不是鎖的擁有者,那麼獲取操作將失敗。 * @param ignored * @return */ protected boolean tryAcquire(int ignored){ final Thread current = Thread.currentThread(); int c = getState(); if(c == 0){ if(compareAndSetState(0,1)){ owner = current; return true; } }else if(current == owner){ setState(c+1); return true; } return false; }
- AQS示例:java.util.concurrent中的Semaphore與CountDownLatch
/** * Semaphore將AQS的同步狀態用於儲存當前可用許可的數量。 * * tryAcquireShared方法首先計算剩餘許可的數量,如果沒有足夠的許可,那麼會返回一個值表示獲取操作失敗。 * 如果還有剩餘的許可,那麼tryAcquireShared會通過compareAndSetState方式來降低許可的計數。 * * 如果這個操作成功,那麼將返回一個值表示獲取操作的成功。 * 在返回值中還包含了表示其他共享獲取操作能否成功的資訊,如果成功,那麼其他等待的執行緒同樣會解除阻塞。 * 當沒有足夠的許可,或者當tryAcquireShared可以通過原子方式來更新許可計數以響應獲取操作時,while迴圈將終止。 * * 雖然compareAndSetState的呼叫可能由於與另一個執行緒發生競爭而失敗,並使其重新嘗試, * 但在經過了一定次數的重試操作以後,在這兩個結束條件中有一個會變為真。 * 同樣,tryReleaseShared將增加許可計數,這可能會解除等待中執行緒的阻塞狀態,並且不斷地重試直到更新操作成功。 * tryReleaseShared的返回值表示在這次釋放操作中解除了其他執行緒的阻塞。 * * CountDownLatch使用AQS的方式很相似,在同步狀態中儲存的是當前的計數值。 * countDown方法呼叫release,從而導致計數值遞減,並且當計數值為零時,解除所有等待執行緒的阻塞。 * await呼叫acquire,當計數器為零時,acquire將立即返回,否則將阻塞。 * * @param acquires * @return */ protected int tryAcquireShared(int acquires){ while(true){ int available = getState(); int remaining = available - acquires; if(remaining < 0 || compareAndSetState(available,remaining)){ return remaining; } } } protected boolean tryReleaseShared(int release){ while(true){ int p = getState(); if(compareAndSetState(p,p + release)){ return true; } } }
FutureTask
Future.get()的語義非常類似於閉鎖的語義-----如果發生了某個事件,那麼執行緒就可以恢復執行,否則這些執行緒將停留在佇列中並直到該事件發生。
在FutureTask中,AQS同步狀態被用來儲存任務的狀態,如:正在執行、已完成或已取消。FutureTask還維護一些額外的狀態變數,用來儲存計算結果或者丟擲異常。
此外,它還維護了一個引用,指向正在執行計算任務的執行緒,因為如果任務取消,該執行緒就會中斷。
ReentrantReadWriteLock
ReadWritelock介面表示存在兩個鎖:一個讀取鎖和一個寫入鎖,但在基於AQS實現的ReentrantReadWriteLock中,單個AQS子類將同時管理讀取加鎖和寫入加鎖。
ReentrantReadWriteLock使用了一個16位的狀態來表示寫入鎖的計數,並且使用了另一個16位的狀態來表示讀取鎖的計數。在讀取鎖上的操作將使用共享的獲取方法與釋放方法,在寫入鎖上的操作將使用獨佔的獲取方法與釋放方法。
AQS在內部維護一個等待執行緒佇列,其中記錄了某個執行緒請求的是獨佔訪問還是共享訪問。在ReentrantReadWriteLock中,當鎖可用時,如果位於佇列頭部的執行緒執行寫入操作,那麼執行緒就會得到這個鎖,如果位於佇列頭部的執行緒執行讀取訪問,那麼佇列中在第一個寫入執行緒之前的所有執行緒都將獲得這個鎖。
#筆記內容參考 《java併發程式設計實戰》