1. 程式人生 > >【java基礎】ReentrantReadWriteLock原始碼及實現原理分析

【java基礎】ReentrantReadWriteLock原始碼及實現原理分析

繼承關係

ReadLock和WriteLock是ReentrantReadWriteLock的兩個內部類,Lock的上鎖和釋放鎖都是通過AQS來實現的。

AQS定義了獨佔模式的acquire()和release()方法,共享模式的acquireShared()和releaseShared()方法.還定義了抽象方法tryAcquire()、tryAcquiredShared()、tryRelease()和tryReleaseShared()由子類實現,tryAcquire()和tryAcquiredShared()分別對應獨佔模式和共享模式下的鎖的嘗試獲取,就是通過這兩個方法來實現公平性和非公平性,在嘗試獲取中,如果新來的執行緒必須先入隊才能獲取鎖就是公平的,否則就是非公平的。這裡可以看出AQS定義整體的同步器框架,具體實現放手交由子類實現。

原始碼分析

ReadLock和WriteLock方法都是通過呼叫Sync的方法實現的,所以我們先來分析一下Sync原始碼:

AQS 的狀態state是32位(int 型別)的,辦成兩份,讀鎖用高16位,表示持有讀鎖的執行緒數(sharedCount),寫鎖低16位,表示寫鎖的重入次數 (exclusiveCount)。狀態值為 0 表示鎖空閒,sharedCount不為 0 表示分配了讀鎖,exclusiveCount 不為 0 表示分配了寫鎖,sharedCount和exclusiveCount 一般不會同時不為 0,只有當執行緒佔用了寫鎖,該執行緒可以重入獲取讀鎖,反之不成立。

abstract static class Sync extends AbstractQueuedSynchronizer {
  
       static final int SHARED_SHIFT   = 16;
       // 由於讀鎖用高位部分,所以讀鎖個數加1,其實是狀態值加 2^16
       static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
       // 寫鎖的可重入的最大次數、讀鎖允許的最大數量
       static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
       // 寫鎖的掩碼,用於狀態的低16位有效值
       static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
       // 讀鎖計數,當前持有讀鎖的執行緒數
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 寫鎖的計數,也就是它的重入次數
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

 重入計數:

abstract static class Sync extends AbstractQueuedSynchronizer {
    /**
     * 每個執行緒特定的 read 持有計數。存放在ThreadLocal,不需要是執行緒安全的。
     */
    static final class HoldCounter {
        int count = 0;
        // 使用id而不是引用是為了避免保留垃圾。注意這是個常量。
        final long tid = Thread.currentThread().getId();
    }
    /**
     * 採用繼承是為了重寫 initialValue 方法,這樣就不用進行這樣的處理:
     * 如果ThreadLocal沒有當前執行緒的計數,則new一個,再放進ThreadLocal裡。
     * 可以直接呼叫 get。
     * */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    /**
     * 儲存當前執行緒重入讀鎖的次數的容器。在讀鎖重入次數為 0 時移除。
     */
    private transient ThreadLocalHoldCounter readHolds;
    /**
     * 最近一個成功獲取讀鎖的執行緒的計數。這省卻了ThreadLocal查詢,
     * 通常情況下,下一個釋放執行緒是最後一個獲取執行緒。這不是 volatile 的,
     * 因為它僅用於試探的,執行緒進行快取也是可以的
     * (因為判斷是否是當前執行緒是通過執行緒id來比較的)。
     */
    private transient HoldCounter cachedHoldCounter;
    /**
     * firstReader是這樣一個特殊執行緒:它是最後一個把 共享計數 從 0 改為 1 的
     * (在鎖空閒的時候),而且從那之後還沒有釋放讀鎖的。如果不存在則為null。
     * firstReaderHoldCount 是 firstReader 的重入計數。
     *
     * firstReader 不能導致保留垃圾,因此在 tryReleaseShared 裡設定為null,
     * 除非執行緒異常終止,沒有釋放讀鎖。
     *
     * 作用是在跟蹤無競爭的讀鎖計數時非常便宜。
     *
     * firstReader及其計數firstReaderHoldCount是不會放入 readHolds 的。
     */
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;
    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // 確保 readHolds 的記憶體可見性,利用 volatile 寫的記憶體語義。
    }
}

Sync中提供了很多方法,但是有兩個方法是抽象的,子類必須實現。下面以FairSync為例,分析一下這兩個抽象方法:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

writerShouldBlock和readerShouldBlock方法都表示當有別的執行緒也在嘗試獲取鎖時,是否應該阻塞。 
對於公平模式,hasQueuedPredecessors()方法表示前面是否有等待執行緒。一旦前面有等待執行緒,那麼為了遵循公平,當前執行緒也就應該被掛起。 

下面再來看NonfairSync的實現:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // 寫執行緒總是可以闖入
        }
        final boolean readerShouldBlock() {
           //
            return apparentlyFirstQueuedIsExclusive();
        }
    }
/**如果頭節點的下一個節點是獨佔執行緒,為了防止獨佔執行緒也就是寫執行緒飢餓等待,則後入執行緒應該排隊,否則可以闖入*/
final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

從上面可以看到,非公平模式下,writerShouldBlock直接返回false,說明不需要阻塞;而readShouldBlock呼叫了apparentFirstQueuedIsExcluisve()方法。如果等待佇列中第一個等待執行緒想獲取寫鎖,返回true;否則返回false。也就說明,如果等待佇列中第一個等待執行緒想獲取寫鎖,那麼該讀執行緒應該阻塞。

如果當前全域性處於讀鎖狀態,且等待佇列中第一個等待執行緒想獲取寫鎖,那麼當前執行緒能夠獲取到讀鎖的條件為:當前執行緒獲取了寫鎖,還未釋放;當前執行緒獲取了讀鎖,這一次只是重入讀鎖而已;其它情況當前執行緒入隊尾。之所以這樣處理一方面是為了效率,一方面是為了避免想獲取寫鎖的執行緒飢餓,老是得不到執行的機會     。

例如:執行緒C請求一個寫鎖,由於當前其他兩個執行緒擁有讀鎖,寫鎖獲取失敗,執行緒C入佇列(根據規則i),如下所示

  

AQS初始化會建立一個空的頭節點,C入佇列,然後會休眠,等待其他執行緒釋放鎖喚醒。

此時執行緒D也來了,執行緒D想獲取一個讀鎖,上面規則,佇列中第一個等待執行緒C請求的是寫鎖,為避免寫鎖遲遲獲取不到,並且執行緒D不是重入獲取讀鎖,所以執行緒D也入隊,如下圖所示:

     

讀鎖獲取

獲取共享lock 方法 acquireShared

public final void acquireShared(int arg){
    if(tryAcquireShared(arg) < 0){  // 1. 呼叫子類, 獲取共享 lock  返回 < 0, 表示失敗
        doAcquireShared(arg);       // 2. 呼叫 doAcquireShared 當前 執行緒加入 Sync Queue 裡面, 等待獲取 lock
    }
}
複製程式碼

Sync實現的嘗試獲取鎖

在以下幾種情況,獲取讀鎖會失敗:

(1)有執行緒持有寫鎖,且該執行緒不是當前執行緒,獲取鎖失敗。

(2)寫鎖空閒 且  公平策略決定 讀執行緒應當被阻塞,除了重入獲取,其他獲取鎖失敗。

(3)讀鎖數量達到最多,丟擲異常。

除了以上三種情況,該執行緒會迴圈嘗試獲取讀鎖直到成功。

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;            //1.有執行緒持有寫鎖,且該執行緒不是當前執行緒,獲取鎖失敗
            int r = sharedCount(c);   //2.獲取讀鎖計數
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {//3.如果不應該阻塞,且讀鎖數<MAX_COUNT且設定同步狀態state成功,獲取鎖成功。
                if (r == 0) {            //下面對firstReader的處理:firstReader是不會放到readHolds裡的,這樣,在讀鎖只有一個的情況下,就避免了查詢readHolds。
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
//  // 非 firstReader 讀鎖重入計數更新
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
                //4.獲取讀鎖失敗,放到迴圈裡重試。
            return fullTryAcquireShared(current);
        }
final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;     //1.有執行緒持有寫鎖,且該執行緒不是當前執行緒,獲取鎖失敗
                    //2.有執行緒持有寫鎖,且該執行緒是當前執行緒,則應該放行讓其重入獲取鎖,否則會造成死鎖。
                } else if (readerShouldBlock()) {
                    //3.寫鎖空閒  且  公平策略決定 讀執行緒應當被阻塞
                      // 下面的處理是說,如果是已獲取讀鎖的執行緒重入讀鎖時,
                      // 即使公平策略指示應當阻塞也不會阻塞。
                      // 否則,這也會導致死鎖的。
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != current.getId()) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        //4.需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。
                        if (rh.count == 0)
                            return -1;
                    }

                }
                //5.寫鎖空閒  且  公平策略決定執行緒可以獲取讀鎖
                if (sharedCount(c) == MAX_COUNT)//6.讀鎖數量達到最多
                    throw new Error("Maximum lock count exceeded");
                //7. 申請讀鎖成功,下面的處理跟tryAcquireShared是類似的。
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

獲取共享lock 方法 doAcquireShared

private void doAcquireShared(int arg){
    final Node node = addWaiter(Node.SHARED);       // 1. 將當前的執行緒封裝成 Node 加入到 Sync Queue 裡面
    boolean failed = true;

    try {
        boolean interrupted = false;
        for(;;){
            final Node p = node.predecessor();      // 2. 獲取當前節點的前繼節點 (當一個n在 Sync Queue 裡面, 並且沒有獲取 lock 的 node 的前繼節點不可能是 null)
            if(p == head){
                int r = tryAcquireShared(arg);      // 3. 判斷前繼節點是否是head節點(前繼節點是head, 存在兩種情況 (1) 前繼節點現在佔用 lock (2)前繼節點是個空節點, 已經釋放 lock, node 現在有機會獲取 lock); 則再次呼叫 tryAcquireShared 嘗試獲取一下
                if(r >= 0){
                    setHeadAndPropagate(node, r);   // 4. 獲取 lock 成功, 設定新的 head, 並喚醒後繼獲取  readLock 的節點
                    p.next = null; // help GC
                    if(interrupted){               // 5. 在獲取 lock 時, 被中斷過, 則自己再自我中斷一下(外面的函式可能需要這個引數)
                        selfInterrupt();
                    }
                    failed = false;
                    return;
                }
            }

            if(shouldParkAfterFailedAcquire(p, node) && // 6. 呼叫 shouldParkAfterFailedAcquire 判斷是否需要中斷(這裡可能會一開始 返回 false, 但在此進去後直接返回 true(主要和前繼節點的狀態是否是 signal))
                    parkAndCheckInterrupt()){           // 7. 現在lock還是被其他執行緒佔用 那就睡一會, 返回值判斷是否這次執行緒的喚醒是被中斷喚醒
                interrupted = true;
            }
        }
    }finally {
        if(failed){             // 8. 在整個獲取中出錯(比如執行緒中斷/超時)
            cancelAcquire(node);  // 9. 清除 node 節點(清除的過程是先給 node 打上 CANCELLED標誌, 然後再刪除)
        }
    }
}

獨佔鎖模式獲取成功以後設定頭結點然後返回中斷狀態,結束流程。而共享鎖模式獲取成功以後,呼叫了setHeadAndPropagate方法,從方法名就可以看出除了設定新的頭結點以外還有一個傳遞動作,一起看下程式碼:

    //兩個入參,一個是當前成功獲取共享鎖的節點,一個就是tryAcquireShared方法的返回值,注意上面說的,它可能大於0也可能等於0
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //記錄當前頭節點
        //設定新的頭節點,即把當前獲取到鎖的節點設定為頭節點
        //注:這裡是獲取到鎖之後的操作,不需要併發控制
        setHead(node);
        //這裡意思有兩種情況是需要執行喚醒操作
        //1.propagate > 0 表示呼叫方指明瞭後繼節點有可能需要被喚醒,因為此方法是獲取讀鎖過程呼叫,那麼後面節點很可能也要獲取讀鎖
        //2.頭節點後面的節點需要被喚醒(waitStatus<0),不論是老的頭結點還是新的頭結點
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果當前節點的後繼節點是共享型別獲取沒有後繼節點,則進行喚醒
            //這裡可以理解為除非明確指明不需要喚醒(後繼等待節點是獨佔型別),否則都要喚醒
            //這裡的初衷是   後一個節點正好是共享節點,就喚醒,實現共享,獨佔有鎖釋放時候喚醒
            if (s == null || s.isShared())
                //後面詳細說
                doReleaseShared();
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

注:這個喚醒操作在releaseShared()方法裡也會呼叫。喚醒後面想獲取鎖的節點。

private void doReleaseShared() {
        for (;;) {
            //喚醒操作由頭結點開始,注意這裡的頭節點已經是上面新設定的頭結點了
            //其實就是喚醒上面新獲取到共享鎖的節點的後繼節點
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //表示後繼節點需要被喚醒
                if (ws == Node.SIGNAL) {
                    //這裡需要控制併發,因為入口有setHeadAndPropagate跟releaseShared兩個,避免兩次unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;      
                    //執行喚醒操作      
                    unparkSuccessor(h);
                }
                //如果後繼節點暫時不需要喚醒,則把當前節點狀態設定為PROPAGATE確保以後可以傳遞下去
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            //如果頭結點沒有發生變化,表示設定完成,退出迴圈
            //如果頭結點發生變化,比如說其他執行緒獲取到了鎖,為了使自己的喚醒動作可以傳遞,必須進行重試
            if (h == head)                   
                break;
        }
    }

這裡分析一下共享鎖是如何進行傳遞的

讀鎖的釋放

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

釋放鎖tryReleaseShared由子類Sync實現

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 清理firstReader快取 或 readHolds裡的重入計數
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 完全釋放讀鎖
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; // 主要用於重入退出
    }
    // 迴圈在CAS更新狀態值,主要是把讀鎖數量減 1
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 釋放讀鎖對其他讀執行緒沒有任何影響,
            // 但可以允許等待的寫執行緒繼續,如果讀鎖、寫鎖都空閒。
            return nextc == 0;
    }
}

寫鎖的獲取

寫鎖的獲取和ReentrantLock獨佔鎖的鎖獲取過程幾乎一樣,除了tryAcquire()方法,要考慮讀鎖的情況。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

在以下情況,寫鎖獲取失敗:

(1) 寫鎖為0,讀鎖不為0    或者寫鎖不為0,且當前執行緒不是已獲取獨佔鎖的執行緒,鎖獲取失敗。

(2)寫鎖數量已達到最大值,寫鎖獲取失敗。

(3)當前執行緒應該阻塞,或者設定同步狀態state失敗,獲取鎖失敗。

protected final boolean tryAcquire(int acquires) {

            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // 1.寫鎖為0,讀鎖不為0    或者寫鎖不為0,且當前執行緒不是已獲取獨佔鎖的執行緒,鎖獲取失敗
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //2. 寫鎖數量已達到最大值,寫鎖獲取失敗
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            //3.當前執行緒應該阻塞,或者設定同步狀態state失敗,獲取鎖失敗。
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

 寫鎖的釋放

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

 總結:

(1)首先說一下公平鎖和非公平鎖的區別,

公平鎖:當執行緒發現已經有執行緒在排對獲取鎖了,那麼它必須排隊,除了一種情況就是,執行緒已經佔有鎖,此次是重入,不用排隊。

非公平鎖:只有一種情況需排隊,其他情況不用排隊就可以嘗試獲取鎖: 如果當前全域性處於讀鎖狀態,且等待佇列中第一個等待執行緒想獲取寫鎖,那麼當前執行緒能夠獲取到讀鎖的條件為:當前執行緒獲取了寫鎖,還未釋放;當前執行緒獲取了讀鎖,這一次只是重入讀鎖而已;其它情況當前執行緒入隊尾。

(2)獲取讀鎖和釋放讀鎖

獲取鎖的過程:

  1. 當執行緒呼叫acquireShared()申請獲取鎖資源時,如果成功,則進入臨界區。
  2. 當獲取鎖失敗時,則建立一個共享型別的節點並進入一個FIFO等待佇列,然後被掛起等待喚醒。
  3. 當佇列中的等待執行緒被喚醒以後就重新嘗試獲取鎖資源,如果成功則喚醒後面還在等待的共享節點並把該喚醒事件傳遞下去,即會依次喚醒在該節點後面的所有共享節點,然後進入臨界區,否則繼續掛起等待。

釋放鎖過程:

  1. 當執行緒呼叫releaseShared()進行鎖資源釋放時,如果釋放成功,則喚醒佇列中等待的節點,如果有的話。

(3)跟獨佔鎖相比,共享鎖的主要特徵在於當一個在等待佇列中的共享節點成功獲取到鎖以後(它獲取到的是共享鎖),既然是共享,那它必須要依次喚醒後面所有可以跟它一起共享當前鎖資源的節點,毫無疑問,這些節點必須也是在等待共享鎖(這是大前提,如果等待的是獨佔鎖,那前面已經有一個共享節點獲取鎖了,它肯定是獲取不到的)。當共享鎖被釋放的時候,可以用讀寫鎖為例進行思考,當一個讀鎖被釋放,此時不論是讀鎖還是寫鎖都是可以競爭資源的。

參考文章: