佇列同步器
佇列同步器AbstractQueueSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步元件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作,併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。
同步器是實現鎖(也可以是任意同步元件)的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。可以這樣理解二者之間的關係:鎖是面向使用者的,它定義了使用者與鎖互動的介面(比如可以允許兩個執行緒並行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,遮蔽了同步狀態關係、執行緒的排隊、等待與喚醒等底層操作。
佇列同步器的介面與示例
同步器的設計是基於模板方法模式的,也就是說,使用者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步元件的實現中,並呼叫同步器提供的模板方法,而這些模板方法將會呼叫使用者重寫方法。
重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來訪問或修改同步狀態。
- getState():獲取當前同步狀態。
- setState(int newState):設定當前同步狀態。
- compareAndSetState(int expect,int update):使用CAS設定當前狀態,該方法能夠保證狀態設定的原子性。
同步器可重寫的方法與描述如表所示:
方法名稱 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 獨佔鎖獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後再進行CAS設定同步狀態 |
protected boolean tryRelease(int arg) | 獨佔式釋放同步狀態,等待獲取同步狀態的執行緒將有機會獲取同步狀態 |
protected int tryAcquireShared(int arg) | 共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之,獲取失敗 |
protected boolean tryReleaseShare(int arg) | 共享式釋放同步狀態 |
protected boolean isHeldExclusively | 當前同步器是否在獨佔模式下被執行緒佔用,一般該方法表示是否被當前執行緒所獨佔 |
實現自定義同步元件時,將會呼叫同步器提供的模板方法,這些(部分)模板方法與描述如表所示:
方法名稱 | 描述 |
---|---|
void acuire(int arg) | 獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回,否則,將會進入同步佇列等待,該方法將會呼叫重寫的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 與acquire(int arg)相同,但是該方法響應中斷,當前執行緒未獲取到同步狀態而進入同步佇列中,如果當前執行緒被中斷,則該方法會丟擲InterruptedException並返回 |
boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基礎上增加了超時限制,如果當前執行緒在超時時間內沒有獲取到同步狀態,那麼將會返回false,如果獲取到了返回true |
void acquireShared(int arg) | 共享式的獲取同步狀態,如果當前執行緒未獲取到同步狀態,將會進入同步佇列等待,與獨佔式獲取的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態 |
void acquireSharedInterruptibly(int arg) | 與acquireShared(int arg)相同,該方法響應中斷 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)基礎上增加了超時限制 |
boolean release(int arg) | 獨佔式的釋放同步狀態,該方法會在釋放同步狀態之後,將同步佇列中第一個節點包含的執行緒喚醒 |
boolean releaseShared(int arg) | 共享式的釋放同步狀態 |
Collection
|
獲取等待在同步佇列上的執行緒集合 |
同步器提供的模板方法基本上分為3類:獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態和查詢同步佇列中的等待執行緒情況。自定義同步元件將使用同步器提供的模板方法來實現自己的同步語義。
只有掌握了同步器的工作原理才能更加深入地理解併發包中其他地併發元件,所以下面通過一個獨佔鎖的示例來深入瞭解一下同步器的工作原理。
顧名思義,獨佔鎖就是在同一時刻只能有一個執行緒獲取到鎖,而其他獲取鎖的執行緒只能處於同步佇列中等待,只有獲取鎖的執行緒釋放了鎖,後繼的執行緒才能夠獲取鎖:
class Mutex implements Lock{
//靜態內部類,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer{
//是否處於佔用狀態
protected boolean isHeldExclusively(){
return getState() == 1;
}
//當狀態為0的時候獲取鎖
public boolean tryAcquire(int acquires){
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//釋放鎖,將狀態設定為0
protected boolean tryRelease(int releases){
if(getState() == 0) throw new
IllegalMonitorStateException();
setExcelusiveOwnerThread(null);
setState(0);
return true;
}
//返回一個Condition,每個codition都包含了一個codition佇列
Codition newConditon() {return new CoditionObject();}
}
//僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock(){sync.acuire(1);}
public boolean tryLock() {return sync.tryAcquire(1);}
public void unlock(){sync.release(1);}
public Condition newCodition(){return sysc.newCondition();}
public boolean isLocked() {return sync.isHeldExclusively();}
public boolean hasQueuedThreads(){return sync.hasQueuedThreads();}
public void lockInterruptibly() throw InterruptedException{
sync.acqurieInterruptibly(1);
}
public boolean tryLock(long timeout,TimeUnit unit) throws InterruptedException{
return sync.tryAcquireNanos(1,unit.toNanos(timeout));
}
}
上述示例中,獨佔鎖Mutex是一個自定義同步元件,它在同一時刻只允許一個執行緒佔用鎖。Mutex中定義了一個靜態內部類,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。在tryAcquire(int acquires)方法中,如果經過CAS設定成功(同步狀態設定為1),則代表獲取了同步狀態,而在tryRelease(int releases)方法中只是將同步狀態重置為0。
佇列同步器的實現分析
1.同步佇列
同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態等資訊構造成為一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。
同步佇列中的節點(Node)用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和後繼節點,節點的屬性型別與名稱以及描述如表所示:
節點是構成同步佇列的基礎,同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態和執行緒將會成為節點加入該佇列的尾部,同步佇列的基本結構如圖所示:
當一個執行緒成功地獲取了同步狀態(或者鎖),其他執行緒將無法獲取到同步狀態,轉而被構造成為節點並加入到同步佇列中,而這個加入佇列地過程必須要保證執行緒安全,因此同步器提供了一個基於CAS地設定尾節點地方法:compareAndSetTail(Node expect,Node update),它需要傳遞當前執行緒“認為”的尾節點和當前節點,只有設定成功後,當前節點才正式與之前的尾節點建立關聯。
同步器將節點加入到同步佇列的過程如圖所示:
同步佇列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的執行緒在釋放同步狀態時,將會喚醒後繼節點,而後繼節點將會在獲取同步狀態成功時將自己設定為首節點,過程如圖所示:
上圖中,設定首節點是通過獲取同步狀態成功的執行緒來完成的,由於只有一個執行緒能夠成功獲取到同步狀態,因此設定頭節點的方法並不需要使用CAS來保證,它只需要將首節點設定成為原首節點的後繼節點並斷開原首節點的next引用即可。
2.獨佔式同步狀態獲取與釋放
通過呼叫同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,也就是由於執行緒獲取同步狀態失敗後進入同步佇列中,後繼對執行緒進行中斷操作時,執行緒不會從同步佇列中移出,該方法程式碼如下:
public final void acquire(int arg){
if(!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
其主要邏輯是:首先呼叫自定義同步器實現的tryAcquire(int arg)方法,該方法保證執行緒安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個執行緒成功獲取同步狀態)並通過addWaiter(Node node)方法將該節點加入到同步佇列的尾部,最後呼叫acquireQueued(Node node,int arg)方法,使得該節點以“死迴圈”的方式獲取同步狀態。如果獲取不到則阻塞節點中的執行緒,而被阻塞執行緒的喚醒主要依靠前驅節點的出隊或阻塞現場被中斷來實現。
下面分析一個相關工作。首先是節點的構造以及加入同步佇列,如程式碼所示。
private Node addWaiter(Node mode){
Node node = new Node(Thread.currentThread(),mode);
//快速嘗試在尾部新增
Node pred = tail;
if(pred != null){
node.prev = pred;
if(compareAndSetTail(pred,node)){
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node){
for(;;){
Node t = tail;
if(t == null){
//Must initialize
if(compareAndHead(new Node()))
tail = head;
}else{
node.prev = t;
if(compareAndSetTail(t,node)){
t.next = node;
return t;
}
}
}
}
上述程式碼通過使用compareAndSetTail(Node expect,Node update)方法來確保節點能夠被執行緒安全的新增。如果使用一個普通的LinkedList來維護節點之間的關係,那麼當一個執行緒獲取了同步狀態,而其他多個執行緒由於呼叫tryAcquire(int arg)方法獲取同步狀態失敗而併發地被新增到LinkedList時,LinkedList將難以保證Node的正確新增,最終的結果可能是節點的數量有偏差,而且順序也是混亂的。
在enq(final Node node)方法中,同步器通過“死迴圈”來保證節點的正確新增,在“死迴圈”中只有通過CAS將節點設定成為尾節點之後,當前執行緒才能從該方法返回,否則,當前執行緒不斷地嘗試設定。可以看出,enq(final Node node)方法將併發節點的請求通過CAS變得“序列化”了。
節點進入同步佇列之後,就進入了一個自旋的過程,每個節點(或者說每個執行緒)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自選過程中退出,否則依舊留在這個自旋過程中(並會阻塞節點地執行緒),如程式碼所示:
final boolean acquireQueued(final Node node,int arg){
boolean failed = true;
try{
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head && tryAcquire(arg)){
setHead(node);
p.next = null;//help GC
failed = false;
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node) &&
parkAndCheckInterrupt())
interrupted = true;
}
}finally{
if(failed)
cancelAcquire(node);
}
}
在acquireQueued(final Node node,int arg)方法中,當前執行緒在“死迴圈”中嘗試獲取同步狀態,而只有前驅結點是頭節點才能夠獲取同步狀態,這是為什麼?原因有兩個,如下。
第一,頭節點是成功獲取到同步狀態的節點,而頭節點的執行緒釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的執行緒被喚醒後需要檢查自己的前驅節點是否是頭節點。
第二,維護同步佇列的FIFIO原則。該方法中,節點自旋獲取同步狀態的行為如圖所示:
上圖中,由於非首節點執行緒前驅節點出隊或者被中斷而從等待狀態返回,隨後檢查自己的前驅是否是頭節點,如果是則嘗試獲取同步狀態。可以看到節點和節點之間在迴圈檢查的過程中基本不相互通訊,而是簡單地判斷自己的前驅是否為頭節點,這樣就使得節點的釋放規則符合FIFO,並且也便於對過早通知的處理(過早通知是指前驅節點不是頭節點的執行緒由於中斷而被喚醒)。
獨佔式同步狀態獲取流程,也就是acquire(int arg)方法呼叫執行緒,如圖所示:
在上圖中,前驅節點為頭節點且能夠獲取同步狀態的判斷條件和執行緒進入等待狀態是獲取同步狀態的自選過程。當同步狀態獲取成功之後,當前執行緒從acqurie(int arg)方法返回,如果對於鎖這種併發元件而言,代表著當前執行緒獲取了鎖。
當前執行緒獲取同步狀態並執行了相應邏輯之後,就需要釋放同步狀態,使得後續節點能夠繼續獲取同步狀態。通過呼叫同步器的release(int arg)方法可以釋放同步狀態,該方法在釋放了同步狀態之後,會喚醒其後繼節點(進而使後繼節點重新嘗試獲取同步狀態)。該方法程式碼如下所示:
public final boolean release(int arg){
if(tryRelease(arg)){
Node h = head;
if(h != null && h.waitStatus != 0){
unparkSuccessor(h);
}
return true;
}
return false;
}
該方法執行時,會喚醒頭節點的後繼節點執行緒,uparkSuccessor(Node node)方法使用LockSupport來喚醒處於等待狀態的執行緒。
做個總結:在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中並在佇列中進行自旋;移出佇列(或停止自旋)的條件是前驅節點為頭節點且成為獲取了同步狀態。在釋放同步狀態時,同步器呼叫tryRelease(int arg)方法釋放同步狀態,然後喚醒頭節點的後繼節點。
3.共享同步狀態獲取與釋放
共享式獲取與獨佔式獲取最主要的區別在於同一時刻能否有多個執行緒同時獲取到同步狀態。
上圖中,左半部分,共享式訪問資源時,其他共享式的訪問均被允許,而獨佔式訪問被阻塞,右半部分是獨佔式訪問資源時,同一時刻其他訪問均被阻塞。
通過呼叫同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態,該方法程式碼如下:
public final void acquireShared(int arg){
if(tryAcquireShared(arg)<0){
doAcquireShared(arg);
}
private void doAcquireShared(int arg){
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try{
boolean interrupted = false;
for(;;){
final Node p = node.predecessor();
if(p == head){
int r = tryAcquireShared(arg);
if(r >= 0){
setHeadAndPropagate(node,r);
p.next = null;
if(interrupted){
selfInterrupt();
}
failed = false;
return;
}
}
if(shouldParkAfterFailedAcquire(p,node)&&
parkAndCheckInterrupt()){
interrupted = true;
}
}
finally{
if(failed){
cancelAcquire(node);
}
}
}
}
}
在acquireShared(int arg)方法中,同步器呼叫tryAcquireShared(int arg)方法嘗試獲取同步狀態,tryAcquireShared(int arg)方法返回值為int型別,當返回值大於等於0時,表示能夠獲取到同步狀態。因此,在共享式獲取地自旋過程中,成功獲取到同步狀態並退出自旋地條件是tryAcquireShared(int arg)方法返回值大於等於0.可以看到,在doAcquireShared(int arg)地自旋過程中,如果當前節點地前驅為頭節點時,嘗試獲取同步狀態,如果返回值大於等於0,表示該次獲取同步狀態成功並從自旋過程中退出。
4.獨佔式超時獲取同步狀態
響應中斷的同步狀態獲取過程。在Java 5之前,當一個執行緒獲取不到鎖而被阻塞在synchronized之外時,對該執行緒進行中斷操作,此時該過程的中斷標誌位會被修改,但執行緒依舊會阻塞在synchronized上,等待著獲取鎖。在Java 5中,同步器提供了acquireInterruptibly(int arg)方法,這個方法在等待獲取同步狀態時,如果當前執行緒被中斷,會立刻返回,並丟擲InterruptedException。
超市獲取同步狀態過程可以被視作響應中斷獲取同步狀態過程的“增強版”,doAcquireNanos(int arg,long nanosTimeout)方法在支援響應中斷的基礎上,增加了超時獲取的特性。針對超市獲取,主要需要計算出需要睡眠的時間間隔nanosTimeout,為了防止過早通知,nanosTimeout計算公式為:nanosTimeout = now -lastTime,其中now為當前喚醒時間,lastTime為上次喚醒時間。
上圖中可以看出,獨佔式超時獲取狀態doAcquireNanos(int arg,long nanosTimeout)和獨佔式獲取同步狀態acquire(int args)在流程上非常相似,其主要區別在於未獲取到同步狀態時的處理邏輯。
5.自定義同步元件——TwinsLock
該工具在同一時刻,只允許至多兩個執行緒同時訪問,超過兩個執行緒的訪問將被阻塞,我們將這個同步工具命名未TwinsLock。
設定初始狀態status為2,當一個執行緒進行獲取,status減1,該執行緒釋放,則status加1,其中0表示當前已經有兩個執行緒獲取了同步資源,此時再有其他執行緒對同步狀態進行獲取,該執行緒只能被阻塞。