1. 程式人生 > 其它 >多執行緒之讀寫鎖原理

多執行緒之讀寫鎖原理

今天主要通過多讀單寫的例子來說下讀寫鎖的原理
概念
多讀單寫,簡單說,就是對資源的訪問分為兩種狀態,一種是讀操作,另一種是寫操作。由應用程式提示鎖應該做哪種操作。當為讀模式時,所有的寫動作被懸掛,而讀請求被允許通過,而寫動作時,所有操作被懸掛。並且,讀寫切換時,有足夠的狀態等待,直到真正安全時,才會切換動作。


如下圖所示:


 
 
業務場景舉例
比如現在有 A、B、C、D、E、F、G 6個執行緒,其中A、B、C、G 4個執行緒之行讀請求,E、F 2個執行緒之行寫請求,如何保證讀寫安全?

分析:

讀寫請求是可以在多個執行緒進行的

寫請求時,所有的請求都會被停止即懸掛

解決:使用讀寫鎖

程式碼:

demo裡面的程式碼就是業務場景的表達,即有多個執行緒同時執行讀寫請求的業務場景

- (void)demo {
    self.queue = dispatch_queue_create("rw_queue", DISPATCH_QUEUE_CONCURRENT);
    
    for (int i = 0; i < 2; i++) {
        dispatch_async(self.queue, ^{
            [self read:1];
        });
        
        dispatch_async(self.queue, ^{
            [self read:2];
        });
        
        dispatch_barrier_async(self.queue, ^{
            [self write];
        });
        
        dispatch_async(self.queue, ^{
            [self read:3];
        });
        
    }
}
 

下面的 read 和 write 方法裡,就是讀寫鎖的使用

- (void)read {
    pthread_rwlock_rdlock(&_lock);
    
    sleep(1);
    NSLog(@"%s", __func__);
    
    pthread_rwlock_unlock(&_lock);
}
 
- (void)write
{
    pthread_rwlock_wrlock(&_lock);
    
    sleep(1);
    NSLog(@"%s", __func__);
    
    pthread_rwlock_unlock(&_lock);
}
 
讀寫鎖的原理
在 AQS 中,通過 int 型別的全域性變數 state 來表示同步狀態,即用 state 來表示鎖。

ReentrantReadWriteLock 也是通過 AQS 來實現鎖的,但是 ReentrantReadWriteLock有兩把鎖:讀鎖和寫鎖,它們保護的都是同一個資源,那麼如何用一個共享變數來區分鎖是寫鎖還是讀鎖呢?答案就是按位拆分。

由於 state 是 int 型別的變數,在記憶體中佔用4個位元組,也就是32位。將其拆分為兩部分:高16位和低16位,其中高16位用來表示讀鎖狀態,低16位用來表示寫鎖狀態。

當設定讀鎖成功時,就將高16位加1,釋放讀鎖時,將高16位減1;

當設定寫鎖成功時,就將低16位加1,釋放寫鎖時,將第16位減1;

 

如下圖所示:

 

 

寫鎖加鎖的原理

獲取寫鎖的流程
c == 0表示鎖還沒有被任何執行緒佔用
w 寫鎖的數量

如果 c==0,標記鎖成功後,表述獲取寫鎖成功

如果 c!=0 && w==0,表示讀鎖在佔用鎖,所以獲取鎖失敗

如果 c!=0 && w!=0,表示寫鎖在佔用鎖,此時就需要判斷訪問該鎖的執行緒是否和佔用該鎖的執行緒為同一執行緒,如果不為同一執行緒就返回失敗;如果為同一執行緒,則判斷重入的數量,數量為超過就返回成功,否則丟擲異常

protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// exclusiveCount()方法的作用是將同步變數與0xFFFF做&運算,計算結果就是寫鎖的數量。
// 因此w的值的含義就是寫鎖的數量
int w = exclusiveCount(c);
// 如果c不為0就表示鎖被佔用了,但是佔用的是寫鎖還是讀鎖呢?這個時候就需要根據w的值來判斷了。
// 如果c等於0就表示此時鎖還沒有被任何執行緒佔用,那就讓執行緒直接去嘗試獲取鎖
if (c != 0) {
    // (Note: if c != 0 and w == 0 then shared count != 0)
    //
    /**
     * 1. 如果w為0,說明寫鎖的數量為0,而此時又因為c不等於0,說明鎖被佔用,但是不是寫鎖,那麼此時鎖的狀態一定是讀鎖,
     * 既然是讀鎖狀態,那麼寫鎖此時來獲取鎖時,就肯定失敗,因此當w等於0時,tryAcquire()方法返回false。
     * 2. 如果w不為0,說明此時鎖的狀態時寫鎖,接著進行current != getExclusiveOwnerThread()判斷,判斷持有鎖的執行緒是否是當前執行緒
     * 如果不是當前執行緒,那麼tryAcquire()返回false;如果是當前執行緒,那麼就進行後面的邏輯。為什麼是當前執行緒持有鎖,就還能執行後面的邏輯呢?
     * 因為讀寫鎖是支援重入的。
     */
    if (w == 0 || current != getExclusiveOwnerThread())
        return false;
    // 下面一行程式碼是判斷,寫鎖的重入次數或不會超過最大限制,這個最大限制是:2的16次方減1
    // 為什麼是2的16次方減1呢?因為state的低16位存放的是寫鎖,因此寫鎖數量的最大值是2的16次方減1
    if (w + exclusiveCount(acquires) > MAX_COUNT)
        throw new Error("Maximum lock count exceeded");
    // Reentrant acquire
    setState(c + acquires);
    return true;
}
/**
 * 1. writerShouldBlock()方法的作用是判斷當前執行緒是否應該阻塞,對於公平的寫鎖和非公平寫鎖的具體實現不一樣。
 * 對於非公平寫鎖而言,直接返回false,因為非公平鎖獲取鎖之前不需要去判斷是否排隊
 * 對於公平鎖寫鎖而言,它會判斷同步佇列中是否有人在排隊,有人排隊,就返回true,表示當前執行緒需要阻塞。無人排隊就返回false。
 *
 * 2. 當writerShouldBlock()返回true時,表示當前執行緒還不能直接獲取鎖,因此tryAcquire()方法直接返回false。
 * 當writerShouldBlock()返回false時,表示當前執行緒可以嘗試去獲取鎖,因此會執行if判斷中後面的邏輯,即通過CAS方法嘗試去修改同步變數的值,
 * 如果修改同步變數成功,則表示當前執行緒獲取到了鎖,最終tryAcquire()方法會返回true。如果修改失敗,那麼tryAcquire()會返回false,表示獲取鎖失敗。
 *
 */
if (writerShouldBlock() ||
    !compareAndSetState(c, c + acquires))
    return false;
setExclusiveOwnerThread(current);
return true;
}
 

讀鎖加鎖的原理

獲取讀鎖的流程
c == 0 表示鎖還沒有被任何執行緒佔用
r 讀鎖的數量
w = exclusiveCount(c) 寫鎖的數量

如果c!=0 && w!=0,表示寫鎖在佔用鎖,改執行緒就未獲取到讀鎖所以立即執行fullTryAcquireShared(current);

如果c!=0 && r!=0,表示鎖被寫執行緒佔用

protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// exclusiveCount(c)返回的是寫鎖的數量,如果它不為0,說明寫鎖被佔用,如果此時佔用寫鎖的執行緒不是當前執行緒,就返回-1,表示獲取鎖失敗
if (exclusiveCount(c) != 0 &&
    getExclusiveOwnerThread() != current)
    return -1;
// r表示的是讀鎖的數量
int r = sharedCount(c);
/**
 * 在下面的程式碼中進行了三個判斷:
 * 1、讀鎖是否應該排隊。如果沒有人排隊,就進行if後面的判斷。有人排隊,就不會進行if後面的判斷,而是最終呼叫fullTryAcquireShared()方法
 * 2、讀鎖數量是否超過最大值。(最大數量為2的16次方-1)
 * 3、嘗試修改同步變數的值
 */
if (!readerShouldBlock() &&
    r < MAX_COUNT &&
    compareAndSetState(c, c + SHARED_UNIT)) {
    // 讀鎖數量為0時,就將當前執行緒設定為firstReader,firstReaderHoldCount=1
    if (r == 0) {
        firstReader = current;
        firstReaderHoldCount = 1;
    } else if (firstReader == current) {
        // 讀鎖數量不為0且firstReader(第一次獲取讀的執行緒)為當前執行緒,就將firstReaderHoldCount累加
        firstReaderHoldCount++;
    } else {
        // 讀鎖數量不為0,且第一個獲取到讀鎖的執行緒不是當前執行緒
        // 下面這一段邏輯就是儲存當前執行緒獲取讀鎖的次數,如何儲存的呢?
        // 通過ThreadLocal來實現的,readHolds就是一個ThreadLocal的例項
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            cachedHoldCounter = rh = readHolds.get();
        else if (rh.count == 0)
            readHolds.set(rh);
        rh.count++;
    }
    // 返回1表示獲取讀鎖成功
    return 1;
}
// 當if中的三個判斷均不滿足時,就會執行到這兒,呼叫fullTryAcquireShared()方法嘗試獲取鎖
return fullTryAcquireShared(current);
}
如果 r==0, firstReader = current

如果 r!=0 && firstReader為當前執行緒,firstReaderHoldCount++

如果第一個獲取到讀鎖的執行緒不是當前執行緒就記錄當前執行緒的獲取鎖的數量,並讓請求執行緒獲得鎖

 

讀鎖獲取鎖失敗後會迴圈的去執行下面這個方法,直到滿足相應的條件才會 return 退出,否則一直迴圈

 final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    // for死迴圈,直到滿足相應的條件才會return退出,否則一直迴圈
    for (;;) {
        int c = getState();
        // 鎖的狀態為寫鎖時,持有鎖的執行緒不等於當期那執行緒,就說明當前執行緒獲取鎖失敗,返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 嘗試設定同步變數的值,只要設定成功了,就表示當前執行緒獲取到了鎖,然後就設定鎖的獲取次數等相關資訊
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
 
問題
當有100個執行緒來併發的進行讀寫請求,其中有99個執行緒是進行讀請求,只有一個執行緒是進行寫請求(假設寫請求的編號為20)

先有1-19執行緒進行了讀請求

然後第20執行緒進行了寫請求

又來21-100執行緒80個執行緒進行讀請求

結果是第20執行緒等到所有讀執行緒執行完了才能執行寫請求

從而導致寫鎖飢餓問題

原文連結:https://blog.csdn.net/weixin_41754309/article/details/115305454