1. 程式人生 > 實用技巧 >執行緒基礎知識06- ReentrantReadWriteLock獨寫鎖

執行緒基礎知識06- ReentrantReadWriteLock獨寫鎖

同一時刻允許多個讀執行緒訪問,但是在寫執行緒訪問時,所有的讀執行緒和其他寫執行緒均被阻塞。
所有晚於寫操作的讀操作都會進入等待狀態。

資料結構

  • 內部類提供了讀寫鎖的子類
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** 讀鎖 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 寫鎖*/
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 同步機制 */
    final Sync sync;
}

獨寫鎖的狀態設計

  • 獲取當前同步狀態,通過位運算,16位以上記錄的是讀的狀態數,16位以下獲取的是寫的狀態數。

  • S&(1 << 16 -1)(S是同步狀態值,將高位16位抹去)獲取寫狀態

  • S>>>16 獲取讀狀態

(圖片摘自《java併發藝術》)

同步機制Sync

  • 讀鎖和寫鎖的鎖獲取方法和釋放方法都在這個類進行實現

獨佔鎖的獲取

  • 判斷狀態是否為0,如果狀態為0,判斷是同步佇列的頭節點。如果是則設定獨佔鎖為當前佇列,如果不是返回false,增加到同步佇列中阻塞;

  • 狀態不為0時,獨佔鎖數量>0且獨佔執行緒是當前執行緒,說明當前執行緒獲得了鎖;

  • 如果獨佔鎖數量為0,或者獨佔鎖執行緒不是當前執行緒,則將當前執行緒增加到同步佇列中。

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    protected final boolean tryAcquire(int acquires) {
           
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);//獲取獨佔鎖數量
            if (c != 0) {//狀態不為0時
                if (w == 0 || current != getExclusiveOwnerThread())//獨佔鎖為空,或者當前執行緒非獨佔執行緒,返回false
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)//超過最大數量失敗
                    throw new Error("Maximum lock count exceeded");   
                setState(c + acquires);//更新鎖狀態
                return true;
            }
            /**
             * 1.同步佇列,如果當前節點還有前驅節點。
             * 2.更新鎖狀態失敗
             */ 
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);//設定當前執行緒為獨佔執行緒
            return true;
        }

/** 
 * 返回持有的鎖數量
 * 0000 0000 0000 1000  & 1111 1111 1111 1111 = 0000 0000 0000 1000
 */
tatic int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

獨佔鎖的釋放

  • 如果當前執行緒不是獨佔執行緒報錯

  • 如果獨佔執行緒狀態碼為空,設定獨佔執行緒為空

  • 更新狀態碼

 protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())//判斷當前執行緒是不是獨佔執行緒,不是的話報錯
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;//獨佔鎖是否完全釋放
            if (free)//是的話設定獨佔執行緒為空
                setExclusiveOwnerThread(null);
            setState(nextc);//設定狀態碼
            return free;
        }

共享鎖的獲取

  • 如果存在獨佔鎖並且獨佔鎖不是當前執行緒的情況,當前執行緒加入到佇列中;

  • 獲取同步佇列,如果當前執行緒是同步佇列頭節點,則調整狀態,並記錄當前執行緒。

  • 如果當前執行緒不是以上情況,則進行CAS自旋鎖獲取。

  /**
   * 繼承了ThreadLocal,其實質就是將資料儲存到當前執行緒的快取資料中
   */   
       static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

 protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)//如果獨佔鎖被其他執行緒獲得,返回-1,加入同步佇列
                return -1;
            int r = sharedCount(c);//同步佇列
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {//如果為同步佇列頭節點,且小於最大數,通過CAS設定狀態碼
                if (r == 0) {//如果共享鎖為0,則設定首讀執行緒為當前執行緒,首讀執行緒的次數為1
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {如果和首讀執行緒相同,資料++
                    firstReaderHoldCount++;
                } else {//如果不是第一個執行緒,快取鎖巢狀的次數
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))//獲取當前執行緒的資料
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);//如果不是上面情況,則進行自旋CAS進行處理
        }

/**獲取讀執行緒數量*/
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/**
 * 1.通過自旋CAS進行鎖獲取
 */
 final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();//獲取狀態碼
                if (exclusiveCount(c) != 0) {//有獨佔鎖的情況
                    if (getExclusiveOwnerThread() != current)//如果獨佔執行緒不是當前執行緒返回-1,加入到同步隊列當中,如果獨佔執行緒是當前執行緒就會死鎖而造成阻塞
                        return -1;
                } else if (readerShouldBlock()) {//讀鎖阻塞狀態
                    if (firstReader == current) {//確定不是重進入鎖
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {//獲取當前執行緒
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)//當前查詢次數為0,放入到同步佇列中
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {//通過CAS替換當前狀態,以下的操作和上面類似
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // 快取進行釋放
                    }
                    return 1;//返回獲取鎖成功
                }
            }
        }

/**
 * 作用:同步佇列中的頭節點是獨佔節點,並且頭節點的後繼節點也是獨佔節點
 */
 final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }


共享鎖的釋放

  • 判斷當前執行緒是不是首讀執行緒,如果是且只讀了一次,則首讀執行緒制空;

  • 如果首讀次數大於1,說明多次巢狀,count-1

  • 如果不是首讀執行緒,就判斷當前執行緒快取中的count數進行-1;

  • 通過CAS自旋進行狀態碼更改,如果更改後狀態碼為0,則表明完全釋放

  protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {//如果是首個read執行緒
                if (firstReaderHoldCount == 1)//只訪問一次,制空;否則將訪問數減1
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();//獲取當前執行緒的記錄
                int count = rh.count;//獲取當前執行緒的記錄次數
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {//CAS自旋進行鎖釋放
                int c = getState();
                int nextc = c - SHARED_UNIT;//讀鎖狀態碼-1
                  /**
                   * 釋放讀鎖不影響讀操作
                   * 但是如果同時釋放讀鎖和寫鎖,寫鎖一定要先於讀操作,防止出現數據不一致情況
                   */      
                if (compareAndSetState(c, nextc))
                    return nextc == 0;//如果==0,說明共享鎖完全釋放成功了
            }
        }