1. 程式人生 > >ReentrantReadWriteLock 原始碼分析

ReentrantReadWriteLock 原始碼分析

ReentrantReadWriteLock

1)獲取順序:
非公平模式(預設):連續競爭的非公平鎖可能無限期地推遲一個或多個 reader 或 writer 執行緒,但吞吐量通常要高於公平鎖。
公平模式:當某個執行緒釋放當前保持的鎖時,可以為等待時間最長的單個 writer 執行緒分配寫入鎖,如果有一組等待時間大於所有正在等待的 writer 執行緒的 reader 執行緒,則將為該組分配讀取鎖。
2)重入:此鎖允許 reader 和 writer 按照 ReentrantLock 的樣式重新獲取讀取鎖或寫入鎖。
3)鎖降級:重入還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。
4)鎖獲取的中斷:讀取鎖和寫入鎖都支援鎖獲取期間的中斷。
5)同步狀態值的高 16 位儲存讀取鎖被持有的次數,低 16 位儲存寫入鎖被持有的次數。

建立例項

    /** 內部類實現的讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 內部類實現的寫鎖 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 實現鎖操作的同步器 */
    final Sync sync;

    /**
     * 建立一個非公平的讀寫鎖例項
     */
    public ReentrantReadWriteLock() {
        this(false);
    }

    /**
     * 1)fair=true,建立一個公平的讀寫鎖例項。
     * 2)fair=false,建立一個非公平的讀寫鎖例項。
     */
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        // 基於當期例項建立讀鎖和寫鎖
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

讀鎖獲取:ReadLock#lock

        /**
         * 1)如果寫鎖沒有被其他執行緒持有,則成功獲取讀鎖並返回
         * 2)寫鎖被其他執行緒持有,則進入阻塞狀態。
         */
        @Override
        public void lock() {
            sync.acquireShared(1);
        }

AbstractQueuedSynchronizer#acquireShared
    /**
     * 在共享模式下獲取鎖,忽略執行緒中斷。
     */
    public final void acquireShared(int arg) {
        // 嘗試獲取共享鎖
        if (tryAcquireShared(arg) < 0) {
            // 再次獲取共享鎖
            doAcquireShared(arg);
        }
    }

ReentrantReadWriteLock#Sync
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /**
         * 高 16 位記錄寫鎖持有次數,低 16 位記錄讀鎖持有次數
         */
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = 1 << Sync.SHARED_SHIFT;
        static final int MAX_COUNT      = (1 << Sync.SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << Sync.SHARED_SHIFT) - 1;

        /** 讀鎖被持有的計數值 */
        static int sharedCount(int c)    { return c >>> Sync.SHARED_SHIFT; }
        /** 寫鎖被持有的計數值 */
        static int exclusiveCount(int c) { return c & Sync.EXCLUSIVE_MASK; }

        /**
         * 每個執行緒的讀鎖保持計數器
         */
        static final class HoldCounter {
            int count;          // initially 0
            // Use id, not reference, to avoid garbage retention
            final long tid = LockSupport.getThreadId(Thread.currentThread());
        }

        /**
         * 持有讀鎖計數器的執行緒區域性物件
         */
        static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
            @Override
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

        /**
         * 當前執行緒持有的讀鎖計數器物件,在建立時初始化,當讀鎖釋放時移除
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         * 最近一個成功獲取讀鎖的執行緒的讀鎖持有計數器
         */
        private transient HoldCounter cachedHoldCounter;

        /**
         * 第一個獲取讀鎖的執行緒
         */
        private transient Thread firstReader;
        /**
         * 第一個獲取讀鎖的執行緒持有讀鎖的計數值
         */
        private transient int firstReaderHoldCount;

        Sync() {
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }


    /**
     * 非公平同步器
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        @Override
        boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        @Override
        boolean readerShouldBlock() {
            // 避免獲取寫鎖的執行緒飢餓
            return apparentlyFirstQueuedIsExclusive();
        }
    }

AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive
    /**
     * 同步佇列中第一個等待獲取鎖的執行緒,需要獲取獨佔的寫鎖,則返回 true
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        // 頭結點、第二個節點都不為 null,節點處於獨佔模式,並且節點上有駐留執行緒,表示有執行緒在等待獲取寫鎖。
        return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
    }

ReentrantReadWriteLock#Sync#tryAcquireShared
        @Override
        @ReservedStackAccess
        protected final int tryAcquireShared(int unused) {
            // 讀取當前執行緒
            final Thread current = Thread.currentThread();
            // 讀取同步狀態
            final int c = getState();
            /**
             * 1)如果寫鎖已經被執行緒持有,並且不是當前執行緒,則獲取失敗,返回 -1。
             */
            if (Sync.exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current) {
                return -1;
            }
            /**
             * 2)寫鎖未被任何執行緒持有,或寫鎖被當前執行緒持有。
             */
            // 讀取讀鎖計數值
            final int r = Sync.sharedCount(c);
            /**
             * 獲取讀鎖的執行緒是否應該被阻塞【同步佇列第一個阻塞執行緒在等待獲取寫鎖】
             * && 讀鎖的佔用計數值 < 65535
             * && 比較設定讀鎖的新計數值
             */
            if (!readerShouldBlock() &&
                    r < Sync.MAX_COUNT &&
                    compareAndSetState(c, c + Sync.SHARED_UNIT)) {
                // 1)如果是第一個獲取讀鎖的執行緒
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                    // 2)當前執行緒是第一個獲取讀鎖的執行緒,並在重複獲取讀鎖
                } else if (firstReader == current) {
                    // 累積讀鎖持有計數值
                    firstReaderHoldCount++;
                    // 3)當前執行緒不是第一個獲取讀鎖的執行緒
                } else {
                    // 讀取最近一個成功獲取讀鎖的執行緒的讀鎖持有計數器
                    HoldCounter rh = cachedHoldCounter;
                    // 如果最近獲取讀鎖的執行緒不是當前執行緒
                    if (rh == null ||
                            rh.tid != LockSupport.getThreadId(current)) {
                        // 獲取當前執行緒的持有計數器
                        cachedHoldCounter = rh = readHolds.get();
                    } else if (rh.count == 0) {
                        readHolds.set(rh);
                    }
                    // 遞增計數值
                    rh.count++;
                }
                return 1;
            }
            // 寫鎖已經被當前執行緒持有,正在獲取讀鎖
            return fullTryAcquireShared(current);
        }

        /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                // 讀取同步狀態
                final int c = getState();
                // 1)有執行緒持有寫鎖
                if (Sync.exclusiveCount(c) != 0) {
                    // 如果不是當前執行緒持有,則返回 -1
                    if (getExclusiveOwnerThread() != current)
                    {
                        return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    }
                // 2)寫鎖沒有被任何執行緒持有
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null ||
                                    rh.tid != LockSupport.getThreadId(current)) {
                                rh = readHolds.get();
                                // 讀鎖持有計數值為 0,則移除計數器
                                if (rh.count == 0) {
                                    readHolds.remove();
                                }
                            }
                        }
                        if (rh.count == 0) {
                            return -1;
                        }
                    }
                }
                // 讀鎖的獲取計數值已經為最大值
                if (Sync.sharedCount(c) == Sync.MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                // 比較更新讀鎖的計數值
                if (compareAndSetState(c, c + Sync.SHARED_UNIT)) {
                    // 1)當前執行緒是第一個獲取讀鎖的執行緒
                    if (Sync.sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    // 2)當前執行緒是第一個獲取讀鎖的執行緒,重複獲取   
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                        }
                        if (rh == null ||
                                rh.tid != LockSupport.getThreadId(current)) {
                            rh = readHolds.get();
                        } else if (rh.count == 0) {
                            readHolds.set(rh);
                        }
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

AbstractQueuedSynchronizer#doAcquireShared
    /**
     * 在共享模式下獲取鎖,執行緒能響應中斷
     */
    private void doAcquireShared(int arg) {
        // 建立一個共享節點
        final Node node = addWaiter(Node.SHARED);
        boolean interrupted = false;
        try {
            for (;;) {
                // 讀取前置節點
                final Node p = node.predecessor();
                // 前置節點是頭節點
                if (p == head) {
                    // 嘗試獲取鎖
                    final int r = tryAcquireShared(arg);
                    /**
                     * 第一個等待獲取寫鎖的執行緒已經成功獲取寫鎖,並且已經使用完畢而釋放,
                     * 當前執行緒成功獲取讀鎖。
                     */
                    if (r >= 0) {
                        // 設定當前節點為新的頭節點,並嘗試將訊號往後傳播,喚醒等待獲取讀鎖的執行緒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 當前節點的駐留執行緒需要被阻塞,則阻塞當前執行緒
                if (AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(p, node)) {
                    interrupted |= parkAndCheckInterrupt();
                }
            }
        } catch (final Throwable t) {
            cancelAcquire(node);
            throw t;
        } finally {
            if (interrupted) {
                AbstractQueuedSynchronizer.selfInterrupt();
            }
        }
    }

AbstractQueuedSynchronizer#setHeadAndPropagate
    /**
     * 設定頭節點,如果同步佇列中有等待獲取讀鎖的執行緒,則嘗試喚醒
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            // 讀取當前節點的後置節點,如果其為 null 或處於共享模式【等待獲取讀鎖】
            final Node s = node.next;
            if (s == null || s.isShared()) {
                // 嘗試釋放後繼節點
                doReleaseShared();
            }
        }
    }

AbstractQueuedSynchronizer#doReleaseShared
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        for (;;) {
            // 讀取頭節點
            final Node h = head;
            // 頭結點和尾節點不相等,表示有執行緒在等待獲取鎖
            if (h != null && h != tail) {
                // 讀取頭節點的同步狀態
                final int ws = h.waitStatus;
                // 後繼節點需要被喚醒
                if (ws == Node.SIGNAL) {
                    // 比較更新同步狀態
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    {
                        // 更新失敗則再次確認
                        continue;            // loop to recheck cases
                    }
                    // 同步狀態成功更新為 0,則喚醒後繼節點的駐留執行緒
                    unparkSuccessor(h);
                }
                // 同步狀態為 0,並且比較更新為 PROPAGATE 失敗,則繼續迴圈
                else if (ws == 0 &&
                        !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                {
                    continue;                // loop on failed CAS
                }
            }
            /**
             * 1)後繼節點被喚醒並且還未成功獲取到鎖,則直接退出迴圈,此時只喚醒了一個執行緒。
             * 2)被喚醒的後繼節點成功獲取到讀鎖,駐留執行緒已經被釋放,此時頭節點已經改變,則進行重試。
             */
            if (h == head) {
                break;
            }
        }
    }

讀鎖釋放:ReadLock#unlock

        /**
         * 釋放讀鎖,如果讀鎖的持有計數值為 0,則寫鎖可以被獲取。
         */
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }

        @Override
        @ReservedStackAccess
        protected final boolean tryReleaseShared(int unused) {
            // 讀取當前執行緒
            final Thread current = Thread.currentThread();
            // 1)當前執行緒是獲取讀鎖的第一個執行緒
            if (firstReader == current) {
                // 讀鎖持有計數為 1,則釋放後為 0,
                if (firstReaderHoldCount == 1) {
                    firstReader = null;
                } else {
                    // 讀鎖被當前執行緒多次持有,則遞減讀鎖持有計數值
                    firstReaderHoldCount--;
                }
            // 2)當前執行緒不是持有讀鎖的執行緒
            } else {
                // 最近持有讀鎖的執行緒計數值
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {
                    // 讀取當前執行緒的讀鎖持計數值物件
                    rh = readHolds.get();
                }
                final int count = rh.count;
                if (count <= 1) {
                    // 如果為 1,則移除執行緒區域性物件
                    readHolds.remove();
                    if (count <= 0) {
                        throw Sync.unmatchedUnlockException();
                    }
                }
                // 遞減計數值
                --rh.count;
            }
            // 原子更新同步狀態值
            for (;;) {
                final int c = getState();
                final int nextc = c - Sync.SHARED_UNIT;
                if (compareAndSetState(c, nextc)) {
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
                }
            }
        }

寫鎖獲取:WriteLock#lock

ReentrantReadWriteLock#Sync
        @Override
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 讀取同步狀態值
            final int c = getState();
            // 寫鎖被持有的計數值
            final int w = Sync.exclusiveCount(c);
            // 1)讀鎖或寫鎖至少有一個被執行緒持有
            if (c != 0) {
                /**
                 * 2)讀鎖被執行緒持有,或當前執行緒不是寫鎖持有執行緒。
                 * 目標執行緒不允許先獲取讀鎖,後獲取寫鎖,即 ReentrantReadWriteLock 不支援鎖升級。
                 */
                if (w == 0 || current != getExclusiveOwnerThread()) {
                    // 嘗試獲取失敗
                    return false;
                }
                // 寫鎖重入次數超出最大值
                if (w + Sync.exclusiveCount(acquires) > Sync.MAX_COUNT) {
                    throw new Error("Maximum lock count exceeded");
                }
                // 更新同步狀態,寫鎖獲取成功
                setState(c + acquires);
                return true;
            }
            // 讀鎖和寫鎖都未被執行緒持有,則原子更新同步狀態
            if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires)) {
                return false;
            }
            // 設定寫鎖的獨佔執行緒為當前執行緒
            setExclusiveOwnerThread(current);
            return true;
        }

寫鎖釋放:WriteLock#unlock

        /**
         * 釋放鎖,寫鎖已經被釋放,則返回 true
         */
        @Override
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            // 寫鎖是否被當前執行緒持有
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            // 計算同步狀態值,寫鎖計數值儲存在低 16 位
            final int nextc = getState() - releases;
            // 新的寫鎖計數值為 0,則表示讀寫鎖已經自由了
            final boolean free = Sync.exclusiveCount(nextc) == 0;
            if (free) {
                // 清空獨佔鎖持有執行緒
                setExclusiveOwnerThread(null);
            }
            setState(nextc);
            return free;
        }