java鎖的語義及ReentrantLock原始碼剖析
阿新 • • 發佈:2019-02-03
鎖是java併發的重要機制,它除了可以讓臨界區互斥執行以外,還可以向其它執行緒傳送訊息。
監視器鎖
下面使用happens-before來分析其執行過程。happens-before規則詳見 http://blog.csdn.net/quanzhongzhao/article/details/45619135 。class MonitorExample { int a = 0; public synchronized void write() { //1 a++; //2 } //3 public synchronized void read() { //4 int i = a; //5 ... } //6 }
1、程式順序規則:1 happens before 2, 2 happens before 3; 4 happens before 5, 5 happens before 6。
2、監視器鎖規則:3 happens before4。
3、happens-before規則的傳遞性:2 happens-before 5。
假如A執行緒先執行write()方法,即先獲取監視器鎖,然後B執行緒使用方法read()方法獲取變數a的值。因為2 happens-before5,所以A執行緒在釋放監視器鎖之前對變數a的操作,在A釋放監視器(按照監視器鎖的語義,釋放監視器 將本地記憶體中共享變數重新整理到主記憶體)。所以隨後B獲取監視器鎖(獲取監視器會清空本地記憶體共享變數資料,並從主記憶體讀取相應資料)後A對a的操作對B都是可見的。
JUC併發包中的鎖
下面以ReenterantLock為例,說一下JUC併發包中的鎖是如何實現的。ReentrantLock是一種互斥鎖,但是是可重入的,即獲得該鎖的執行緒可以多次再獲得該鎖。ReentrantLock的實現依賴於其中的Sync型別變數。class ReentrantLockExample { int a = 0; ReentrantLock lock = new ReentrantLock(); public void write() { lock.lock(); //獲取鎖 try { a++; } finally { <span style="white-space:pre"> </span>lock.unlock(); //釋放鎖 <span style="white-space:pre"> </span>} } public void reade () { lock.lock(); //獲取鎖 try { int i = a; …… } finally { lock.unlock(); //釋放鎖 } } }
而Sync是同步器AQS的子類,AQS即AbstractQueuedSynchronizer類。
AbstractQueuedSynchronizer使用一個整型volatile變數來維護同步器的狀態,這個volatile的變數state是鎖記憶體語義實現的關鍵。
/**
* The synchronization state.
*/
private volatile int state;
先看看ReentrantLock的建構函式吧
public ReentrantLock() {
sync = new NonfairSync(); //預設使用非公平鎖
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync() ; //若傳入引數fair = true,則使用公平鎖
}
公平鎖與非公平鎖的區別主要在獲得鎖時的策略。後面再具體分析。下面分析一下我們使用lock.lock()方法和lock.unlock時候都發生了什麼。
public void lock() {
sync.lock();
}
先來看看公平鎖的lock.lock()方法
final void lock() {
acquire(1); //直接呼叫acquire()方法。
}
再來看看非公平鎖的實現,此處(lock方法)為公平鎖與非公平鎖的第一處不同。
final void lock() {
if (compareAndSetState(0, 1)) //首先CAS判斷同步器的狀態state是否為0,為0表示當前鎖可獲得,獲取該鎖。
setExclusiveOwnerThread(Thread.currentThread()); //並設定當前執行緒為鎖的擁有者。
else
acquire(1); //若CAS失敗,則同公平鎖一樣呼叫acquire()方法。
}
acquire()方法由AbstractQueuedSynchronizer定義
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
巨集觀上解釋一下acquire()方法
1、使用tryAcquire()方法嘗試獲取鎖,獲取成功則該方法直接返回true 表示獲取成功 。該方法NonfairSync和FairSync都有實現。
2、嘗試獲取失敗,則新生成一個節點(Node),其型別為互斥鎖(Exclusive),存放當前執行緒。當前執行緒獲取鎖失敗,則使用acquireQueued方法讓等待佇列中執行緒獲取鎖,
該方法是自旋的,即獲取成功才能返回,保證了等待佇列的非阻塞。且該方法是非中斷模式的,即不響應中斷,但會記錄中斷狀態。
3、acquireQueued方法呼叫過程中可能會有中斷產生,有的話,執行完畢後在此處引發一箇中斷。
當多個執行緒嘗試獲取ReentrantLock鎖時,只有一個執行緒能夠獨佔該鎖,其它執行緒則放入一個由Node組成等待佇列中。其中每個Node持有一個執行緒,並維護節點的狀態值。
static final class Node {
static final Node SHARED = new Node(); //表示節點是共享節點
static final Node EXCLUSIVE = null; //獨佔節點
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus; //節點狀態值。為SIGNAL/CANCELLED/CONDITION/PROPAGATE。
volatile Node prev; //等待佇列下一節點
volatile Node next;
volatile Thread thread; //節點持有的執行緒。一個節點代表一個等待的執行緒。
Node nextWaiter; //用於waitStatus=CONDITION的節點。
Node() {} // Used to establish initial head or SHARED marker
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
由上可知,節點的狀態值也是一個volatile變數。
當waitStatus=SIGNAL時,表示該節點的後繼節點正在阻塞狀態,當前執行緒釋放鎖時或者當前執行緒狀態變為CANCELLED,即取消獲取鎖時需要喚醒(unpark)後繼節點。為了避免競爭,acquire方法首先指示我們需要一個Signal acquireQueued(addWaiter(Node.EXCLUSIVE), arg) //即生成一個Node,其型別為EXCLUSIVE
當waitStatus=CANCELLED時,表示由於超時或者中斷導致該等待執行緒被取消,即執行緒不再獲取鎖。
當waitStatus=CONDITION時,表示節點在條件佇列中。(目前不太理解這一狀態)
當waitStatus=PROPAGATE時,用於共享鎖。releaseShared應該傳遞給佇列中的其它節點。用於doReleaseShared()方法中。節點先說到這裡,下面詳細介紹acquire()方法。 其中tryAcquire()方法在AQS類中定義,NonfairSync類和FairSync類中分別覆寫該方法。此處(tryAcquire方法)為公平與非公平鎖的第二處不同。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException(); //AQS中tryAcquire()方法並未實現,也未使用抽象方法,考慮到AQS的兩種功能(共享鎖與互斥鎖)
} //避免子類實現抽象方法時需要實現兩種功能。
//非公平鎖版本
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //獲取當前執行緒
int c = getState(); //獲取同步器狀態state
if (c == 0) {
if (compareAndSetState(0, acquires)) { //state=0,表示鎖空閒,可以獲取。使用CAS比較並置state的狀態為1。
setExclusiveOwnerThread(current); //設定當前執行緒為鎖的持有者。
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // state!=0,則表示鎖已被某執行緒持有,則判斷鎖的持有執行緒 是否是 當前執行緒
int nextc = c + acquires; //鎖的持有者為當前執行緒,則更新state的值。前面說過ReentrantLock可重入。
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); //設定state的值為新值。
return true;
}
return false; //前面兩種條件都不滿足,則返回false,表示當前執行緒獲取鎖失敗
}
//公平鎖版本
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //獲取當前執行緒
int c = getState(); //獲取同步器狀態
if (c == 0) { //當前同步器state=0,鎖空閒,可獲取
if (!hasQueuedPredecessors() && //判斷是否有等待佇列中是否有其它執行緒
compareAndSetState(0, acquires)) { //沒有的話,則嘗試獲取鎖
setExclusiveOwnerThread(current); //獲取成功,設定當前執行緒為鎖的持有者。
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //若state!=0,判斷鎖的持有執行緒是否是當前執行緒
int nextc = c + acquires; //是,則重入,並更新鎖的狀態值state
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true; //表示獲取鎖成功
}
return false; //獲取失敗
}
當tryAcquire方法失敗時,繼而呼叫addWaiter()方法,新建節點持有當前執行緒,然後將其加入等待佇列連結串列尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //持有當前執行緒,接收引數mode=EXCLUSIVE 生成新節點,
Node pred = tail;
if (pred != null) { //判斷等待佇列是否為空
node.prev = pred;
if (compareAndSetTail(pred, node)) { //佇列非空,嘗試CAS更新隊尾。
pred.next = node;
return node; //CAS成功,則返回該節點。
}
}
enq(node); //佇列為空或者CAS失敗 則以自旋的方式 加入佇列。
return node;
}
private Node enq(final Node node) {
for (;;) { //自旋,直到節點成功加入等待佇列。
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) //佇列為空,建立新的頭結點。
tail = head;
} else {
node.prev = t; //非空,則CAS更改佇列尾部
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //表示是否成功獲取鎖
try {
boolean interrupted = false;
for (;;) { //自旋,直到 等待佇列中某一執行緒 成功獲取鎖
final Node p = node.predecessor(); //獲得當前節點前驅
if (p == head && tryAcquire(arg)) { //其前驅節點為佇列頭節點,則嘗試獲取鎖。
setHead(node); //獲取鎖成功,設定當前節點為頭結點,
p.next = null; // help GC //此時原來的頭結點 p 已經無效,設定為null有助於垃圾回收。
failed = false;
return interrupted; //此過程中未產生中斷,interrupted=false;所以acquire()方法不需要呼叫 selfInterrupt()方法。
}
if (shouldParkAfterFailedAcquire(p, node) && //當前節點 前驅節點不是頭節點,或者 是頭結點但獲取鎖失 敗呼叫該函式。
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
繼續展開 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //獲取前驅結點p的節點狀態waitStatus
if (ws == Node.SIGNAL) //前驅結點 waitStatus=SIGNAL,表示需要喚醒(unpark)其後繼節點
return true; //返回,進而呼叫 parkAndCheckInterrupt()
if (ws > 0) { //節點狀態不是SIGNAL,大於0 ,只可能是CANCELLED ,表示前驅結點被取消,不再獲取鎖。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); //只要節點狀態為CANCELLED,則繼續向前搜尋前驅節點,即為當前節點找到一個狀態不為CANCELLED的前驅結點。
pred.next = node; //找到,將其設為當前節點的前驅節點。
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //waitStatus must be 0 or PROPAGATE。 Indicate that we need a signal, but don't park yet
} //Caller will need to retry to make sure it cannot acquire before parking
return false;
}
private final boolean parkAndCheckInterrupt() { //阻塞當前執行緒,並檢查acquireQueued函式執行過程是是否發生中斷
LockSupport.park(this);
return Thread.interrupted();
}
至此,lock.lock()方法執行完畢。
再看一下lock.unlock()方法。
public void unlock() {
sync.release(1);
}
release()方法是在AQS中定義並實現的,子類並未實現。
public final boolean release(int arg) {
if (tryRelease(arg)) { //嘗試釋放鎖
Node h = head;
if (h != null && h.waitStatus != 0) //釋放成功判斷頭結點是否為空,並且判斷其waitStatus狀態
unparkSuccessor(h); //喚醒後繼節點。
return true;
}
return false; //釋放失敗
}
protected boolean tryRelease(int arg) { //AQS中tryRelease()方法並未實現,在子類Sync中實現。
throw new UnsupportedOperationException();
}
tryRelease()方法在Sync中實現,並無公平鎖與非公平鎖之分。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c); //釋放鎖的最後,寫volatile變數state
return free;
}
釋放鎖的最後寫volatile變數state;在獲取鎖時首先讀這個volatile變數。根據volatile的happens-before規則,釋放鎖的執行緒對volatile變數的寫會重新整理到主記憶體,獲取鎖的執行緒讀會強制從主記憶體來讀取該共享變數。即volatile變數保證了共享變數線上程之間的可見性。最後說一下CAS方法,該方法時原子的,判斷state值是否為expect,是則CAS更改state=update;
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
CAS操作具有volatile 讀和寫的記憶體語義。編譯器不會對volatile讀與volatile讀後面的任意記憶體操作重排序;編譯器不會對volatile寫與volatile寫前面的任意記憶體操作重排序。組合這兩個條件,意味著為了同時實現volatile讀和volatile寫的記憶體語義,編譯器不能對CAS與CAS前面和後面的任意記憶體操作重排序。而具體實現的時候,CAS是依靠處理器指令級別的控制來實現原子操作。現在對公平鎖和非公平鎖的記憶體語義做個總結:
1、公平鎖和非公平鎖釋放時,最後都要寫一個volatile變數state。
2、 公平鎖獲取時,首先會去讀這個volatile變數。
3、非公平鎖獲取時,首先會用CAS更新這個volatile變數,這個操作同時具有volatile讀和volatile寫的記憶體語義。
JUC鎖記憶體語義實現的總結
從本文對ReentrantLock的分析可以看出,鎖釋放-獲取的記憶體語義的實現至少有下面兩種方式:1. 利用volatile變數的寫-讀所具有的記憶體語義。
2. 利用CAS所附帶的volatile讀和volatile寫的記憶體語義。
由於java的CAS同時具有volatile 讀和volatile寫的記憶體語義,因此Java執行緒之間的通訊現在有了下面四種方式:
1. A執行緒寫volatile變數,隨後B執行緒讀這個volatile變數。
2. A執行緒寫volatile變數,隨後B執行緒用CAS更新這個volatile變數。
3. A執行緒用CAS更新一個volatile變數,隨後B執行緒用CAS更新這個volatile變數。
4. A執行緒用CAS更新一個volatile變數,隨後B執行緒讀這個volatile變數。
Java的CAS會使用現代處理器上提供的高效機器級別原子指令,這些原子指令以原子方式對記憶體執行讀-改-寫操作,這是在多處理器中實現同步的關鍵(從本質上來說,能夠支援原子性讀-改-寫指令的計算機器,是順序計算圖靈機的非同步等價機器,因此任何現代的多處理器都會去支援某種能對記憶體執行原子性讀-改-寫操作的原子指令)。同時,volatile變數的讀/寫和CAS可以實現執行緒之間的通訊。把這些特性整合在一起,就形成了整個concurrent包得以實現的基石。如果我們仔細分析concurrent包的原始碼實現,會發現一個通用化的實現模式:
1. 首先,宣告共享變數為volatile;
2. 然後,使用CAS的原子條件更新來實現執行緒之間的同步;
3. 同時,配合以volatile的讀/寫和CAS所具有的volatile讀和寫的記憶體語義來實現執行緒之間的通訊。
AQS,非阻塞資料結構和原子變數類(java.util.concurrent.atomic包中的類),這些concurrent包中的基礎類都是使用這種模式來實現的,而concurrent包中的高層類又是依賴於這些基礎類來實現的。
本文鎖的語義實現部分主要參考了程曉明<深入理解java記憶體模型>第六章鎖的實現,其中最後仿宋字型部分為直接摘抄,其中AQS繼承體系圖來自 Java多執行緒系列--“JUC鎖”03之 公平鎖(一) 。如有錯漏,多多指出啊。