1. 程式人生 > 其它 >Lock與Condition

Lock與Condition

Lock與Condition

阿里巴巴2021版JDK原始碼筆記(2月第三版).pdf

連結:https://pan.baidu.com/s/1XhVcfbGTpU83snOZVu8AXg
提取碼:l3gy

1. 互斥鎖

1.1 鎖的可重入性

當一個執行緒呼叫 object.lock()拿到鎖,進入互斥區後,再次呼叫object.lock(), 仍然可以拿到該鎖(否則會死鎖)

1.2 類的繼承關係

lock.java

public interface Lock {
	void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

1.3 鎖的公平性與非公平性

Sync是一個抽象類,它有兩個子類FairSync與NonfairSync,分別 對應公平鎖和非公平鎖

  • 公平鎖:遵循先到者優先服務,先搶資源的先獲取CPU
  • 非公平鎖:執行緒來了直接搶鎖,獲取CPU資源不是按照順序獲取(提高效率,減少執行緒切換)

1.4 鎖實現的基本原理

Sync的父類AbstractQueuedSynchronizer經常被稱作佇列同步器 (AQS),這個類非常關鍵

AbstractOwnableSynchronizer具有阻塞執行緒的作用,為了實現一把具有阻塞和喚醒功能的鎖,需要一下核心要素:

    1. 需要一個state變數,標記該鎖的狀態,state變數至少有兩個值:0,1 對state變數的操作,要確保執行緒安全,也就是會用到CAS
    1. 需要記錄當前是哪個執行緒持有鎖
    1. 需要底層支援對一個執行緒進行阻塞或喚醒操作
    1. 需要有一個佇列維護所有阻塞的執行緒。這個佇列也必須是執行緒安全的無鎖佇列,也需要用到CAS

針對1,2

  • state取值不僅可以是0、1,還可以大於1,就是為了支援鎖的可 重入性。例如,同樣一個執行緒,呼叫5次lock,state會變成5;然後呼叫5次unlock,state減為0。
  • 當state=0時,沒有執行緒持有鎖,exclusiveOwnerThread=null;
  • 當state=1時,有一個執行緒持有鎖,exclusiveOwnerThread=該執行緒;
  • 當state > 1時,說明該執行緒重入了該鎖。

針對3

  • 在Unsafe類中,提供了阻塞或喚醒執行緒的一對操作原語,也就是park/unpark

  • LockSupport對其做了簡單的封裝

    public class LockSupport {
    	public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    }
    
  • 在當前執行緒中呼叫park(),該執行緒就會被阻塞;在另外一個線 程中,呼叫unpark(Thread t),傳入一個被阻塞的執行緒,就可以喚醒阻塞在park()地方的執行緒

  • 尤其是 unpark(Thread t),它實現了一個執行緒對另外一個執行緒 的“精準喚醒”

針對4

  • 在AQS中利用雙向連結串列和CAS實現了一個阻塞佇列。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        static final class Node {
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
        }
        private transient volatile Node head;
        private transient volatile Node tail;
    }
    

1.5 公平與非公平的lock()的實現差異

FairSync 公平鎖

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//等於0,資源空閒,可以拿到鎖
        if (!hasQueuedPredecessors() && //判斷是否存在等待佇列或者當前執行緒是否是隊頭
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//被鎖了,但是當前執行緒就是已經獲取鎖了(重入鎖),state+1 
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

NonfairSync 非公平鎖

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平鎖和非公平鎖的區別:

公平鎖就多了這塊程式碼 !hasQueuedPredecessors(),看原始碼

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

這裡其實就是判斷當前執行緒是否可以被公平的執行(佇列為空,或者當前在隊頭的時候表示到當前執行緒處理了)

1.6 阻塞佇列與喚醒機制

AQS類中,有嘗試拿鎖的方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //這裡嘗試去拿鎖,沒有拿到鎖才執行下一個條件
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        //將當前執行緒入隊進入等待
        //addWaiter就是將執行緒加入到佇列中,
        //acquireQueued該執行緒被阻塞。在該函式返回 的一刻,就是拿到鎖的那一刻,也就是被喚醒的那一刻,此時會刪除佇列的第一個元素(head指標前移1個節點)
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//這裡會阻塞住,直到拿到鎖
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

1.7 unlock()實現分析

unlock不區分公平還是非公平

public void unlock() {
    sync.release(1);
}

 public final boolean release(int arg) {
     if (tryRelease(arg)) {
         Node h = head;
         if (h != null && h.waitStatus != 0)
             unparkSuccessor(h);
         return true;
     }
     return false;
 }

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //只有鎖的擁有者才可以釋放鎖
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //這裡需要考慮重入鎖
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

release()裡面做了兩件事:tryRelease(..)函式釋放鎖;unparkSuccessor(..)函式喚醒佇列中的後繼者。

1.8 lockInterruptibly()實現分析

當parkAndCheckInterrupt()返回true的時候,說明有其他執行緒傳送中斷訊號,直接丟擲InterruptedException,跳出for迴圈,整個函式返回。

1.9 tryLock()實現分析

tryLock()實現基於呼叫非公平鎖的tryAcquire(..),對state進行CAS操作,如果操作成功就拿到鎖;如果操作不成功則直接返回false,也不阻塞

2. 讀寫鎖

和互斥鎖相比,讀寫鎖(ReentrantReadWriteLock)就是讀執行緒 和讀執行緒之間可以不用互斥了。在正式介紹原理之前,先看一下相關類的繼承體系。

public interface ReadWriteLock {
	Lock readLock();
    Lock writeLock();
}

2.1 程式碼中使用

當使用 ReadWriteLock 的時候,並不是直接使用,而是獲得其內部的讀鎖和寫鎖,然後分別呼叫lock/unlock。

public static void main(String[] args) {
    ReadWriteLock rwlock = new ReentrantReadWriteLock();
    Lock rlock = rwlock.readLock();
    rlock.lock();
    rlock.unlock();
    Lock wlock = rwlock.writeLock();
    wlock.lock();
    wlock.unlock();
}

2.2 讀寫鎖實現的基本原理

從表面來看,ReadLock和WriteLock是兩把鎖,實際上它只是同一 把鎖的兩個檢視而已

  • 兩個檢視: 可以理解為是一把鎖,執行緒分成兩類:讀執行緒和寫執行緒。讀執行緒和讀執行緒之間不互斥(可以同時拿到這把鎖),讀執行緒和寫執行緒互斥,寫執行緒和寫執行緒也互斥。

  • readerLock和writerLock實際共 用同一個sync物件

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    
  • 同互斥鎖一樣,讀寫鎖也是用state變數來表示鎖狀態的。只是state變數在這裡的含義和互斥鎖完全不同

  • 是把 state 變數拆成兩半,低16位,用來記錄寫鎖,高16位,用來“讀”鎖。但同一 時間既然只能有一個執行緒寫,為什麼還需要16位呢?這是因為一個寫 執行緒可能多次重入

     abstract static class Sync extends AbstractQueuedSynchronizer {
         static final int SHARED_SHIFT   = 16;
         static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
         static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
         
         static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
         static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
     }
    
  • 為什麼要把一個int型別變數拆成兩半, 而不是用兩個int型變數分別表示讀鎖和寫鎖的狀態呢?這是因為無法 用一次CAS 同時操作兩個int變數,所以用了一個int型的高16位和低16位分別表示讀鎖和寫鎖的狀態。

  • 當state=0時,說明既沒有執行緒持有讀鎖,也沒有執行緒持有寫鎖; 當state!=0時,要麼有執行緒持有讀鎖,要麼有執行緒持有寫鎖,兩者不 能同時成立,因為讀和寫互斥。

2.3 AQS的兩對模板方法

ReentrantReadWriteLock的兩個內部類ReadLock和WriteLock中,是如何使用state變數的

acquire/release、acquireShared/releaseShared 是AQS裡面的 兩對模板方法。互斥鎖和讀寫鎖的寫鎖都是基於acquire/release模板 方法來實現的。讀寫鎖的讀鎖是基於acquireShared/releaseShared這對模板方法來實現的

將讀/寫、公平/非公平進行排列組合,就有4種組合

  • 讀鎖的公平實現:Sync.tryAccquireShared()+FairSync中的兩個覆寫的子函式。
  • 讀鎖的非公平實現:Sync.tryAccquireShared()+NonfairSync中的兩個覆寫的子函式
  • 寫鎖的公平實現:Sync.tryAccquire()+FairSync中的兩個覆寫的子函式
  • 寫鎖的非公平實現:Sync.tryAccquire()+NonfairSync中的兩個覆寫的子函式。

對於公平,比較容易理解,不論是讀鎖,還是寫鎖,只要佇列中 有其他執行緒在排隊(排隊等讀鎖,或者排隊等寫鎖),就不能直接去搶鎖,要排在佇列尾部。

對於非公平,讀鎖和寫鎖的實現策略略有差異。先說寫鎖,寫線 程能搶鎖,前提是state=0,只有在沒有其他執行緒持有讀鎖或寫鎖的情 況下,它才有機會去搶鎖。或者state!=0,但那個持有寫鎖的執行緒是 它自己,再次重入。寫執行緒是非公平的,就是不管三七二十一就去搶,即一直返回false。

因為讀執行緒和讀執行緒是不互斥的,假設當前執行緒被讀執行緒持有,然後其他讀執行緒還非公平地一直去搶,可能導致寫執行緒永遠拿不到鎖,所 以對於讀執行緒的非公平,要做一些“約束”

當發現佇列的第1個元素 是寫執行緒的時候,讀執行緒也要阻塞一下,不能“肆無忌憚”地直接去搶

2.4 WriteLock公平與非公平實現

寫鎖是排他鎖,實現策略類似於互斥鎖,重寫了tryAcquire/tryRelease方法。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
  • if (c!=0) and w==0,說明當前一定是讀執行緒拿著鎖,寫鎖一定拿不到,返回false。
  • if (c!=0) and w!=0,說明當前一定是寫執行緒拿著鎖, 執行current!=getExclusive-OwnerThread()的判斷,發現ownerThread不是自己,返回false。
  • c ! =0 , w ! =0 , 且 current=getExclusiveOwnerThread(),才會走到if (w+exclusive-Count(acquires)> MAX_COUNT)。判斷重入次數,重入次數超過最大值,丟擲異常。
  • if(c=0),說明當前既沒有讀執行緒,也沒有寫執行緒持有該鎖。可以通過CAS操作開搶了。
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null); 
    setState(nextc);//寫鎖是排他的
    return free;
}

2.5 ReadLock公平與非公平實現

讀鎖是共享鎖,重寫了 tryAcquireShared/tryReleaseShared 方法,其實現策略和排他鎖有很大的差異。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 && //寫鎖被某執行緒持有,且不是自己,讀鎖肯定拿不到,直接返回
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&//公平和非公平的差異
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {//高位讀鎖+1
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
  • 低16位不等於0,說明有寫執行緒持有鎖,並且只有當ownerThread!=自己時,才返回-1。這裡面有一個潛臺詞:如果current=ownerThread,則這段程式碼不會返回。這是因為一個寫執行緒可以再次去拿讀 鎖!也就是說,一個執行緒在持有了WriteLock後,再去呼叫ReadLock.lock也是可以的。
  • 上面的compareAndSetState(c,c+SHARED_UNIT),其實是 把state的高16位加1(讀鎖的狀態),但因為是在高16位,必須把1左移16位再加1。
  • firstReader,cachedHoldConunter 之類的變數,只是一些 統計變數,在 ReentrantRead-WriteLock對外的一些查詢函式中會用 到,例如,查詢持有讀鎖的執行緒列表,但對整個讀寫互斥機制沒有影響,此處不再展開解釋
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

因為讀鎖是共享鎖,多個執行緒會同時持有讀鎖,所以對讀鎖的釋 放不能直接減1,而是需要通過一個for迴圈+CAS操作不斷重試。這是tryReleaseShared和tryRelease的根本差異所在。

3. Condition

Condition本身也是一個介面,其功能和wait/notify類似

public interface Condition {
	void await() throws InterruptedException;
    void signal();
    void signalAll();
}

3.1 Condition與Lock的關係

在講多執行緒基礎的時候,強調wait()/notify()必須和synchronized一起使用,Condition也是如此,必須和Lock一起使用。因此,在Lock的介面中,有一個與Condition相關的介面:

public interface Lock {
    Condition newCondition();
}

3.2 Condition的使用場景

為一個用陣列實現的阻塞 佇列,執行put(..)操作的時候,佇列滿了,生成者執行緒被阻塞;執行take()操作的時候,佇列為空,消費者執行緒被阻塞。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    //核心就是一把鎖,兩個條件
	final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

3.3 Condition實現原理

使用很簡潔,避免了 wait/notify 的生成者通知生 成者、消費者通知消費者的問題。

因為Condition必須和Lock一起使用,所以Condition的實現也是Lock的一部分

3.4 await()實現分析

public final void await() throws InterruptedException {
    if (Thread.interrupted())//正要執行await操作,收到了中斷訊號,丟擲異常
        throw new InterruptedException();
    Node node = addConditionWaiter();//加入condition等待佇列
    long savedState = fullyRelease(node);//阻塞在condition之前必須釋放鎖,否則會釋放鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);//自己阻塞自己
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//重新拿鎖
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • 執行緒呼叫 await()的時候,肯定已經先拿到了鎖。所以, 在 addConditionWaiter()內部,對這個雙向連結串列的操作不需要執行CAS操作,執行緒天生是安全的
  • 線上程執行wait操作之前,必須先釋放鎖。也就是fullyRelease(node),否則會發生死鎖。這個和wait/notify與synchronized的配合機制一樣。
  • 執行緒從wait中被喚醒後,必須用acquireQueued(node,savedState)函式重新拿鎖。
  • checkInterruptWhileWaiting(node)程式碼在park(this) 程式碼之後,是為了檢測在park期間是否收到過中斷訊號。當執行緒從park中醒來時,有兩種可能:一種是其他執行緒呼叫了unpark,另一種是收 到中斷訊號。這裡的await()函式是可以響應中斷的,所以當發現自 己是被中斷喚醒的,而不是被unpark喚醒的時,會直接退出while迴圈,await()函式也會返回。
  • isOnSyncQueue(node)用於判斷該Node是否在AQS的同步隊 列裡面。初始的時候,Node只在Condition的佇列裡,而不在AQS的佇列裡。但執行notity操作的時候,會放進AQS的同步佇列。

3.5 awaitUninterruptibly()實現分析

與await()不同,awaitUninterruptibly()不會響應中斷,其 函式的定義中不會有中斷異常丟擲,下面分析其實現和await()的區別

 public final void awaitUninterruptibly() {
     Node node = addConditionWaiter();
     long savedState = fullyRelease(node);
     boolean interrupted = false;
     while (!isOnSyncQueue(node)) {
         LockSupport.park(this);
         if (Thread.interrupted())//從park中醒來,收到中斷,不退出,繼續執行迴圈
             interrupted = true;
     }
     if (acquireQueued(node, savedState) || interrupted)
         selfInterrupt();
 }

可以看出,整體程式碼和 await()類似,區別在於收到異常後,不會丟擲異常,而是繼續執行while迴圈。

3.6 signal()實現分析

public final void signal() {
    if (!isHeldExclusively())//只有持有鎖的佇列才可以呼叫signal
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {//喚醒佇列的第一個執行緒
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);//先把Node放入互斥鎖的同步佇列中,再呼叫下面的unpark方法
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
  • 同 await()一樣,在呼叫 notify()的時候,必須先拿到鎖 (否則就會丟擲上面的異常),是因為前面執行await()的時候,把鎖釋放了。
  • 從佇列中取出firstWait,喚醒它。在通過呼叫unpark喚醒 它之前,先用enq(node)函式把這個Node放入AQS的鎖對應的阻塞隊 列中

4. StampedLock

JDK8引入

4.1 為什麼要引入?

  • ReentrantLock: 讀與讀互斥,寫與寫互斥,讀與寫互斥
  • ReentrantReadWriteLock:讀與讀不互斥,寫與寫互斥,讀與寫互斥
  • StampedLock:讀與讀不互斥,寫與寫不互斥,讀與寫互斥

StampedLock引入了“樂觀讀”策略,讀的時候不加讀鎖,讀出來發現數據被修改 了,再升級為“悲觀讀”,相當於降低了“讀”的地位,把搶鎖的天平往“寫”的一方傾斜了一下,避免寫執行緒被餓死。

4.2 使用場景

public class Point {
    private double x, y;
    private final StampedLock s1 = new StampedLock();

    void move(double deltaX, double deltaY) {
        //多個執行緒呼叫,修改x,y的值
        long stamp = s1.writeLock();
        try {
            x = deltaX;
            y = deltaY;
        } finally {
            s1.unlock(stamp);
        }
    }

    double distanceFromOrigin() {

        long stamp = s1.tryOptimisticRead();//使用樂觀鎖
        double currentX = x, currentY = y;
        if (!s1.validate(stamp)) {
            /**
             * 上面這三行關鍵程式碼對順序非常敏感,不能有重排序。 因
             * 為 state 變數已經是volatile,所以可以禁止重排序,但stamp並 不是volatile的。
             * 為此,在validate(stamp)函式裡面插入記憶體屏 障。
             */
            stamp = s1.readLock();//升級悲觀鎖
            try {
                currentX = x;
                currentY = y;
            } finally {
                s1.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

4.3 “樂觀讀”的實現原理

StampedLock是一個讀寫鎖,因此也會像讀寫鎖那樣,把一 個state變數分成兩半,分別表示讀鎖和寫鎖的狀態。同時,它還需要 一個數據的version。但正如前面所說,一次CAS沒有辦法操作兩個變 量,所以這個state變數本身同時也表示了資料的version。下面先分析state變數。

  • 用最低的8位表示讀和寫的狀態,其中第8位表 示寫鎖的狀態,最低的7位表示讀鎖的狀態。因為寫鎖只有一個bit位,所以寫鎖是不可重入的。

4.4 悲觀讀/寫:“阻塞”與“自旋”策略實現差異

同ReadWriteLock一樣,StampedLock也要進行悲觀的讀鎖和寫鎖 操作。不過,它不是基於AQS實現的,而是內部重新實現了一個阻塞佇列

public class StampedLock implements java.io.Serializable {
	static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // list of linked readers
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }
}

這個阻塞佇列和 AQS 裡面的很像。剛開始的時候,whead=wtail=NULL,然後初始化,建一個空節點,whead和wtail都指向這個空節 點,之後往裡面加入一個個讀執行緒或寫執行緒節點。但基於這個阻塞隊 列實現的鎖的排程策略和AQS很不一樣,也就是“自旋”。在AQS裡 面,當一個執行緒CAS state失敗之後,會立即加入阻塞佇列,並且進入 阻塞狀態。但在StampedLock中,CAS state失敗之後,會不斷自旋, 自旋足夠多的次數之後,如果還拿不到鎖,才進入阻塞狀態。為此, 根據CPU的核數,定義了自旋次數的常量值。如果是單核的CPU,肯定不能自旋,在多核情況下,才採用自旋策略。