1. 程式人生 > 其它 >Java併發包中鎖原理剖析

Java併發包中鎖原理剖析

概述

Java 5之後新增了Lock介面,自定義類可實現Lock介面,並通過內部靜態類繼承AQS抽象類的方式實現獨佔鎖、共享鎖。

鎖是面向使用者的,它定義了使用者與鎖互動的介面,隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,遮蔽了同步狀態管理、執行緒的排隊、等待與喚醒等底層操作。使用者呼叫子類實現的Lock介面中提供的方法,而這些方法又呼叫同步器的方法來實現具體的功能。

AQS實現原理
AQS是一個Java提供的底層同步工具類,用一個int型別的變數state表示同步狀態,並使用CAS操作來管理這個同步狀態。同時,它還實現了一個FIFO的佇列,底層採用雙向連結串列實現,並有head和tial指標指向頭和尾。

當一個執行緒獲取到鎖之後,通過CAS將state設定為1代表執行緒獲取到鎖;如果這時候有其它的執行緒在競爭鎖,那麼在失敗後其它將會被加入佇列尾部,並且自旋判斷其前驅節點為頭節點&是否成功獲取同步狀態,兩個條件都成立,則將當前執行緒設定為頭節點,如果不是,則用LockSupport.park(this)將當前執行緒掛起 ,等待前驅節點釋放unpark喚醒自己。

ReentrantLock實現原理
ReentrantLock是可重入鎖,通過判斷上次獲取鎖的執行緒是否為當前執行緒(current == getExclusiveOwnerThread()),如果是則可再次進入臨界區並且增加同步狀態值返回最後true,如果不是,則返回false。當釋放鎖時也要減小同步狀態值。

ReentrantLock可實現公平鎖,通過構造傳參的方式。在非公平鎖的基礎上加入了對同步佇列中當前節點是否有前驅節點的判斷,如果該 方法返回true,則表示有執行緒比當前執行緒更早地請求獲取鎖,因此需要等待前驅執行緒獲取並釋放鎖之後才能繼續獲取鎖。

Lock

鎖是用來控制多個執行緒訪問共享資源的方式。在Lock接口出現之前,Java程式是靠synchronized關鍵字實現鎖功能的,而Java SE 5之後,併發包中新增了Lock介面來實現鎖功能,它提供了synchronized關鍵字類似的同步功能,只是在使用時需要顯式地獲取和釋放鎖。
Lock介面:

 public interface Lock {
     void lock();
     void lockInterruptibly() throws InterruptedException;
     boolean tryLock();
     boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
     void unlock();
     Condition newCondition();
 }

Lock介面的使用:

Lock lock  = new ReentrantLock();
lock.lock();
try{
//可能會出現執行緒安全的操作
}finally{
//一定在finally中釋放鎖
//也不能把獲取鎖在try中進行,因為有可能在獲取鎖的時候丟擲異常
  lock.ublock();
}

Lock介面的API:

Lock介面與synchronized關鍵字的區別:

AQS

AQS是AbustactQueuedSynchronizer(佇列同步器)的簡稱,它是一個Java提供的底層同步工具類,用一個int型別的變量表示同步狀態,並提供了一系列的CAS操作來管理這個同步狀態。

AQS的主要作用是為Java中的併發同步元件提供統一的底層支援,例如ReentrantLock,CountdowLatch就是基於AQS實現的,用法是通過繼承AQS實現其模版方法,然後將子類作為同步元件的內部類。

AQS中可重寫的方法分為獨佔式與共享式的

AQS提供的可重寫方法:

/* 獨佔模式(排它模式)獲取同步狀態,獲取同步狀態前要檢查當前執行緒是否可以獲取。因為可能已被其它執行緒獲取,
 * 當同步狀態允許被執行緒多次獲取時,獨佔模式只能被同一個執行緒多次獲取。未獲取同步狀態的執行緒,將在同步佇列中
 * 等待。
 */ 
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}

/*獨佔模式釋放同步狀態,方法呼叫完後在同步佇列中等待獲取同步狀態的執行緒將有機會獲取同步狀態*/
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}

/*共享模式獲取同步狀態,返回大於等於0,表示成功獲取同步狀態,反之代表失敗*/ 
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}

/*共享模式釋放同步狀態*/ 
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}

/*同步器是否在獨佔模式(排它模式)下被執行緒佔用,一般改方法表示同步狀態是否已被當前執行緒獲取過*/ 
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();}

實現自定的同步元件時,將會呼叫AQS提供的模板方法,這些模板方法內部又將呼叫上面重寫的tryAcquire、 tryRelease、 tryAcquireShared 、tryReleaseShared isHeldExclusively方法。
模板方法描述如下:

/*獨佔模式獲取同步狀態,該方法會先呼叫重寫的tryAcquire方法嘗試獲取同步狀態,如果當前執行緒成功獲取同步狀態,
 * 該方法直接返回,否則當前執行緒會被封裝成一個節點放入同步佇列中等待
 */
public final void acquire(int arg) {//實現省去...}

/*與acquire方法類似,但該方法能響應中斷,如果當前執行緒未能獲取到同步狀態,而在同步佇列中,當這個執行緒被中斷時,
 *則該丟擲InterruptedException 異常並返回
 */
public final void acquireInterruptibly(int arg){//實現省去...}

/*在acquireInterruptibly方法上加了超時機制,如果當前執行緒在超過時間內沒有獲取同步狀態,那麼將返回false,
 *獲取到返回ture
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout){//實現省去...}

/*共享式獲取同步狀態,該方法會先呼叫重寫的tryAcquireShared方法嘗試獲取同步狀態,如果當前執行緒成功獲取同步狀態,
 *該方法直接返回,否則當前執行緒會被封裝成一個節點放入同步佇列中等待
 */
public final void acquireShared(int arg) {//實現省去...}

/*以acquireShared方法類似,在其基礎上增加了響應中斷的功能
 */
public final void acquireSharedInterruptibly(int arg){//實現省去...}

/*在acquireSharedInterruptibly方法的基礎上增加了超時限制
 */public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout){//實現省去...}

/*獨佔模式釋放同步狀態,該方法會先呼叫重寫的tryRelease方法,如果釋放同步狀態成功,將同步佇列中第一個節點中
 *對應執行緒喚醒
 */
public final boolean release(int arg) {//實現省去...}

/*共享模式釋放同步狀態,該方法會先呼叫重寫的tryReleaseShared方法
 */
public final boolean releaseShared(int arg) {//實現省去...}

AQS 提供瞭如下三個方法對應state狀態進行管理:

/*獲取當前同步狀態*/
protected final int getState() {return state;}

/*設定當前同步狀態*/
protected final void setState(int newState) {state = newState;}

/*使用Unsafe的CAS操作設定當前同步狀態,該方法操作能保證原子性*/
protected final boolean compareAndSetState(int expect, int update) {
   // See below for intrinsics setup to support this  
   return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

實現

同步佇列

同步佇列是AQS很重要的組成部分,它是一個雙端佇列,遵循FIFO原則,主要作用是用來存放在鎖上阻塞的執行緒,當一個執行緒嘗試獲取鎖時,如果已經被佔用,獲取鎖失敗那麼當前執行緒就會被構造成一個Node節點加入到同步佇列的尾部,佇列的頭節點是成功獲取鎖的節點,當頭節點執行緒釋放鎖時,會喚醒後面的節點並釋放當前頭節點的引用。

同步佇列中的節點(Node)用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和後繼節點。

使用CAS將節點插入到尾部,並用tail指向該結點。

2.2.2 獨佔鎖的獲取和釋放流程
獲取

呼叫入口方法acquire(arg)
呼叫模版方法tryAcquire(arg)嘗試獲取鎖,若成功則返回,若失敗則走下一步
將當前執行緒構造成一個Node節點,並利用addWaiter(Node node) 將其加入到同步佇列尾部
呼叫acquireQueued(Node node,int arg)方法,使得該 節點以“死迴圈”的方式獲取同步狀態
自旋時,首先判斷其前驅節點為頭節點且釋放&是否成功獲取同步狀態,兩個條件都成立,則將當前執行緒的節設定為頭節點,如果不是,則利用LockSupport.park(this)將當前執行緒掛起 ,等待前驅節點釋放喚醒自己,之後繼續判斷。
釋放

呼叫入口方法release(arg)
呼叫模版方法tryRelease(arg)釋放同步狀態
利用LockSupport.unpark(currentNode.next.thread)喚醒後繼節點(接獲取的第五步)

2.2.3 共享鎖的獲取和釋放流程
共享式獲取與獨佔式獲取最主要的區別在於同一時刻能否有多個執行緒同時獲取到同步狀態

獲取鎖

在acquireShared(int arg)方法中,同步器呼叫tryAcquireShared(int arg)方法嘗試獲取同步狀態
tryAcquireShared(int arg)方法返回值為int型別,當返回值大於等於0時,表示能夠獲取到同步狀態。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態並退出自旋的條件就是 tryAcquireShared(int arg)方法返回值大於等於0。
可以看到,在doAcquireShared(int arg)方法的自 旋過程中,如果當前節點的前驅為頭節點時,嘗試獲取同步狀態,如果返回值大於等於0,表示該次獲取同步狀態成功並從自旋過程中退出。

釋放鎖

呼叫releaseShared(arg)模版方法釋放同步狀態
呼叫模版方法tryReleaseShard(arg)釋放同步狀態
如果釋放成功,則遍歷整個佇列,利用LockSupport.unpark(nextNode.thread)喚醒所有後繼節點
與獨佔式區別在於執行緒安全釋放,通過迴圈和CAS保證,因為釋放同步狀態的操作會同時來自多個執行緒

2.2.4 獨佔鎖和共享鎖在實現上的區別
獨佔鎖的同步狀態值為1,即同一時刻只能有一個執行緒成功獲取同步狀態
共享鎖的同步狀態>1,取值由上層同步元件確定
獨佔鎖佇列中頭節點執行完成後釋放它的直接後繼節點
共享鎖佇列中頭節點執行完成後釋放它後面的所有節點
共享鎖中會出現多個執行緒(即同步佇列中的節點)同時成功獲取同步狀態的情況

2.2.5 重入鎖
重入鎖指的是當前執行緒成功獲取鎖後,如果再次訪問該臨界區,則不會對自己產生互斥行為。Java中ReentrantLock和synchronized都是可重入鎖,synchronized由JVM偏向鎖實現可重入鎖,ReentrantLock可重入性基於AQS實現。

重入鎖的基本原理是判斷上次獲取鎖的執行緒是否為當前執行緒(current == getExclusiveOwnerThread()),如果是則可再次進入臨界區,如果不是,則阻塞。

final boolean nonfairTryAcquire(int acquires) {
//獲取當前執行緒
final Thread current = Thread.currentThread();
//通過AQS獲取同步狀態
int c = getState();
//同步狀態為0,說明臨界區處於無鎖狀態,
if (c == 0) {
//修改同步狀態,即加鎖
if (compareAndSetState(0, acquires)) {
//將當前執行緒設定為鎖的owner
setExclusiveOwnerThread(current);
return true;
}
}
//如果臨界區處於鎖定狀態,且上次獲取鎖的執行緒為當前執行緒
else if (current == getExclusiveOwnerThread()) {
//則遞增同步狀態
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

如果是獲取鎖的執行緒再次請求,則將同步狀態值進行增加並返回 true,表示獲取同步狀態成功。

成功獲取鎖的執行緒再次獲取鎖,只是增加了同步狀態值,這也就要求ReentrantLock在釋放 同步狀態時減少同步狀態值

2.2.6 公平鎖和非公平鎖
對於非公平鎖,只要CAS設定 同步狀態成功,則表示當前執行緒獲取了鎖,而公平鎖則不同

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//此處為公平鎖的核心,即判斷同步佇列中當前節點是否有前驅節點
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
該方法與nonfairTryAcquire(int acquires)比較,唯一不同的位置為判斷條件多了 hasQueuedPredecessors()方法,即加入了同步佇列中當前節點是否有前驅節點的判斷,如果該 方法返回true,則表示有執行緒比當前執行緒更早地請求獲取鎖,因此需要等待前驅執行緒獲取並釋放鎖之後才能繼續獲取鎖。

2.2.7 讀寫鎖
Java提供了一個基於AQS到讀寫鎖實現ReentrantReadWriteLock,該讀寫鎖到實現原理是:將同步變數state按照高16位和低16位進行拆分,高16位表示讀鎖,低16位表示寫鎖。

寫鎖的獲取與釋放 寫鎖是一個獨佔鎖,所以我們看一下ReentrantReadWriteLock中tryAcquire(arg)的實現:

 protected final boolean tryAcquire(int acquires) {
         Thread current = Thread.currentThread();
         int c = getState();
         int w = exclusiveCount(c);
         if (c != 0) {
             if (w == 0 || current != getExclusiveOwnerThread())
                 return false;
             if (w + exclusiveCount(acquires) > MAX_COUNT)
                 throw new Error("Maximum lock count exceeded");
             // Reentrant acquire
             setState(c + acquires);
             return true;
         }
         if (writerShouldBlock() ||
             !compareAndSetState(c, c + acquires))
             return false;
         setExclusiveOwnerThread(current);
         return true;
     }


上述程式碼的處理流程已經非常清晰:

獲取同步狀態,並從中分離出低16為的寫鎖狀態
如果同步狀態不為0,說明存在讀鎖或寫鎖
如果存在讀鎖(c !=0 && w == 0),則不能獲取寫鎖(保證寫對讀的可見性)
如果當前執行緒不是上次獲取寫鎖的執行緒,則不能獲取寫鎖(寫鎖為獨佔鎖)
如果以上判斷均通過,則在低16為寫鎖同步狀態上利用CAS進行修改(增加寫鎖同步狀態,實現可重入) 將當前執行緒設定為寫鎖的獲取執行緒

寫鎖的釋放過程與獨佔鎖基本相同:

 protected final boolean tryRelease(int releases) {
         if (!isHeldExclusively())
             throw new IllegalMonitorStateException();
         int nextc = getState() - releases;
         boolean free = exclusiveCount(nextc) == 0;
         if (free)
             setExclusiveOwnerThread(null);
         setState(nextc);
         return free;
     }


在釋放的過程中,不斷減少讀鎖同步狀態,只為同步狀態為0時,寫鎖完全釋放。

讀鎖的獲取與釋放

讀鎖是一個共享鎖,獲取讀鎖的步驟如下:

獲取當前同步狀態
計算高16為讀鎖狀態+1後的值
如果大於能夠獲取到的讀鎖的最大值,則丟擲異常
如果存在寫鎖並且當前執行緒不是寫鎖的獲取者,則獲取讀鎖失敗
如果上述判斷都通過,則利用CAS重新設定讀鎖的同步狀態

讀鎖的釋放步驟與寫鎖類似,即不斷的釋放寫鎖狀態,直到為0時,表示沒有執行緒獲取讀鎖。

三、使用AQS與Lock自定義一個鎖

class Mutex implements Lock {

 // 靜態內部類,自定義同步器    
 private static class Sync extends AbstractQueuedSynchronizer {            
     
     // 是否處於佔用狀態            
     protected boolean isHeldExclusively() {                    
         return getState() == 1;            
     }            
     
     // 當狀態為0的時候獲取鎖            
     public boolean tryAcquire(int acquires) {                    
         if (compareAndSetState(0, 1)) {   
             setExclusiveOwnerThread(Thread.currentThread()); 
             return true;                    
         }                    
         return false;            
     }            
     
     // 釋放鎖,將狀態設定為0            
     protected boolean tryRelease(int releases) {                    
         if (getState() == 0) 
             throw new IllegalMonitorStateException();            
         setExclusiveOwnerThread(null);                    
         setState(0);                    
         return true;            
     }            
     
     // 返回一個Condition,每個condition都包含了一個condition佇列            
     Condition newCondition() { 
         return new ConditionObject(); 
     }    
 }    
 
 // 僅需要將操作代理到Sync上即可    
 private final Sync sync = new Sync();
 
 public void lock() { 
     sync.acquire(1); 
 }
 
 public boolean tryLock() { 
     return sync.tryAcquire(1); 
 }
 
 public void unlock() { 
     sync.release(1); 
 }    
 
 public Condition newCondition() { 
     return sync.newCondition(); 
 }
 
 public boolean isLocked() { 
     return sync.isHeldExclusively(); 
 }
 
 public boolean hasQueuedThreads() { 
     return sync.hasQueuedThreads(); 
 }
 
 public void lockInterruptibly() throws InterruptedException {            
     sync.acquireInterruptibly(1);    
 }
 
 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException{ 
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));    
 } 

}
流程:

這個自定義類Mutex首先實現了Lock介面,
內部靜態類Sync繼承了AQS抽象類,並重寫了獨佔式的tryAcquire和tryRelease方法,
接著Mutex例項化Sync內部類,
Mutex類重寫Lock介面的方法,如lock、tryLock、unlock等方法,具體實現是通過呼叫Sync類中的重寫的方法(tryAcquire)以及模板方法(acquire)等
使用者使用Mutex時呼叫Mutex提供的方法,在Mutex的實現中,呼叫同步器的模板方法acquire(int args)