1. 程式人生 > >讀寫鎖--ReentrantReadWriteLock

讀寫鎖--ReentrantReadWriteLock

讀寫鎖,對於讀操作來說是共享鎖,對於寫操作來說是排他鎖,兩種操作都可重入的一種鎖。底層也是用AQS來實現的,我們來看一下它的結構跟程式碼: ----------------------------------------------------------------------------------------------- 讀寫鎖,當然要區分讀跟寫兩種操作,因此其內部有ReadLock跟WriteLock兩種具體實現。但兩者也有互動的地方,比如獲取寫鎖要判斷當前是否有執行緒在讀,有的話就需要等待,因此內部是使用的同一個佇列同步器。因為獲取鎖的時候,支援公平與非公平兩種方式,故而同步器的包裝類Sync也有兩個:FairSync跟NonfairSync。 從寫鎖的獲取開始:
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();monitor
    int c = getState();  // 重入鎖的計數
    int w = exclusiveCount(c); // 計數高低位拆開為讀計數跟寫計數,計算寫計數
    if (c != 0) { // 有人在佔有鎖
        if (w == 0 || current != getExclusiveOwnerThread())  // 寫計數為0,只有讀鎖直接返回(避免了讀鎖升級為寫鎖) 或者  當前執行緒不是執行執行緒(執行執行緒可能讀也可能寫)也返回
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)  //寫鎖重入次數 > 65525,丟擲異常
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires);  //重入的寫執行緒,直接設定狀態(第6行程式碼沒有return,說明當前執行緒是重入的寫執行緒(寫計數不是0,且current就是獲取鎖的執行緒))
        return true;
    }
    if (writerShouldBlock() ||  !compareAndSetState(c, c + acquires))  //c!=0沒有return,說明當前鎖是空著的,所以cas搶佔
        return false;
    setExclusiveOwnerThread(current); // 當前執行緒引數設定
    return true;
}
  這裡有個writerShouldBlock(),看一下這個是幹啥的:   追蹤原始碼可以發現,在FairSync跟NonfairSync分別都有這個方法,nonfair中直接return false,fair中是呼叫的hasQueuedPredecessors(),該方法是用來判斷是否有執行緒排隊的,這個writerShouldBlock()就是字面上意思(讀鎖是否該阻塞(排隊)),如果是非公平鎖,直接compareAndSetState進行搶佔,搶佔不到則進入排隊掛起,公平鎖,則判斷是否有排隊的,有則自己進入排隊,而不是先進行搶佔。公平鎖就像一個老實人,先排隊,沒人排隊(自己是第一個)則自己搶座位(為啥要搶,因為可能這時候也來了一個認為自己是第一個的老實人),非公平鎖就像土匪,管你排不排隊,老子先搶一把座位試試,搶不到(被別的執行緒搶走了),被別人打臉了再去排隊。綜上,第13行的邏輯就是:對於寫鎖,當前鎖沒有被佔用,如果是非公平方式獲取,直接搶佔,失敗則直接返回,公平方式則檢視是否有排隊,有則獲取失敗直接返回,無則進行搶佔。方法整體上,沒獲取到鎖返回false,獲取到了返回true,然後結合AQS的程式碼:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //沒有獲取到,就往等待佇列新增節點,然後掛起執行緒
        selfInterrupt();
}
  so,整個這個獲取寫鎖的過程就是:檢視有沒有人佔用鎖,如果有讀鎖,則進入佇列等待;有寫鎖,則看是否是自己的執行緒,是則重設state然後繼續執行,不是則進入佇列等待。tryAcquire一定要跟acquire聯合起來看,否則難以理清整個流程。   對了,還有個exclusiveCount(int c)用來拆分高低位的方法要說一下,因為讀寫鎖要分別記錄讀跟寫被重入的次數,按照一般設計,這分兩個變數來計數就行了,但jdk就是jdk,它把這兩個用一個int變數來記錄了。方法麼,就是高低位分開計算的,高16位表示讀狀態,低16位表示寫狀態。假設同步狀態為s,則寫狀態為s & 0x0000FFFF,相當於高16位全部清0,同理,讀狀態則為s >>> 16,也就是右移,相當於把低16位都移除了。當然,複雜的就是讀狀態變化的時候,不是簡單的s +1,這樣的話就加到了寫操作上,而是s + 0x00010000,把低位補上0就行了。類似的與操作、位移等等,jdk中有大量的應用,比如hashmap中確定元素所在連結串列等操作都有應用。 寫鎖釋放程式碼:

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())  //沒有寫鎖,拋異常
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;  //次數是否清0了
    if (free)  //清0 了,說明完全釋放了
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
  釋放鎖比較簡單,不做贅述。 讀鎖獲取:

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread(); 
    int c = getState();
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 有寫鎖 且當前執行緒不是寫鎖執行緒,不能重入,失敗
        return -1;
    int r = sharedCount(c);  //向右移位,獲取讀鎖計數
    if (!readerShouldBlock() &&  r < MAX_COUNT &&  compareAndSetState(c, c + SHARED_UNIT)) { // 若無需排隊  && 讀計數<65535(16位最大值) && 狀態設定成功(讀鎖的整體計數就是在這裡改的,注意加了一個預設值的操作)
        if (r == 0) { //讀鎖為0
              firstReader = current;  //記錄第一個獲取鎖的執行緒
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { //是自己重入的
            firstReaderHoldCount++;
        } else { // 
            HoldCounter rh = cachedHoldCounter;  //用於計算讀計數的計數器
            if (rh == null || rh.tid != getThreadId(current))  
                cachedHoldCounter = rh = readHolds.get();  
            else if (rh.count == 0) 
                readHolds.set(rh);
            rh.count++;   //當前執行緒的讀計數+1
        }
        return 1;
    }
    return fullTryAcquireShared(current); //第7行條件不滿足,則for迴圈獲取讀鎖(實際不會死迴圈的)
}
  讀鎖的獲取比較複雜,這裡主要有一個多執行緒各自計數的問題。對於讀鎖,除了要對全域性的state中的讀鎖的計數進行修改,還要每個執行緒各自維護一份自己重入的次數計數,這個計數存在一個ThreadLocal(readHolds)中的一個物件(cachedHoldCounter)裡邊。讀鎖獲取的邏輯是:沒有寫鎖佔用,則直接獲取讀鎖,這就是第7行的邏輯(當然,可能跟其它讀執行緒衝突導致獲取失敗,則進入fullTryAcquireShared(current));如果有寫鎖佔用了呢,就呼叫fullTryAcquireShared(current)獲取鎖,看一下原始碼:

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) { // 有寫鎖
            if (getExclusiveOwnerThread() != current) // 寫鎖持有者不是當前執行緒,獲取失敗,通過aqs的doAcquireShared()進入排隊
                return -1;                                                   //這裡只做了不是當前執行緒的判斷,如果是當前執行緒,這個地方不能進行排隊,因為若已有寫執行緒在排隊的話,就會造成死鎖,原始碼中else一句的英文備註就是說這個
        } else if (readerShouldBlock()) { //沒寫鎖,但可能有寫鎖在等待讀鎖釋放!!需要排隊
                       // 寫鎖空閒  且  公平策略決定 執行緒應當被阻塞
            	       // 下面的處理是說,如果是已獲取讀鎖的執行緒重入讀鎖時, 即使公平策略指示應當阻塞也不會阻塞。
                       // 否則,這也會導致死鎖的。
            if (firstReader == current) { //
                // assert firstReaderHoldCount > 0;
            } else {
            // threadlocal 相關處理
                if (rh.count == 0)    // 需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) { // cas 搶鎖成功
   	//threadlocal相關處理
            return 1;
        }
    }
}
   讀鎖的釋放:
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {// 清理firstReader快取 或 readHolds裡的重入計數
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) { //完全釋放讀鎖
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;  // 主要用於重入退出
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
// 釋放讀鎖對其他讀執行緒沒有任何影響,
// 但可以允許等待的寫執行緒繼續,如果讀鎖、寫鎖都空閒。
            return nextc == 0;
    }
}
-------------------------------------------------------------------------   讀寫鎖的原始碼,讀起來比想象中的難度大得多,原因是讀鎖的部分設計比較複雜,主要涉及鎖降級,以及幾個變數跟threadlocal優化效能的處理。照著《java併發程式設計的藝術》跟一些部落格看了2天,還是沒把這部分完全理清楚,這個艱苦的工作以後繼續搞吧。   我們看一下關於鎖的降級跟升級的問題,看是如何實現的:   鎖降級的定義:鎖降級指的是寫鎖降級為讀鎖。如果當前執行緒持有寫鎖,然後將其釋放,最後再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。鎖降級指的是把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨後釋放(先前擁有的)寫鎖的過程。----《java併發程式設計的藝術》   這段話的理解是不難的,但要在讀寫鎖的原始碼中去找到與之對應的邏輯是不太好找的。實際上:

if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)
        return -1;
  這個就是與之相關的邏輯程式碼,在tryAcquireShared跟fullTryAcquireShared(Thread current) 中都有體現。   上面的程式碼的意思是:當寫鎖被持有時,如果持有該鎖的執行緒不是當前執行緒,就返回 “獲取鎖失敗”,反之就會繼續獲取讀鎖。稱之為鎖降級。   關於鎖降級,書中還給出了一個例子:

public void processCachedData() {
    readLock.lock();
    if(!update){
        //必須先釋放讀鎖
        readLock.unlock();
        //鎖降級從寫鎖獲取到開始
        writeLock.lock();
        try{
            if(!update){
                //準備資料的流程(略)
                update = true;
            }
            readLock.lock();
        }finally {
            writeLock.unlock();
        }
        //鎖降級完成,寫鎖降級為讀鎖
    }
    try{
        //使用資料的流程
    }finally {
        readLock.unlock();
    }
}
  有一段文字說明:鎖降級中的讀鎖獲取是否必要呢?答案是必要的。主要是為了保證資料的可見性,如果當前執行緒不獲取讀鎖而是直接釋放寫鎖,假設另一個執行緒獲取了寫鎖並修改了資料,那麼當前執行緒無法感知該執行緒的資料更新。   可是,,,這裡有個疑問,另一個執行緒獲取了寫鎖,你當前執行緒還能獲取讀鎖嗎?既然不能獲取,何來無法感受資料更新一說?這個地方感覺有點問題。網上博文基本千篇一律也是說可見性如何的,跟書中觀點一樣,我覺得不對,比較贊同 https://www.jianshu.com/p/cd485e16456e這個所說的。   至於ThreadLocal相關程式碼,稍後再去理這裡邊的邏輯。