Java鎖(二)ReentrantLock獨佔鎖分析
ReentrantLock的功能是實現程式碼段的併發訪問控制,是一種排它鎖,也就是通常意義上所說的鎖,內部有兩種實現NonfairSync和FairSync,公平鎖和非公平鎖,預設採用非公平鎖策略。ReentrantLock的實現不僅可以替代隱式的synchronized關鍵字,而且能夠提供超過關鍵字本身的多種功能。
1、ReentrantLock的使用
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
ReentrantLock會保證method-body在同一時間只有一個執行緒在執行這段程式碼,或者說,同一時刻只有一個執行緒的lock方法會返回。其餘執行緒會被掛起,直到獲取鎖。從這裡可以看出,其實ReentrantLock實現的就是一個獨佔鎖的功能:有且只有一個執行緒獲取到鎖,其餘執行緒全部掛起,直到該擁有鎖的執行緒釋放鎖,被掛起的執行緒被喚醒重新開始競爭鎖。那現在看下Doug Lea 怎麼去實現ReentrantLock重入鎖的。首先看下ReentrantLock的建立和加鎖、解鎖過程。
2、ReentrantLock原理分析
/**
* 預設非公平鎖
*/
public ReentrantLock() {
sync = new NonfairSync();
} /**
* 建立ReentrantLock,公平鎖or非公平鎖
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
/**
*加鎖解鎖,使用sync完成
*/
public void lock() {
sync.lock();
}
public void unlock () {
sync.release(1);
}
其中,公平鎖中每個執行緒搶佔鎖的順序為先後呼叫lock方法的順序依次獲取鎖。非公平鎖中每個執行緒搶佔鎖的順序不定,先獲取鎖,獲取不到,然後加入到queue中,和呼叫lock方法的先後順序無關。
如果在絕對時間上,先對鎖進行獲取的請求一定被先滿足,那麼這個鎖是公平的,反之,是不公平的,也就是說等待時間最長的執行緒最有機會獲取鎖,也可以說鎖的獲取是有序的。ReentrantLock這個鎖提供了一個建構函式,能夠控制這個鎖是否是公平的。而鎖的名字也是說明了這個鎖具備了重複進入的可能,也就是說能夠讓當前執行緒多次的進行對鎖的獲取操作,這樣的最大次數限制是Integer.MAX_VALUE,約21億次左右。
事實上公平的鎖機制往往沒有非公平的效率高,因為公平的獲取鎖沒有考慮到作業系統對執行緒的排程因素,這樣造成JVM對於等待中的執行緒排程次序和作業系統對執行緒的排程之間的不匹配。對於鎖的快速且重複的獲取過程中,連續獲取的概率是非常高的,而公平鎖會壓制這種情況,雖然公平性得以保障,但是響應比卻下降了,但是並不是任何場景都是以TPS作為唯一指標的,因為公平鎖能夠減少“飢餓”發生的概率,等待越久的請求越是能夠得到優先滿足。
AQS的佇列操作
在分析加鎖前,先看下AQS的入佇列操作, head,tail節點預設為null,入佇列時,當tail為null時,初始化建立dummy head節點,將入佇列的node插入隊尾。
/**
* Creates and enqueues node for given thread and mode.
* 節點入同步佇列,通過CAS比較然後插入佇列尾部,
* @param current the thread
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速入佇列,tail不為null,通過CAS比較然後插入佇列尾部 Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速入佇列失敗後執行enq方法
enq(node);
return node;
}
/**
* 插入節點到佇列中,必要的時候初始化頭節點,返回該節點前驅
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化,建立Dummy header,thread為null
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
AQS獲取鎖定(獨佔鎖)
呼叫acquire方法,其實這個方法是阻塞的, 獲取鎖的步驟為:
1、 tryAcquire(由子類Sync實現)嘗試獲取鎖
2、 沒有獲取到鎖,將節點加入到佇列尾部中,加入成功selfInterrupt,中斷當前執行緒。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
沒有獲取到鎖則呼叫AQS的acquireQueued方法:
1、當node的前驅節點是頭節點,並且獨佔時才返回
2、前繼節點不是head, 而且當你的前繼節點狀態是Node.SIGNAL時, 你這個執行緒將被park(),直到另外的執行緒release時,發現head.next是你這個node時才unpark,才能繼續迴圈並獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/* 當node的前驅節點是頭節點,並且獨佔時才返回 */
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC,佇列中移除頭節點p
failed = false;
return interrupted;
}
/* 阻塞並判斷是否打斷,其實這個判斷才是自旋鎖真正的猥瑣點,
* 意思是如果你的前繼節點不是head,
* 而且當你的前繼節點狀態是Node.SIGNAL時,
* 你這個執行緒將被park(),
* 直到另外的執行緒release時,發現head.next是你這個node時,才unpark,
* 才能繼續迴圈並獲取鎖
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire這個方法刪除所有waitStatus>0也就是CANCELLED狀態的Node,並設定前繼節點為signal
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 節點已經設定狀態Node.SIGNAL,
* 請求釋放使它獲取訊號,所以它才能安全的park
*/
return true;
if (ws > 0) {
/*
*前驅節點被取消,跳過所有的取消的前驅節點和表明重試
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus為0或者PROPAGATE,表明我們需要一個訊號,但是還沒有park,
* 呼叫者需要重試,保證在parking過程中,它不能被獲取到
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/* 禁用當前執行緒,返回是否中斷 */
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
獨佔鎖acquire獲取的過程,如下所示
AQS釋放獨佔鎖
1、tryRelease釋放鎖,沒有釋放成功,返回false
2、鎖釋放成功,喚醒head的後繼節點,返回true
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//unblock,喚醒head的後繼節點
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* 狀態為負數,清除訊號,設定成0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 喚醒後繼節點(一般是下一個節點),如果節點被取消或者為null
* 反向遍歷從尾到頭找到實際的非取消的後繼節點(問題:為什麼不正向遍歷)
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
ReentrantLock中公平鎖和非公平鎖
兩種鎖都繼承Sync,而 Sync又繼承AbstractQueuedSynchronizer ,所以子類必須實現AQS的5個保護方法。
對於獨佔鎖,需要實現tryAcquire,tryRelease,isHeldExclusively這3個方法,
其中tryRelease,isHeldExclusively是公平鎖和非公平鎖共有的,在Sync中實現。
1、tryRelease嘗試釋放鎖,每呼叫tryRelease(1)一次,將state減去1,當state為0的時候,釋放鎖成功,將獨佔鎖的owner設為null。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2、isHeldExclusively判斷獨佔鎖的owner是不是當前執行緒
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
非公平鎖
現在我們來看NonfairSync加鎖的策略
1、檢視同步佇列鎖的state,不通過同步佇列,通過CAS搶佔獨佔鎖,搶佔成功,將當前執行緒設定成獨佔執行緒,state增加1。
2、獲取不成功,走普通的獲取鎖的流程
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
其中NonfairSync在acquire 獲取鎖的過程中,呼叫tryAcquire嘗試獲取鎖。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
在看Sync的 nonfairTryAcquire方法實現如下,直接通過CAS獲取鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
/* 鎖沒有被佔用,將當前執行緒設定成獨佔鎖的owner */
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
/* 增加獨佔鎖的被執行緒的加鎖次數 */
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平鎖
瞭解非公平鎖的獲取過程,我們再看下公平鎖的加鎖過程,瞭解其區別
final void lock() {
acquire(1);
}
通過acquire獲取鎖,沒有像非公平鎖通過CAS操作,直接搶佔獨佔鎖。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
/* hasQueuedPredecessors判斷是否有比當前執行緒等待更久的執行緒在等待
* 沒有的話則通過CAS獲取鎖
*/
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
從上面的程式碼可以看出
1、 公平鎖與非公平鎖的釋放鎖步驟是一致的
2、獲取鎖的過程不一致,非公平鎖是讓當前執行緒優先獨佔,而公平鎖則是讓等待時間最長的執行緒優先,非公平的可能讓其他執行緒沒機會執行,而公平的則可以讓等待時間最長的先執行,但是效能上會差點。