1. 程式人生 > >併發程式設計之——讀鎖原始碼分析(解釋關於鎖降級的爭議)

併發程式設計之——讀鎖原始碼分析(解釋關於鎖降級的爭議)

1. 前言

在前面的文章 併發程式設計之——寫鎖原始碼分析中,我們分析了 1.8 JUC 中讀寫鎖中的寫鎖的獲取和釋放過程,今天來分析一下讀鎖的獲取和釋放過程,讀鎖相比較寫鎖要稍微複雜一點,其中還有一點有爭議的地方——鎖降級

今天就來解開迷霧。

2. 獲取讀鎖 tryAcquireShared 方法

首先說明,獲取讀鎖的過程是獲取共享鎖的過程。

程式碼加註釋如下:

protected final int11 tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    // exclusiveCount(c) != 0 ---》 用 state & 65535 得到低 16 位的值。如果不是0,說明寫鎖別持有了。
// getExclusiveOwnerThread() != current----> 不是當前執行緒 // 如果寫鎖被霸佔了,且持有執行緒不是當前執行緒,返回 false,加入佇列。獲取寫鎖失敗。 // 反之,如果持有寫鎖的是當前執行緒,就可以繼續獲取讀鎖了。 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 獲取鎖失敗 return -1; // 如果寫鎖沒有被霸佔,則將高16位移到低16位。 int
r = sharedCount(c);// c >>> 16 // !readerShouldBlock() 和寫鎖的邏輯一樣(根據公平與否策略和佇列是否含有等待節點) // 不能大於 65535,且 CAS 修改成功 if (!readerShouldBlock() && r < 65535 && compareAndSetState(c, c + 65536)) { // 如果讀鎖是空閒的, 獲取鎖成功。 if (r == 0) { // 將當前執行緒設定為第一個讀鎖執行緒
firstReader = current; // 計數器為1 firstReaderHoldCount = 1; }// 如果讀鎖不是空閒的,且第一個讀執行緒是當前執行緒。獲取鎖成功。 else if (firstReader == current) {// // 將計數器加一 firstReaderHoldCount++; } else {// 如果不是第一個執行緒,獲取鎖成功。 // cachedHoldCounter 代表的是最後一個獲取讀鎖的執行緒的計數器。 HoldCounter rh = cachedHoldCounter; // 如果最後一個執行緒計數器是 null 或者不是當前執行緒,那麼就新建一個 HoldCounter 物件 if (rh == null || rh.tid != getThreadId(current)) // 給當前執行緒新建一個 HoldCounter cachedHoldCounter = rh = readHolds.get(); // 如果不是 null,且 count 是 0,就將上個執行緒的 HoldCounter 覆蓋本地的。 else if (rh.count == 0) readHolds.set(rh); // 對 count 加一 rh.count++; } return 1; } // 死迴圈獲取讀鎖。包含鎖降級策略。 return fullTryAcquireShared(current); }

總結一下上面程式碼的邏輯吧!

  1. 判斷寫鎖是否空閒。
  2. 如果不是空閒,且當前執行緒不是持有寫鎖的執行緒,則返回 -1 ,表示搶鎖失敗。如果是空閒的,進入第三步。如果是當前執行緒,進入第三步。
  3. 判斷持有讀鎖的數量是否超過 65535,然後使用 CAS 設定 int 高 16 位的值,也就是加一。
  4. 如果設定成功,且是第一次獲取讀鎖,就設定 firstReader 相關的屬性(為了效能提升)。
  5. 如果不是第一次,噹噹前執行緒就是第一次獲取讀鎖的執行緒,對 “第一次獲取讀鎖執行緒計數器” 加 1.
  6. 如果都不是,則獲取最後一個讀鎖的執行緒計數器,判斷這個計數器是不是當前執行緒的。如果是,加一,如果不是,自己建立一個新計數器,並更新 “最後讀取的執行緒計數器”(也是為了效能考慮)。最後加一。返回成功。
  7. 如果上面的判斷失敗了(CAS 設定失敗,或者佇列有等待的執行緒(公平情況下))。就呼叫 fullTryAcquireShared 方法死迴圈執行上面的步驟。

步驟還是有點多哈,畫個圖吧,更清晰一點。

大圖地址:https://upload-images.jianshu.io/upload_images/4236553-c747934c55844272.png?imageMogr2/auto-orient/

其實,上面的邏輯裡,是有鎖降級的邏輯在裡面的。但我們等會放在後面說。

先看看 fullTryAcquireShared 方法,其實這個方法和 tryAcquireShared 高度類似。程式碼加註釋如下:

final int fullTryAcquireShared(Thread current) {
    /*
     * 這段程式碼與tryAcquireShared中的程式碼有部分重複,但整體更簡單。
     */
    HoldCounter rh = null;
    // 死迴圈
    for (;;) {
        int c = getState();
        // 如果存在寫鎖
        if (exclusiveCount(c) != 0) {
            // 並且不是當前執行緒,獲取鎖失敗,反之,如果持有寫鎖的是當前執行緒,那麼就會進入下面的邏輯。
            // 反之,如果存在寫鎖,但持有寫鎖的是當前執行緒。那麼就繼續嘗試獲取讀鎖。
            if (getExclusiveOwnerThread() != current)
                return -1;
        // 如果寫鎖空閒,且可以獲取讀鎖。
        } else if (readerShouldBlock()) {
            // 第一個讀執行緒是當前執行緒
            if (firstReader == current) {
            // 如果不是當前執行緒
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        // 從 ThreadLocal 中取出計數器
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        // 如果讀鎖次數達到 65535 ,丟擲異常
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 嘗試對 state 加 65536, 也就是設定讀鎖,實際就是對高16位加一。
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 如果讀鎖是空閒的
            if (sharedCount(c) == 0) {
                // 設定第一個讀鎖
                firstReader = current;
                // 計數器為 1
                firstReaderHoldCount = 1;
            // 如果不是空閒的,檢視第一個執行緒是否是當前執行緒。
            } else if (firstReader == current) {
                firstReaderHoldCount++;// 更新計數器
            } else {// 如果不是當前執行緒
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))// 如果最後一個讀計數器所屬執行緒不是當前執行緒。
                    // 自己建立一個。
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                // 對計數器 ++
                rh.count++;
                // 更新快取計數器。
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

這兩個方法其實高度相似的。就不再解釋了。

到這裡,其實留下了幾個問題:一個是 firstReader和 firstReaderHoldCount 的作用,還有就是 cachedHoldCounter 的作用。最後是鎖降級。

解釋一下:

  • firstReader 是獲取讀鎖的第一個執行緒。如果只有一個執行緒獲取讀鎖,很明顯,使用這樣一個變數速度更快。
  • firstReaderHoldCount是 firstReader的計數器。同上。
  • cachedHoldCounter是最後一個獲取到讀鎖的執行緒計數器,每當有新的執行緒獲取到讀鎖,這個變數都會更新。這個變數的目的是:當最後一個獲取讀鎖的執行緒重複獲取讀鎖,或者釋放讀鎖,就會直接使用這個變數,速度更快,相當於快取。

關於鎖降級,重點解釋一下,畢竟是我們的標題。

3. 鎖降級的爭議

首先,什麼是鎖降級?在讀鎖的哪個地方體現?

回答第一個問題,引自 JDK 的解釋:

鎖降級:
重入還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。

體現在讀鎖哪裡?

在 tryAcquireShared 方法和 fullTryAcquireShared 中都有體現,例如下面的判斷:

if (exclusiveCount(c) != 0) {
    if (getExclusiveOwnerThread() != current)
        return -1;

上面的程式碼的意思是:當寫鎖被持有時,如果持有該鎖的執行緒不是當前執行緒,就返回 “獲取鎖失敗”,反之就會繼續獲取讀鎖。稱之為鎖降級。

在很多書和文章中,對鎖降級都會有類似下面的解釋:

上面提到,鎖降級中,讀鎖的獲取的目的是 “為了保證資料的可見性”。而得到這個結論的依據是 “如果當前執行緒不獲取讀鎖而是直接釋放寫鎖,假設此刻另一個執行緒(記作執行緒 T)獲取了寫鎖並修改了資料,那麼當前執行緒無法感知執行緒 T 的資料更新”。

這裡貌似有個漏洞:如果另一個執行緒獲取了寫鎖(並修改了資料),那麼這個鎖就被獨佔了,沒有任何其他執行緒可以讀到資料,更不用談 “感知資料更新”。

樓主認為,鎖降級說白了就是寫鎖的一種特殊重入機制。通過這種重入,可以減少一步流程——釋放寫鎖後 再次 獲取讀鎖。

使用了鎖降級,就可以減去釋放寫鎖的步驟。直接獲取讀鎖。效率更高。而且沒有執行緒爭用。和 “可見性” 並沒有關係。

用一幅圖來展示鎖降級:

大圖地址:https://upload-images.jianshu.io/upload_images/4236553-f545a504abde8c2f.png?imageMogr2/auto-orient/

總的來說,鎖降級就是一種特殊的鎖重入機制,JDK 使用 先獲取寫入鎖,然後獲取讀取鎖,最後釋放寫入鎖 這個步驟,是為了提高獲取鎖的效率,而不是所謂的可見
最後再總結一下獲取鎖的邏輯,首先判斷寫鎖釋放被持有了,如果被持有了,且是當前執行緒,使用鎖降級,如果沒有,讀鎖正常獲取。

獲取過程中,會使用 firstReader 和 cachedHoldCounter 提高效能。

4. 讀鎖的釋放 tryReleaseShared 方法

程式碼加註釋如下:


protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 如果是第一個執行緒
    if (firstReader == current) {
        // 如果是 1,將第一個執行緒設定成 null。結束。
        if (firstReaderHoldCount == 1)
            firstReader = null;
        // 如果不是 1,減一操作
        else
            firstReaderHoldCount--;
    } else {//如果不是當前執行緒
        HoldCounter rh = cachedHoldCounter;
        // 如果快取是 null 或者快取所屬執行緒不是當前執行緒,則當前執行緒不是最後一個讀鎖。
        if (rh == null || rh.tid != getThreadId(current))
            // 獲取當前執行緒的計數器
            rh = readHolds.get();
        int count = rh.count;
        // 如果計數器小於等於一,就直接刪除計數器
        if (count <= 1) {
            readHolds.remove();
            // 如果計數器的值小於等於0,說明有問題了,丟擲異常
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        // 對計數器減一
        --rh.count;
    }
    for (;;) {// 死迴圈使用 CAS 修改狀態
        int c = getState();
        // c - 65536, 其實就是減去一個讀鎖。對高16位減一。
        int nextc = c - SHARED_UNIT;
        // 修改 state 狀態。
        if (compareAndSetState(c, nextc))
            // 修改成功後,如果是 0,表示讀鎖和寫鎖都空閒,則可以喚醒後面的等待執行緒
            return nextc == 0;
    }
}

釋放還是很簡單的,步驟如下:

  1. 如果當前執行緒是第一個持有讀鎖的執行緒,則只需要操作 firstReaderHoldCount 減一。如果不是,進入第二步。
  2. 獲取到快取計數器(最後一個執行緒的計數器),如果匹配到當前執行緒,就減一。如果不匹配,進入第三步。
  3. 獲取當前執行緒自己的計數器(由於每個執行緒都會多次獲取到鎖,所以,每個執行緒必須儲存自己的計數器。)。
  4. 做減一操作。
  5. 死迴圈修改 state 變數。

5. 總結

“讀寫鎖沒有想象中簡單” 是此次閱讀原始碼的最大感慨。事實上,花的最多時間是鎖降級,因為對這塊的不理解,參照了一些書籍和部落格,但還是雲裡霧裡,我也不敢確定我說的就是全對的,但我敢說,我寫的是經過我思考的。

總結下讀鎖的獲取邏輯。

讀鎖本質上是個共享鎖。

但讀鎖對鎖的獲取做了很多優化,比如使用 firstReader 和 cachedHoldCounter 最第一個讀鎖執行緒和最後一個讀鎖執行緒做優化,優化點主要在釋放的時候對計數器的獲取。

同時,如果在獲取讀鎖的過程中寫鎖被持有了,JUC 並沒有讓所有執行緒痴痴的等待,而是判斷入如果獲取讀鎖的執行緒是正巧是持有寫鎖的執行緒,那麼當前執行緒就可以降級獲取寫鎖,否則就會死鎖了(為什麼死鎖,當持有寫鎖的執行緒想獲取讀鎖,但卻無法降級,進入了等待佇列,肯定會死鎖)。

還有一點就是效能上的優化,如果先釋放寫鎖,再獲取讀鎖,勢必引起鎖的爭搶和執行緒上下文切換,影響效能。

作者:莫那·魯道

個人部落格:thinkinjava.cn