Reentrantlock原始碼解析(一):lock方法
對於java中實現併發加鎖的方式可以分為兩種,一種是重量級的synchronized,一種是concurrent包下的lock介面。本系列文章將對這兩種鎖和依賴於cpu嗅探技術的volatile進行詳細說明。
在Lock接口出現前,Java程式是靠synchronized的關鍵字實現鎖功能的,而Java SE 5 下之後,併發包中新增了Lock介面及其相關實現類用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,知識在使用時需要顯示的獲取和釋放鎖。雖然它缺少了隱式獲取釋放鎖的便捷性,但是擁有了鎖獲取與釋放的可操作性,可中斷的獲取鎖以及超時獲取鎖等synchronized關鍵字所不具備的同步特性。
使用範例:這裡使用的是lock的實現類Reentrantlock
private static Lock lock = new ReentrantLock();
public static void critical() {
lock.lock();
try {
System.out.println("start " + Thread.currentThread().getName());
Thread.sleep(1000);
System.out.println("end " + Thread.currentThread().getName())
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
注意:不能在try塊中進行lock,如果程式碼發生異常,會導致鎖的無故釋放
在介紹Reentrantlock前首先需要介紹一下AbstractQueuedSynchronizer,下面簡稱AQS。它是一個非常重要的同步佇列,concurrent包下許多工具都用到了這個佇列同步器,比如著名的CountDownLatch。通過檢視原始碼可以發現,AQS在內部實現了一個鏈式FIFO的雙向佇列,擁有Head和Tail指標,並且實現了setHead(),setTail()等佇列方法。既然是佇列,那勢必有結點,AQS就是通過對一個個結點的管理實現同步作用的,噹噹前執行緒獲取鎖失敗的時候,AQS將當前執行緒的狀態包裝為一個結點,放入內部的queue中並阻塞當前執行緒。當可以再次獲取鎖的時候,AQS將首結點對應的執行緒喚醒並再次嘗試獲取鎖。
下面用一張表說明結點內的屬性。
屬 性 | 定 義 |
Node SHARED = new Node() | 表示Node處於共享模式 |
Node EXCLUSIVE = null | 表示Node處於獨佔模式 |
int CANCELLED = 1 | 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收 |
int SIGNAL = -1 | 表示這個Node的繼任Node被阻塞了,到時需要通知它 |
int CONDITION = -2 | 表示這個Node在條件佇列中,因為等待某個條件而被阻塞 |
int PROPAGATE = -3 | 使用在共享模式頭Node有可能處於這種狀態, 表示鎖的下一次獲取可以無條件傳播 |
int waitStatus | 0,新Node會處於這種狀態 |
Node prev | 佇列中某個Node的前驅Node |
Node next | 佇列中某個Node的後繼Node |
Thread thread | 這個Node持有的執行緒,表示等待鎖的執行緒 |
Node nextWaiter | 表示下一個等待condition的Node |
再用一張表說明AQS的屬性和方法
屬性/方法 | 含 義 |
Thread exclusiveOwnerThread | 這個是AQS父類AbstractOwnableSynchronizer的屬性,表示獨佔模式同步器的當前擁有者 |
Node | 上面已經介紹過了,FIFO佇列的基本單位 |
Node head | FIFO佇列中的頭Node |
Node tail | FIFO佇列中的尾Node |
int state | 同步狀態,0表示未鎖 |
int getState() | 獲取同步狀態 |
setState(int newState) | 設定同步狀態 |
boolean compareAndSetState(int expect, int update) | 利用CAS進行State的設定 |
long spinForTimeoutThreshold = 1000L | 執行緒自旋等待的時間 |
Node enq(final Node node) | 插入一個Node到FIFO佇列中 |
Node addWaiter(Node mode) | 為當前執行緒和指定模式建立並擴充一個等待佇列 |
void setHead(Node node) | 設定佇列的頭Node |
void unparkSuccessor(Node node) | 如果存在的話,喚起Node持有的執行緒 |
void doReleaseShared() | 共享模式下做釋放鎖的動作 |
void cancelAcquire(Node node) | 取消正在進行的Node獲取鎖的嘗試 |
boolean shouldParkAfterFailedAcquire(Node pred, Node node) | 在嘗試獲取鎖失敗後是否應該禁用當前執行緒並等待 |
void selfInterrupt() | 中斷當前執行緒本身 |
boolean parkAndCheckInterrupt() | 禁用當前執行緒進入等待狀態並中斷執行緒本身 |
boolean acquireQueued(final Node node, int arg) | 佇列中的執行緒獲取鎖 |
tryAcquire(int arg) | 嘗試獲得鎖(由AQS的子類實現它) |
tryRelease(int arg) | 嘗試釋放鎖(由AQS的子類實現它) |
isHeldExclusively() | 是否獨自持有鎖 |
acquire(int arg) | 獲取鎖 |
release(int arg) | 釋放鎖 |
compareAndSetHead(Node update) | 利用CAS設定頭Node |
compareAndSetTail(Node expect, Node update) | 利用CAS設定尾Node |
compareAndSetWaitStatus(Node node, int expect, int update) | 利用CAS設定某個Node中的等待狀態 |
上表中有很多的CAS操作,這裡解釋一下CAS,全稱為CompareAndSwap,即比較並交換。顧名思義,當需要更新值的時候,先用當前值和舊值進行比較,如果相同則更新,不同則返回一個錯誤碼。它是concurrent存在的基礎,CAS的實現並不是通過程式碼,而是通過CPU對其的支援,不同的CPU對CAS的實現方式不同,但是最後的效果是相同的,當需要觸發CAS的時候,通過呼叫Java的Unsafe類對硬體級別進行操作,從而實現比較並交換這一操作,Unsafe類是Java自己留的後門,從而實現不依賴native方法就可以實現硬體級別的原子操作。
Reentrantlock中的鎖分為兩種,公平鎖和非公平鎖,通過new Reentrantlock(true)可以獲得公平鎖,預設是非公平鎖。對於公平鎖,每次有執行緒到達時,都會先通過hasQueuedPredecessors()方法檢視等待佇列是否為空,不為空則加入等待佇列。而非公平鎖則不然,可以通過檢視下面我的原始碼分析看出,非公平鎖回多次嘗試對鎖的所有權進行搶佔,全部失敗之後,才會按公平鎖的方式完全加入等待佇列。至於為什麼要用非公平鎖,廢話當然是因為它快啊,這一點在原始碼的註釋中也寫了。Reentrantlock是一種可重入鎖,也就是一個執行緒可以獲取鎖多次。作業系統在切換執行緒的時候,是需要儲存並切換上下文的,這也就是為什麼在單核的cpu上進行多執行緒操作時反而更加費時的原因。如果是公平鎖,當前執行緒讓出鎖之後,必須交給等待佇列的隊首元素對應的執行緒佔有鎖。而非公平鎖則可以直接進行重入,這樣就不會存在上下文切換的時間,從而提高了效率。下文介紹的lock和unlock都是針對非公平鎖的。
1.lock
首先,我們假設執行緒1先佔有了鎖,這個過程是非常簡單的
(1)首先,呼叫
private final Sync sync;
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
例項化出一個非公平鎖,其中Sync是一個繼承了AQS的類。
(2)呼叫sync的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
(3)呼叫AQS的compareAndSetState方法,將state屬性設為1,表示執行緒1獨佔鎖
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
(4)呼叫AQS父類AOS的setExclusiveOwnerThread方法將佔有鎖的執行緒設為當前執行緒
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
此時執行緒1成功佔有鎖,此時AQS中的state=1,exclusiveOwnerThread = thread1
執行緒1佔有鎖之後,執行緒2到來,由於是非公平鎖,執行緒2從到來直到加入佇列的過程中,它會多次嘗試搶佔鎖,如果均不成功,最後會阻塞並被加入到AQS的等待佇列中,下圖是呼叫鏈。
(1)呼叫lock()方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
在呼叫lock時,首先會嘗試使用CAS判斷state是不是0,如果是0就設為1。這裡是非公平的一個體現,新到執行緒可以去搶佔隊首節點的鎖的所有權。但是現在這裡是失敗的,因為執行緒1已經把state設定成了1。所以跳到第二步
(2)呼叫AQS的acquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
可以看到這個if語句,之前說過,非公平鎖從開始到放入等待佇列結束期間,會多次嘗試獲得鎖,這裡的tryAcquire()就是嘗試獲得鎖,非公平鎖中它最後會走nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
首先獲取到當前執行緒,獲取當前佔有鎖的執行緒數,如果state等於0說明鎖沒有被佔用。此時嘗試用CAS將state設定為1,如果失敗,再判斷佔有鎖的是不是當前執行緒。如果是,走else if裡的程式碼,則開始重入,這裡是對鎖的重入次數做了一個限制,我們知道,整型的數如果超過範圍會溢位為負數,所以當鎖被當前執行緒重入時,每次都會對state加一,如果state變成負數,那麼說明重入次數已經超過2147483647次,此時丟擲異常。很簡單的就實現了一個偏向鎖。
如果acquire失敗,走第二個條件嘗試將當前執行緒加入等待佇列中。首先走addWaiter方法將當前執行緒的資訊包裝成node並加入隊尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先先建立一個節點,因為是獨佔鎖,所以實際上傳進來的mode是個NULL,此時new Node會建立一個獨佔模式的節點。此時先獲取一下尾節點,如果尾節點都是null那麼說明這個佇列就是空,此時走enq方法新建一個佇列。如果存在,用CAS把它設為佇列的頭節點,用CAS的原因是cpu時間片是切換的,可能有其他執行緒把佇列已經建立好了,此時CAS失敗,由於tail是volatile的,所以對當前執行緒可見,此時看見tail不為null再次迴圈後將會將當前節點放到隊尾。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在enq中這裡用了一個for的死迴圈保證佇列是一定新建成功的,它會先再次判斷一下佇列是不是不存在,然後新建頭節點,然後把當前節點連在頭節點後面。並將當前節點設為尾節點。這裡其實是這樣的,
到這裡算是把一個就緒的節點放入AQS的等待佇列中了,開始走acquireQueued(addWaiter(Node.EXCLUSIVE), arg))最外層的acquireQueued方法。這個方法主要是對新加入等待佇列的執行緒進行阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這裡第一個if語句是對當前節點能否獲得鎖進行判斷,因為此時可能鎖已經被釋放。如果當前節點是佇列中的第一個節點而且能夠獲得鎖,那麼就把當前節點設定為頭節點。如果還是不能獲取鎖,那麼需要對這條執行緒進行阻塞。但因為並不是所有執行緒都需要阻塞,比如取消狀態。這裡開始走shouldParkAfterFailedAcquire(p, node)方法判斷是否需要阻塞。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
因為是多執行緒環境,當前節點的位置是有可能變得,所以這裡需要再判斷一下status,這裡根據status設定三個規則
規則1:如果前繼的節點狀態為SIGNAL,表明當前節點需要unpark,則返回成功,此時acquireQueued方法的第12行(parkAndCheckInterrupt)將導致執行緒阻塞
規則2:如果前繼節點狀態為CANCELLED(ws>0),說明前置節點已經被放棄,則回溯到一個非取消的前繼節點,返回false,acquireQueued方法的無限迴圈將遞迴呼叫該方法,直至規則1返回true,導致執行緒阻塞
規則3:如果前繼節點狀態為非SIGNAL、非CANCELLED,則設定前繼的狀態為SIGNAL,返回false後進入acquireQueued的無限迴圈,與規則2同
這裡如果返回true,說明執行緒需要被阻塞,那麼通過parkAndCheckInterrupt呼叫native方法進行阻塞。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
至此,lock過程結束。