ReentrantLock在Java中Lock的實現原理拿鎖過程分析
import java.util.concurrent.locks.ReentrantLock; public class App { public static void main(String[] args) throws Exception { final int[] counter = {0}; ReentrantLock lock = new ReentrantLock(); for (int i= 0; i < 50; i++){ new Thread(new Runnable() { @Override public void run() { lock.lock(); try { int a = counter[0]; counter[0] = a + 1; }finally { lock.unlock(); } } }).start(); } // 主執行緒休眠,等待結果 Thread.sleep(5000); System.out.println(counter[0]); } }
拿鎖過程 lock.lock();
public void lock() {
sync.lock();
}
final void lock() {
if (compareAndSetState(0, 1)) //預設非公平鎖,所有執行緒在這裡進行搶佔鎖,直接試圖獲取鎖,如果獲取不成功,再放入隊尾
setExclusiveOwnerThread(Thread.currentThread()); //拿到鎖之後執行緒操作
else
acquire(1); //未拿到鎖的進行操作
}
拿到鎖進行操作
setExclusiveOwnerThread(Thread thread)
為AOS中方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread; //設定當前擁有獨佔訪問的執行緒。
}
看到這裡,我們基本有了一個大概的瞭解,還記得之前AQS中的int型別的state值,這裡就是通過CAS(樂觀鎖)去修改state的值。lock的基本操作還是通過樂觀鎖來實現的。
獲取鎖通過CAS,那麼沒有獲取到鎖,等待獲取鎖是如何實現的?我們可以看一下else分支的邏輯,acquire方法:
acquire()在AQS中實現的,它的原始碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
(01) “當前執行緒”首先通過tryAcquire()嘗試獲取鎖。獲取成功的話,直接返回;嘗試失敗的話,進入到等待佇列排序等待(前面還有可能有需要執行緒在等待該鎖)。 (02) “當前執行緒”嘗試失敗的情況下,先通過addWaiter(Node.EXCLUSIVE)來將“當前執行緒”加入到”CLH佇列(非阻塞的FIFO佇列)”末尾。CLH佇列就是執行緒等待佇列。
任何時刻拿到鎖的只有一個執行緒,未拿到鎖的執行緒會打包成節點(node),然後將節點通過CAS自旋的方式,從佇列尾部放入同步佇列中。(03) 再執行完addWaiter(Node.EXCLUSIVE)之後,會呼叫acquireQueued()來獲取鎖。(04) “當前執行緒”在執行acquireQueued()時,會進入到CLH佇列中休眠等待,直到獲取鎖了才返回!如果“當前執行緒”在休眠等待過程中被中斷過,acquireQueued會返回true,此時”當前執行緒”會呼叫selfInterrupt()來自己給自己產生一箇中斷。
- 呼叫tryAcquire方法,試圖獲取鎖
- 當獲取不到鎖,執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 接著往下走看tryAcquire的實現。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
竟然是一個空實現,說明此處需要子類實現此方法,既然需要子類實現此方法,為什麼不寫成抽象方法呢,當我們瞭解了AQS的實現全貌就全明白了。 這裡我們看下ReentrantLock中公平鎖FairSync的tryAcquire實現。
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//首先通過變數state判斷鎖是否被佔用,0代表未被佔用
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {//hasQueuedPredecessors判斷佇列中是否有其他執行緒在等待,如果沒有執行緒在等待,設定鎖的狀態為1(被佔用),因為獲取鎖的過程是多執行緒同時獲取的,所以需要使用CAS。
setExclusiveOwnerThread(current);//設定佔用排它鎖的執行緒是當前執行緒
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//當前鎖已被佔用,但是佔用鎖的是當前執行緒本身
//還記得我們上面說過ReentrantLock是支援重入的,下面就是重入的實現,當已經獲取鎖的執行緒每多獲取一次鎖,state就執行加1操作
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
當獲取到鎖的時候返回true,當獲取不到鎖的時候返回false。 我們再來看下AQS中acquire方法的實現,當獲取不到鎖就執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。 在瞭解acquireQueued方法之前,先看下addWaiter方法的實現。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
- 判斷尾節點是否存在,如果存在,則直接將新節點插入到尾節點之後,然後修改尾節點指向。
- 如果尾節點不存在,則執行enq方法,該方法是一個死迴圈,保證新增的節點一定會被加入到佇列中
此時AQS佇列的結構如下
增加尾節點為什麼要用cas,因為會存在多個執行緒競爭尾節點。
addWaiter保證了,節點一定會被插入到佇列中。 接著看acquireQueued的實現
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//當前節點的上個節點如果是頭結點,並且可以獲取鎖,則切換頭結點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//鎖獲取失敗,首先,第一次迴圈設定當前節點的前一個節點的waitStatus為SIGNAL(待通知),第二次迴圈執行parkAndCheckInterrupt,掛起當前執行緒LockSupport.park(this);
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此時AQS佇列的結構如下 如果再有第三個執行緒過來,則AQS佇列結構如下 acquireQueued主要做了兩件事 1. 使當前節點的上一個節點的狀態為SIGNAL狀態. 2. 阻塞當前節點的執行緒
unLock的內部實現
unLock內部也是通過sync物件來實現的,具體實現如下:
public void unlock() {
sync.release(1);
}
relese方法是AQS的方法
public final boolean release(int arg) {
if (tryRelease(arg)) {//判斷是否能釋放鎖
//釋放下一個執行緒,刪除原頭結點的指向
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在獲取鎖的時候,我們知道tryAcquire是一個空方法,需要子類去實現。可以猜測tryRelease方法應該也是空方法。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
果不其然,是空方法,我們來看下ReentrantLock對tryRelase的重寫。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//鎖的重入次數減1
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
此時AQS的資料結構如下: 此時僅僅能說明,第二個執行緒可以去獲取鎖,但是並不能代表它已經獲取到鎖,因為頭結點並沒有變。 當第二個執行緒的阻塞狀態被釋放後,acquireQueued方法
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
此時AQS的資料結構如下: