Lock與Condition
Lock與Condition
阿里巴巴2021版JDK原始碼筆記(2月第三版).pdf
連結:https://pan.baidu.com/s/1XhVcfbGTpU83snOZVu8AXg
提取碼:l3gy
1. 互斥鎖
1.1 鎖的可重入性
當一個執行緒呼叫 object.lock()拿到鎖,進入互斥區後,再次呼叫object.lock(), 仍然可以拿到該鎖(否則會死鎖)
1.2 類的繼承關係
lock.java
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
1.3 鎖的公平性與非公平性
Sync是一個抽象類,它有兩個子類FairSync與NonfairSync,分別 對應公平鎖和非公平鎖
- 公平鎖:遵循先到者優先服務,先搶資源的先獲取CPU
- 非公平鎖:執行緒來了直接搶鎖,獲取CPU資源不是按照順序獲取(提高效率,減少執行緒切換)
1.4 鎖實現的基本原理
Sync的父類AbstractQueuedSynchronizer經常被稱作佇列同步器 (AQS),這個類非常關鍵
AbstractOwnableSynchronizer具有阻塞執行緒的作用,為了實現一把具有阻塞和喚醒功能的鎖,需要一下核心要素:
-
- 需要一個state變數,標記該鎖的狀態,state變數至少有兩個值:0,1 對state變數的操作,要確保執行緒安全,也就是會用到CAS
-
- 需要記錄當前是哪個執行緒持有鎖
-
- 需要底層支援對一個執行緒進行阻塞或喚醒操作
-
- 需要有一個佇列維護所有阻塞的執行緒。這個佇列也必須是執行緒安全的無鎖佇列,也需要用到CAS
針對1,2
- state取值不僅可以是0、1,還可以大於1,就是為了支援鎖的可 重入性。例如,同樣一個執行緒,呼叫5次lock,state會變成5;然後呼叫5次unlock,state減為0。
- 當state=0時,沒有執行緒持有鎖,exclusiveOwnerThread=null;
- 當state=1時,有一個執行緒持有鎖,exclusiveOwnerThread=該執行緒;
- 當state > 1時,說明該執行緒重入了該鎖。
針對3
-
在Unsafe類中,提供了阻塞或喚醒執行緒的一對操作原語,也就是park/unpark
-
LockSupport對其做了簡單的封裝
public class LockSupport { public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } }
-
在當前執行緒中呼叫park(),該執行緒就會被阻塞;在另外一個線 程中,呼叫unpark(Thread t),傳入一個被阻塞的執行緒,就可以喚醒阻塞在park()地方的執行緒
-
尤其是 unpark(Thread t),它實現了一個執行緒對另外一個執行緒 的“精準喚醒”
針對4
-
在AQS中利用雙向連結串列和CAS實現了一個阻塞佇列。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; } private transient volatile Node head; private transient volatile Node tail; }
1.5 公平與非公平的lock()的實現差異
FairSync 公平鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//等於0,資源空閒,可以拿到鎖
if (!hasQueuedPredecessors() && //判斷是否存在等待佇列或者當前執行緒是否是隊頭
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//被鎖了,但是當前執行緒就是已經獲取鎖了(重入鎖),state+1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
NonfairSync 非公平鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平鎖和非公平鎖的區別:
公平鎖就多了這塊程式碼 !hasQueuedPredecessors()
,看原始碼
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
這裡其實就是判斷當前執行緒是否可以被公平的執行(佇列為空,或者當前在隊頭的時候表示到當前執行緒處理了)
1.6 阻塞佇列與喚醒機制
AQS類中,有嘗試拿鎖的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && //這裡嘗試去拿鎖,沒有拿到鎖才執行下一個條件
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//將當前執行緒入隊進入等待
//addWaiter就是將執行緒加入到佇列中,
//acquireQueued該執行緒被阻塞。在該函式返回 的一刻,就是拿到鎖的那一刻,也就是被喚醒的那一刻,此時會刪除佇列的第一個元素(head指標前移1個節點)
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//這裡會阻塞住,直到拿到鎖
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.7 unlock()實現分析
unlock不區分公平還是非公平
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//只有鎖的擁有者才可以釋放鎖
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//這裡需要考慮重入鎖
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
release()裡面做了兩件事:tryRelease(..)函式釋放鎖;unparkSuccessor(..)函式喚醒佇列中的後繼者。
1.8 lockInterruptibly()實現分析
當parkAndCheckInterrupt()返回true的時候,說明有其他執行緒傳送中斷訊號,直接丟擲InterruptedException,跳出for迴圈,整個函式返回。
1.9 tryLock()實現分析
tryLock()實現基於呼叫非公平鎖的tryAcquire(..),對state進行CAS操作,如果操作成功就拿到鎖;如果操作不成功則直接返回false,也不阻塞
2. 讀寫鎖
和互斥鎖相比,讀寫鎖(ReentrantReadWriteLock)就是讀執行緒 和讀執行緒之間可以不用互斥了。在正式介紹原理之前,先看一下相關類的繼承體系。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
2.1 程式碼中使用
當使用 ReadWriteLock 的時候,並不是直接使用,而是獲得其內部的讀鎖和寫鎖,然後分別呼叫lock/unlock。
public static void main(String[] args) {
ReadWriteLock rwlock = new ReentrantReadWriteLock();
Lock rlock = rwlock.readLock();
rlock.lock();
rlock.unlock();
Lock wlock = rwlock.writeLock();
wlock.lock();
wlock.unlock();
}
2.2 讀寫鎖實現的基本原理
從表面來看,ReadLock和WriteLock是兩把鎖,實際上它只是同一 把鎖的兩個檢視而已
-
兩個檢視: 可以理解為是一把鎖,執行緒分成兩類:讀執行緒和寫執行緒。讀執行緒和讀執行緒之間不互斥(可以同時拿到這把鎖),讀執行緒和寫執行緒互斥,寫執行緒和寫執行緒也互斥。
-
readerLock和writerLock實際共 用同一個sync物件
public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
-
同互斥鎖一樣,讀寫鎖也是用state變數來表示鎖狀態的。只是state變數在這裡的含義和互斥鎖完全不同
-
是把 state 變數拆成兩半,低16位,用來記錄寫鎖,高16位,用來“讀”鎖。但同一 時間既然只能有一個執行緒寫,為什麼還需要16位呢?這是因為一個寫 執行緒可能多次重入
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
-
為什麼要把一個int型別變數拆成兩半, 而不是用兩個int型變數分別表示讀鎖和寫鎖的狀態呢?這是因為無法 用一次CAS 同時操作兩個int變數,所以用了一個int型的高16位和低16位分別表示讀鎖和寫鎖的狀態。
-
當state=0時,說明既沒有執行緒持有讀鎖,也沒有執行緒持有寫鎖; 當state!=0時,要麼有執行緒持有讀鎖,要麼有執行緒持有寫鎖,兩者不 能同時成立,因為讀和寫互斥。
2.3 AQS的兩對模板方法
ReentrantReadWriteLock的兩個內部類ReadLock和WriteLock中,是如何使用state變數的
acquire/release、acquireShared/releaseShared 是AQS裡面的 兩對模板方法。互斥鎖和讀寫鎖的寫鎖都是基於acquire/release模板 方法來實現的。讀寫鎖的讀鎖是基於acquireShared/releaseShared這對模板方法來實現的
將讀/寫、公平/非公平進行排列組合,就有4種組合
- 讀鎖的公平實現:Sync.tryAccquireShared()+FairSync中的兩個覆寫的子函式。
- 讀鎖的非公平實現:Sync.tryAccquireShared()+NonfairSync中的兩個覆寫的子函式
- 寫鎖的公平實現:Sync.tryAccquire()+FairSync中的兩個覆寫的子函式
- 寫鎖的非公平實現:Sync.tryAccquire()+NonfairSync中的兩個覆寫的子函式。
對於公平,比較容易理解,不論是讀鎖,還是寫鎖,只要佇列中 有其他執行緒在排隊(排隊等讀鎖,或者排隊等寫鎖),就不能直接去搶鎖,要排在佇列尾部。
對於非公平,讀鎖和寫鎖的實現策略略有差異。先說寫鎖,寫線 程能搶鎖,前提是state=0,只有在沒有其他執行緒持有讀鎖或寫鎖的情 況下,它才有機會去搶鎖。或者state!=0,但那個持有寫鎖的執行緒是 它自己,再次重入。寫執行緒是非公平的,就是不管三七二十一就去搶,即一直返回false。
因為讀執行緒和讀執行緒是不互斥的,假設當前執行緒被讀執行緒持有,然後其他讀執行緒還非公平地一直去搶,可能導致寫執行緒永遠拿不到鎖,所 以對於讀執行緒的非公平,要做一些“約束”
當發現佇列的第1個元素 是寫執行緒的時候,讀執行緒也要阻塞一下,不能“肆無忌憚”地直接去搶
2.4 WriteLock公平與非公平實現
寫鎖是排他鎖,實現策略類似於互斥鎖,重寫了tryAcquire/tryRelease方法。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
- if (c!=0) and w==0,說明當前一定是讀執行緒拿著鎖,寫鎖一定拿不到,返回false。
- if (c!=0) and w!=0,說明當前一定是寫執行緒拿著鎖, 執行current!=getExclusive-OwnerThread()的判斷,發現ownerThread不是自己,返回false。
- c ! =0 , w ! =0 , 且 current=getExclusiveOwnerThread(),才會走到if (w+exclusive-Count(acquires)> MAX_COUNT)。判斷重入次數,重入次數超過最大值,丟擲異常。
- if(c=0),說明當前既沒有讀執行緒,也沒有寫執行緒持有該鎖。可以通過CAS操作開搶了。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);//寫鎖是排他的
return free;
}
2.5 ReadLock公平與非公平實現
讀鎖是共享鎖,重寫了 tryAcquireShared/tryReleaseShared 方法,其實現策略和排他鎖有很大的差異。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && //寫鎖被某執行緒持有,且不是自己,讀鎖肯定拿不到,直接返回
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&//公平和非公平的差異
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//高位讀鎖+1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
- 低16位不等於0,說明有寫執行緒持有鎖,並且只有當ownerThread!=自己時,才返回-1。這裡面有一個潛臺詞:如果current=ownerThread,則這段程式碼不會返回。這是因為一個寫執行緒可以再次去拿讀 鎖!也就是說,一個執行緒在持有了WriteLock後,再去呼叫ReadLock.lock也是可以的。
- 上面的compareAndSetState(c,c+SHARED_UNIT),其實是 把state的高16位加1(讀鎖的狀態),但因為是在高16位,必須把1左移16位再加1。
- firstReader,cachedHoldConunter 之類的變數,只是一些 統計變數,在 ReentrantRead-WriteLock對外的一些查詢函式中會用 到,例如,查詢持有讀鎖的執行緒列表,但對整個讀寫互斥機制沒有影響,此處不再展開解釋
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
因為讀鎖是共享鎖,多個執行緒會同時持有讀鎖,所以對讀鎖的釋 放不能直接減1,而是需要通過一個for迴圈+CAS操作不斷重試。這是tryReleaseShared和tryRelease的根本差異所在。
3. Condition
Condition本身也是一個介面,其功能和wait/notify類似
public interface Condition {
void await() throws InterruptedException;
void signal();
void signalAll();
}
3.1 Condition與Lock的關係
在講多執行緒基礎的時候,強調wait()/notify()必須和synchronized一起使用,Condition也是如此,必須和Lock一起使用。因此,在Lock的介面中,有一個與Condition相關的介面:
public interface Lock {
Condition newCondition();
}
3.2 Condition的使用場景
為一個用陣列實現的阻塞 佇列,執行put(..)操作的時候,佇列滿了,生成者執行緒被阻塞;執行take()操作的時候,佇列為空,消費者執行緒被阻塞。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//核心就是一把鎖,兩個條件
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
}
3.3 Condition實現原理
使用很簡潔,避免了 wait/notify 的生成者通知生 成者、消費者通知消費者的問題。
因為Condition必須和Lock一起使用,所以Condition的實現也是Lock的一部分
3.4 await()實現分析
public final void await() throws InterruptedException {
if (Thread.interrupted())//正要執行await操作,收到了中斷訊號,丟擲異常
throw new InterruptedException();
Node node = addConditionWaiter();//加入condition等待佇列
long savedState = fullyRelease(node);//阻塞在condition之前必須釋放鎖,否則會釋放鎖
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);//自己阻塞自己
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//重新拿鎖
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 執行緒呼叫 await()的時候,肯定已經先拿到了鎖。所以, 在 addConditionWaiter()內部,對這個雙向連結串列的操作不需要執行CAS操作,執行緒天生是安全的
- 線上程執行wait操作之前,必須先釋放鎖。也就是fullyRelease(node),否則會發生死鎖。這個和wait/notify與synchronized的配合機制一樣。
- 執行緒從wait中被喚醒後,必須用acquireQueued(node,savedState)函式重新拿鎖。
- checkInterruptWhileWaiting(node)程式碼在park(this) 程式碼之後,是為了檢測在park期間是否收到過中斷訊號。當執行緒從park中醒來時,有兩種可能:一種是其他執行緒呼叫了unpark,另一種是收 到中斷訊號。這裡的await()函式是可以響應中斷的,所以當發現自 己是被中斷喚醒的,而不是被unpark喚醒的時,會直接退出while迴圈,await()函式也會返回。
- isOnSyncQueue(node)用於判斷該Node是否在AQS的同步隊 列裡面。初始的時候,Node只在Condition的佇列裡,而不在AQS的佇列裡。但執行notity操作的時候,會放進AQS的同步佇列。
3.5 awaitUninterruptibly()實現分析
與await()不同,awaitUninterruptibly()不會響應中斷,其 函式的定義中不會有中斷異常丟擲,下面分析其實現和await()的區別
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())//從park中醒來,收到中斷,不退出,繼續執行迴圈
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
可以看出,整體程式碼和 await()類似,區別在於收到異常後,不會丟擲異常,而是繼續執行while迴圈。
3.6 signal()實現分析
public final void signal() {
if (!isHeldExclusively())//只有持有鎖的佇列才可以呼叫signal
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {//喚醒佇列的第一個執行緒
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//先把Node放入互斥鎖的同步佇列中,再呼叫下面的unpark方法
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
- 同 await()一樣,在呼叫 notify()的時候,必須先拿到鎖 (否則就會丟擲上面的異常),是因為前面執行await()的時候,把鎖釋放了。
- 從佇列中取出firstWait,喚醒它。在通過呼叫unpark喚醒 它之前,先用enq(node)函式把這個Node放入AQS的鎖對應的阻塞隊 列中
4. StampedLock
JDK8引入
4.1 為什麼要引入?
- ReentrantLock: 讀與讀互斥,寫與寫互斥,讀與寫互斥
- ReentrantReadWriteLock:讀與讀不互斥,寫與寫互斥,讀與寫互斥
- StampedLock:讀與讀不互斥,寫與寫不互斥,讀與寫互斥
StampedLock引入了“樂觀讀”策略,讀的時候不加讀鎖,讀出來發現數據被修改 了,再升級為“悲觀讀”,相當於降低了“讀”的地位,把搶鎖的天平往“寫”的一方傾斜了一下,避免寫執行緒被餓死。
4.2 使用場景
public class Point {
private double x, y;
private final StampedLock s1 = new StampedLock();
void move(double deltaX, double deltaY) {
//多個執行緒呼叫,修改x,y的值
long stamp = s1.writeLock();
try {
x = deltaX;
y = deltaY;
} finally {
s1.unlock(stamp);
}
}
double distanceFromOrigin() {
long stamp = s1.tryOptimisticRead();//使用樂觀鎖
double currentX = x, currentY = y;
if (!s1.validate(stamp)) {
/**
* 上面這三行關鍵程式碼對順序非常敏感,不能有重排序。 因
* 為 state 變數已經是volatile,所以可以禁止重排序,但stamp並 不是volatile的。
* 為此,在validate(stamp)函式裡面插入記憶體屏 障。
*/
stamp = s1.readLock();//升級悲觀鎖
try {
currentX = x;
currentY = y;
} finally {
s1.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
4.3 “樂觀讀”的實現原理
StampedLock是一個讀寫鎖,因此也會像讀寫鎖那樣,把一 個state變數分成兩半,分別表示讀鎖和寫鎖的狀態。同時,它還需要 一個數據的version。但正如前面所說,一次CAS沒有辦法操作兩個變 量,所以這個state變數本身同時也表示了資料的version。下面先分析state變數。
- 用最低的8位表示讀和寫的狀態,其中第8位表 示寫鎖的狀態,最低的7位表示讀鎖的狀態。因為寫鎖只有一個bit位,所以寫鎖是不可重入的。
4.4 悲觀讀/寫:“阻塞”與“自旋”策略實現差異
同ReadWriteLock一樣,StampedLock也要進行悲觀的讀鎖和寫鎖 操作。不過,它不是基於AQS實現的,而是內部重新實現了一個阻塞佇列
public class StampedLock implements java.io.Serializable {
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait; // list of linked readers
volatile Thread thread; // non-null while possibly parked
volatile int status; // 0, WAITING, or CANCELLED
final int mode; // RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
}
這個阻塞佇列和 AQS 裡面的很像。剛開始的時候,whead=wtail=NULL,然後初始化,建一個空節點,whead和wtail都指向這個空節 點,之後往裡面加入一個個讀執行緒或寫執行緒節點。但基於這個阻塞隊 列實現的鎖的排程策略和AQS很不一樣,也就是“自旋”。在AQS裡 面,當一個執行緒CAS state失敗之後,會立即加入阻塞佇列,並且進入 阻塞狀態。但在StampedLock中,CAS state失敗之後,會不斷自旋, 自旋足夠多的次數之後,如果還拿不到鎖,才進入阻塞狀態。為此, 根據CPU的核數,定義了自旋次數的常量值。如果是單核的CPU,肯定不能自旋,在多核情況下,才採用自旋策略。