1. 程式人生 > 實用技巧 >AQS之ReentrantReadWriteLock精講分析上篇

AQS之ReentrantReadWriteLock精講分析上篇

1.用法

1.1 定義一個安全的list集合

public class LockDemo  {
  ArrayList<Integer> arrayList = new ArrayList<>();//定義一個集合
  // 定義讀鎖
  ReentrantReadWriteLock.ReadLock readLock = new          ReentrantReadWriteLock(true).readLock();
  // 定義寫鎖
  ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock(true).writeLock();
  
  public void addEle(Integer ele) {
    writeLock.lock(); // 獲取寫鎖
    arrayList.add(ele);
    writeLock.unlock(); // 釋放寫鎖
  }
  public Integer getEle(Integer index) {
    try{
    readLock.lock(); // 獲取讀鎖
    Integer res = arrayList.get(index);
    return res;
    } finally{
      readLock.unlock();// 釋放讀鎖
    }   
   }
}

1.2 Sync類中的原始碼

Sync類中屬性介紹

abstract static class Sync extends AbstractQueuedSynchronizer {
  // 高16位為讀鎖,低16位為寫鎖
  static final int SHARED_SHIFT = 16;
  // 讀鎖單位
  static final int SHARED_UNIT  = (1 << SHARED_SHIFT);
  // 讀鎖最大數量
  static final int MAX_COUNT   = (1 << SHARED_SHIFT) - 1;
  // 寫鎖最大數量
  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  // 本地執行緒計數器
  private transient ThreadLocalHoldCounter readHolds;
  // 快取的計數器
  private transient HoldCounter cachedHoldCounter;
  // 第一個讀執行緒
  private transient Thread firstReader = null;
  // 第一個讀執行緒的計數
  private transient int firstReaderHoldCount;
}

Sync類中計數相關類

// 計數器
static final class HoldCounter {
    int count = 0; // 計數
    // 獲取當前執行緒的TID屬性的值
    final long tid = getThreadId(Thread.currentThread());
}

HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀執行緒重入的次數,tid表示該執行緒的tid欄位的值,該欄位可以用來唯一標識一個執行緒

// 本地執行緒計數器
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    // 重寫初始化方法,在沒有進行set的情況下,獲取的都是該HoldCounter值
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

ThreadLocalHoldCounter重寫了ThreadLocal的initialValue方法,ThreadLocal類可以將執行緒與物件相關聯。在沒有進行set的情況下,get到的均是initialValue方法裡面生成的那個HolderCounter物件
Sync類中建構函式

// 建構函式
Sync() {
  // 本地執行緒計數器
  readHolds = new ThreadLocalHoldCounter();
  // 設定AQS的狀態
  setState(getState()); 
}

2.獲取讀鎖原始碼分析

2.1 讀鎖加鎖分析

先看讀鎖操作 readLock.lock(), 獲取讀取鎖定

  1. 如果寫鎖未被另一個執行緒持有,則獲取讀鎖並立即返回。
  2. 如果寫鎖由另一個執行緒持有,將當前執行緒將被阻塞,並處於休眠狀態,直到獲取讀鎖為止。
public void lock() {
sync.acquireShared(1);
}

以共享模式獲取,此方法不支援中斷。 通過首先至少呼叫一次tryAcquireShared ,並在成功後返回。 否則,執行緒將排隊,並可能反覆阻塞和解除阻塞,並呼叫tryAcquireShared直到成功。

  1. 返回負數表示獲取失敗
  2. 返回0表示成功,但是後繼爭用執行緒不會成功
  3. 返回正數表示獲取成功,並且後繼爭用執行緒也可能成功
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

2.2 tryAcquireShared 獲取鎖分析

protected final int tryAcquireShared(int unused) {
    // 獲取當前執行緒 
    Thread current = Thread.currentThread();
    // 獲取狀態
    int c = getState();
    /**
    計算獨佔的持有次數
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    */
    // exclusiveCount(c) 第一次返回的是0
    // 如果寫鎖執行緒數不等於0,並且獨佔鎖不是當前執行緒則返回失敗
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    /**
    計算共享的持有次數 直接將state右移16位,就可以得到讀鎖的執行緒數量
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    */
    // sharedCount(c) 第一次返回的是0
    // 讀鎖的數量
    int r = sharedCount(c);
    //readerShouldBlock() 當前讀執行緒是否堵塞
    if (!readerShouldBlock() &&
    // 持有執行緒小於最大數65535
        r < MAX_COUNT &&
     // 設定讀取鎖狀態
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
        // firstReader是第一個獲得讀取鎖定的執行緒,
        // 第一個讀鎖firstReader是不會加入到readHolds中
            firstReader = current;
        // firstReaderHoldCount是firstReader的保留計數也就是
        // 讀執行緒佔用的資源數為1
            firstReaderHoldCount = 1;
            // 如果第一個讀執行緒是當前執行緒那麼就將計數+1
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
           // 讀鎖數量不為0並且不為當前執行緒
           // 每個執行緒讀取保持計數的計數器。 維護為ThreadLocal
           // 快取在cachedHoldCounter中
            HoldCounter rh = cachedHoldCounter;
            // 計數器為空或者計數器的tid不為當前正在執行的執行緒的tid
            if (rh == null || rh.tid != getThreadId(current))
            // 獲取當前執行緒對應的計數器
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0) // 計數為0
                // 加入到readHolds中
                readHolds.set(rh);
            rh.count++; // +1 
        }
        // 獲取鎖成功
        return 1;
    }
    return fullTryAcquireShared(current);
}

readerShouldBlock()

final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}

在獨佔鎖中也呼叫了該方法,頭和尾部不為空不相等整明是有節點的,如果返回true,那麼就是有當前執行緒前面的執行緒在排隊,返回false,那麼就是當前執行緒是在佇列的頭部的下一個節點或者佇列是空的

public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

2.3 fullTryAcquireShared

獲取讀鎖的完整版本,可處理tryAcquireShared中未處理的CAS丟失和可重入讀操作

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
     // 獲取狀態
        int c = getState();
       // 如果寫執行緒數量不為0
        if (exclusiveCount(c) != 0) {
        // 如果不是當前執行緒
            if (getExclusiveOwnerThread() != current)
                return -1;
         //  寫執行緒數量為0並且讀執行緒被阻塞
        } else if (readerShouldBlock()) {
        // 確保我們沒有重新獲取讀鎖
            if (firstReader == current) {
            // 當前執行緒為第一個讀執行緒
            } else {
            // 當前執行緒不為第一個讀執行緒
                if (rh == null) {  // 計數器為空
                    rh = cachedHoldCounter;
           // 計數器為空或者計數器的tid不為當前正在執行的執行緒的tid
                    if (rh == null || rh.tid != getThreadId(current)) {          // 獲取當前執行緒對應的計數器
                        rh = readHolds.get();
                        if (rh.count == 0) // 計數為0
                     // 從readHolds中移除                      
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                 // 獲取鎖失敗了
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
            // 比較並且設定成功
        if (compareAndSetState(c, c + SHARED_UNIT)) {
        // 讀執行緒數量為0
            if (sharedCount(c) == 0) {
        // firstReader是第一個獲得讀取鎖定的執行緒,
        // 第一個讀鎖firstReader是不會加入到readHolds中
                firstReader = current;
                firstReaderHoldCount = 1;
        // 如果第一個讀執行緒是當前執行緒那麼就將計數+1
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //  讀鎖數量不為0並且不為當前執行緒
            //  每個執行緒讀取保持計數的計數器。 維護為ThreadLocal
            //  快取在cachedHoldCounter中
                if (rh == null)
                    rh = cachedHoldCounter;
              // 計數器為空或者計數器的tid不為當前正在執行的執行緒的tid
                if (rh == null || rh.tid != getThreadId(current))
              // 獲取當前執行緒對應的計數器
                    rh = readHolds.get();
                else if (rh.count == 0)
                // 加入到readHolds中
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            // 獲取鎖成功
            return 1;
        }
    }
}

2.4tryAcquireShared失敗

如果tryAcquireShared(arg)返回的值為正數或者為0,那麼意味著獲取鎖失敗,執行doAcquireShared(arg)方法

private void doAcquireShared(int arg) {
    // 將節點放入阻塞佇列中返回當前節點,addWaiter前一篇文章已經講過了
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // 如果前置節點的waitStatus為喚醒那麼就可以安心睡眠了,並且掛起當
            // 前執行緒
            if (shouldParkAfterFailedAcquire(p, node) &&
            // 掛起當前執行緒
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2.5setHeadAndPropagate

設定佇列的頭部,並檢查後繼者是否可能在共享模式下等待,如果正在傳播,則傳播是否設定為傳播> 0或PROPAGATE狀態

這個函式做的事情有兩件:

  1. 在獲取共享鎖成功後,設定head節點
  2. 根據呼叫tryAcquireShared返回的狀態以及節點本身的等待狀態來判斷是否要需要喚醒後繼執行緒

在該方法內部我們不僅呼叫了setHead(node),還在一定條件下呼叫了doReleaseShared()來喚醒後繼的節點。這是因為在共享鎖模式下,鎖可以被多個執行緒所共同持有,既然當前執行緒已經拿到共享鎖了,那麼就可以直接通知後繼節點來拿鎖,而不必等待鎖被釋放的時候再通知。

propagate是tryAcquireShared的返回值,這是決定是否傳播喚醒的依據之一

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    // h.waitStatus為SIGNAL或者PROPAGATE時也根據node的下一個節點共享來決定
    // 是否傳播喚醒
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

2.6 doReleaseShared()

共享模式下的釋放動作-訊號後繼並確保傳播

以下的迴圈做的事情就是,在佇列存在後繼執行緒的情況下,喚醒後繼執行緒;或者由於多執行緒同時釋放共享鎖由於處在中間過程,讀到head節點等待狀態為0的情況下,雖然不能unparkSuccessor,但為了保證喚醒能夠正確穩固傳遞下去,設定節點狀態為PROPAGATE。

這樣的話獲取鎖的執行緒在執行setHeadAndPropagate時可以讀到PROPAGATE,從而由獲取鎖的執行緒去釋放後繼等待執行緒。

在共享鎖模式下,頭節點就是持有共享鎖的節點,在它釋放共享鎖後,它也應該喚醒它的後繼節點,但是值得注意的是,我們在之前的setHeadAndPropagate方法中可能已經呼叫過該方法了,也就是說它可能會被同一個頭節點呼叫兩次,也有可能在我們從releaseShared方法中呼叫它時,當前的頭節點已經易主了

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 如果佇列中存在後繼執行緒
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 如果h節點的狀態為0,需要設定為PROPAGATE用以保證喚醒的傳播
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 檢查h是否仍然是head,如果不是的話需要再進行迴圈
        if (h == head)                   // loop if head changed
            break;
    }
}

在看該方法時,我們需要明確以下幾個問題:

  • 該方法有幾處呼叫?
  1. 該方法有兩處呼叫,一處在doAcquireShared方法的末尾,當執行緒成功獲取到共享鎖後,在一定條件下呼叫該方法;
  2. 一處在releaseShared方法中,當執行緒釋放共享鎖的時候呼叫
  • 呼叫該方法的執行緒是誰?

在共享鎖中,持有共享鎖的執行緒可以有多個,這些執行緒都可以呼叫releaseShared方法釋放鎖;因為這些執行緒想要獲得共享鎖,則它們必然曾經成為過頭節點,或者就是現在的頭節點。所以如果是在releaseShared方法中呼叫的doReleaseShared,那麼此時呼叫方法的執行緒可能已經不是頭節點所代表的執行緒了,此時頭節點可能已經被更換了好幾次了

  • 呼叫該方法的目的是什麼?

無論是在doAcquireShared中呼叫,還是在releaseShared方法中呼叫,該方法的目的都是在當前共享鎖是可獲取的狀態時,喚醒head節點的下一個節點。(看上去和獨佔鎖喚醒下一個節點似乎一樣),但是它們的一個重要的差別是在共享鎖中,當頭節點發生變化時,是會回到迴圈中再立即喚醒head節點的下一個節點的。

  • 退出該方法的條件是什麼

該方法是一個自旋操作,退出該方法的唯一辦法是走最後的break語句

if (h == head)   // loop if head changed
    break;

只有在當前head沒有變的時候,才會退出,否則繼續迴圈。為什麼呢?
為了說明問題,這裡我們假設目前sync queue佇列中依次排列有

dummy node -> A -> B -> C -> D

現在假設A已經拿到了共享鎖,則它將成為新的dummy node,

dummy node (A) -> B -> C -> D

此時,A執行緒會呼叫doReleaseShared,我們寫做doReleaseShared[A],在該方法中將喚醒後繼的節點B,它很快獲得了共享鎖,成為了新的頭節點:

dummy node (B) -> C -> D

此時,B執行緒也會呼叫doReleaseShared,我們寫做doReleaseShared[B],在該方法中將喚醒後繼的節點C,但是別忘了,在doReleaseShared[B]呼叫的時候,doReleaseShared[A]還沒執行結束呢,當它執行到if(h == head)時,發現頭節點現在已經變了,所以它將繼續回到for迴圈中,與此同時,doReleaseShared[B]也沒閒著,它在執行過程中也進入到了for迴圈中
我們這裡形成了一個doReleaseShared的呼叫迴圈,大量的執行緒在同時執行doReleaseShared,這極大地加速了喚醒後繼節點的速度,提升了效率,同時該方法內部的CAS操作又保證了多個執行緒同時喚醒一個節點時,只有一個執行緒能操作成功

那如果這裡doReleaseShared[A]執行結束時,節點B還沒有成為新的頭節點時,doReleaseShared[A]方法不就退出了嗎?是的,但即使這樣也沒有關係因為它已經成功喚醒了執行緒B,即使doReleaseShared[A]退出了,當B執行緒成為新的頭節點時doReleaseShared[B]就開始執行了,它也會負責喚醒後繼節點的,這樣即使變成這種每個節點只喚醒自己後繼節點的模式,從功能上講,最終也可以實現喚醒所有等待共享鎖的節點的目的,只是效率上沒有之前的快。

由此我們知道,這裡的呼叫迴圈事實上是一個優化操作,因為在我們執行到該方法的末尾的時候,unparkSuccessor基本上已經被呼叫過了,而由於現在是共享鎖模式,所以被喚醒的後繼節點極有可能已經獲取到了共享鎖,成為了新的head節點,當它成為新的head節點後,它可能還是要在setHeadAndPropagate方法中呼叫doReleaseShared喚醒它的後繼節點。


明確了上面幾個問題後,我們再來詳細分析這個方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 如果佇列中存在後繼執行緒也就是佇列至少有兩個節點
        if (h != null && h != tail) {
            int ws = h.waitStatus;
// 如果當前ws值為Node.SIGNAL,則說明後繼節點需要喚醒,這裡採用CAS操作先將
// Node.SIGNAL狀態改為0,這是因為可能有大量的doReleaseShared方法在
// 同時執行,我們只需要其中一個執行unparkSuccessor(h)操作就行了,這裡通過CAS
// 操作保證了unparkSuccessor(h)只被執行一次。
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 如果h節點的狀態為0,需要設定為PROPAGATE用以保證喚醒的傳播
// ws啥時候為0
// 一種是上面的compareAndSetWaitStatus(h, Node.SIGNAL, 0)會導致ws為0,
// 但是很明顯,如果是因為這個原因,則它是不會進入到else if語句塊的。所以這裡
// 的 ws為0是指當前佇列的最後一個節點成為了頭節點。為什麼是最後一個節點呢,因為
// 每次新的節點加進來,在掛起前一定會將自己的前驅節點的waitStatus修
// 改成 Node.SIGNAL的
            else if (ws == 0 &&
// compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這個操作什麼時候會失敗?
// 這個操作失敗,說明就在執行這個操作的瞬間,ws此時已經不為0了,說明有新的節點
// 入隊了,ws的值被改為了Node.SIGNAL,此時我們將呼叫continue,在下次迴圈中
// 直接將這個剛剛新入隊但準備掛起的執行緒喚醒
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 檢查h是否仍然是head,如果不是的話需要再進行迴圈
        if (h == head)                   // loop if head changed
            break;
    }
}
private void unparkSuccessor(Node node) {
 // 獲取當前節點的node.waitStatus 此時為 SIGNAL所以將當前節點的waitStatus
 // 設定成 0  
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 獲取後繼節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 往前尋找遍歷找到第一個節點waitStatus為SIGNAL的節點,為了喚醒其節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果不為空直接喚醒後繼節點
    if (s != null)
        LockSupport.unpark(s.thread);
}

這裡優化了一個點:

  1. 首先佇列裡至少有兩個節點
  2. 其次要執行到else if語句,說明我們跳過了前面的if條件,說明頭節點是剛剛成為頭節點的,它的waitStatus值還為0,尾節點是在這之後剛剛加進來的,它需要執行shouldParkAfterFailedAcquire,將它的前驅節點(即頭節點)的waitStatus值修改為Node.SIGNAL,但是目前這個修改操作還沒有來的及執行。這種情況使我們得以進入else if的前半部分else if (ws == 0 &&
  3. 再次,要滿足!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)這一條件,說明此時頭節點的 waitStatus 已經不再是 0 了,這說明之前那個沒有來得及執行的在shouldParkAfterFailedAcquire將前驅節點的的waitStatus值修改為Node.SIGNAL的操作現在執行完了。

注意:else if的&&連線了兩個不一致的狀態,分別對應了shouldParkAfterFailedAcquire的compareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行成功前和執行成功後,因為doReleaseShared和shouldParkAfterFailedAcquire是可以併發執行的,所以這一條件是有可能滿足的,可能只是一瞬間發生的。

3.獲取讀鎖流程圖

流程解析:

讀鎖獲取鎖的過程比寫鎖稍微複雜些

  1. 首先判斷寫鎖是否為0並且當前執行緒不佔有獨佔鎖,直接返回;
  2. 否則,判斷讀執行緒是否需要被阻塞並且讀鎖數量是否小於最大值並且比較設定狀態成功,若當前沒有讀鎖,則設定第一個讀執行緒firstReader和firstReaderHoldCount;
  3. 若當前執行緒執行緒為第一個讀執行緒,則增加firstReaderHoldCount;
  4. 否則,將設定當前執行緒對應的HoldCounter物件的值。

4.釋放讀鎖原始碼分析

4.1釋放鎖的時候呼叫ReadLock的unlock方法

public void unlock() {
sync.releaseShared(1);
}

4.2sync.releaseShared(1)呼叫的是AQS中的 releaseShared方法

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

4.3tryReleaseShared方法的具體實現是在具體的子類中

protected final boolean tryReleaseShared(int unused) {
// 獲取當前執行緒
Thread current = Thread.currentThread();
// 當前執行緒是否是第一個讀執行緒
if (firstReader == current) {
// 如果讀執行緒佔用的資源為1那麼將firstReader設定成null
if (firstReaderHoldCount == 1)
firstReader = null;
// 如果不是那麼就減一
else
firstReaderHoldCount--;
} else {
// 如果當前執行緒不是第一個讀執行緒
// 獲取快取計數器
HoldCounter rh = cachedHoldCounter;
    // 計數器為空或者計數器的tid不為當前正在執行的執行緒的tid
        if (rh == null || rh.tid != getThreadId(current))
       // 獲取當前執行緒對應的計數器
            rh = readHolds.get();
       // 獲取計數器中count的值
        int count = rh.count;
        if (count <= 1) {
        // 移除
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
       // 如果不小於等於1 減少資源佔用
        --rh.count;
    }
    for (;;) { // 自旋
    // 獲取狀態
        int c = getState();
        // 計算狀態
        int nextc = c - SHARED_UNIT;
        // 設定狀態
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

如果tryReleaseShared(arg)方法返回true那麼執行doReleaseShared()方法,前文已經講過該方法了。

5.釋放讀鎖流程圖

流程解析:

  1. 首先判斷當前執行緒是否為第一個讀執行緒firstReader,若是,則判斷第一個讀執行緒佔有的資源數firstReaderHoldCount是否為1,若是,則設定第一個讀執行緒firstReader為空,否則,將第一個讀執行緒佔有的資源數firstReaderHoldCount減1;
  2. 若當前執行緒不是第一個讀執行緒,那麼首先會獲取快取計數器,若計數器為空或者tid不等於當前執行緒的tid值,則獲取當前執行緒的計數器,如果計數器的計數count小於等於1,則移除當前執行緒對應的計數器,如果計數器的計數count小於等於0,則丟擲異常,之後再減少計數即可。
  3. 哪種情況,都會進入自選操作,該迴圈可以確保成功設定狀態state

6.注意

6.1 HoldCounter的作用

在讀鎖的獲取、釋放過程中,總是會有一個物件存在著,同時該物件在獲取執行緒獲取讀鎖是+1,釋放讀鎖時-1,該物件就是HoldCounter

6.2 HoldCounter的原理

要明白HoldCounter就要先明白讀鎖。前面提過讀鎖的內在實現機制就是共享鎖,對於共享鎖它更加像一個計數器的概念。一次共享鎖操作就相當於一次計數器的操作,獲取共享鎖計數器+1,釋放共享鎖計數器-1。只有當執行緒獲取共享鎖後才能對共享鎖進行釋放、重入操作。所以HoldCounter的作用就是當前執行緒持有共享鎖的數量,這個數量必須要與執行緒繫結在一起,否則操作其他執行緒鎖就會丟擲異常。

6.3 讀鎖部分原始碼詳解

// 表示第一個讀鎖執行緒,第一個讀鎖firstRead是不會加入到readHolds中
if (r == 0) {
  firstReader = current;
  firstReaderHoldCount = 1;
// 第一個讀鎖執行緒重入
} else if (firstReader == current) {
  firstReaderHoldCount++; 
} else {
    // 如果當前執行緒不是第一個讀執行緒
    // 獲取快取計數器
  HoldCounter rh = cachedHoldCounter;
 // 計數器為空或者計數器的tid不為當前正在執行的執行緒的tid
  if (rh == null || rh.tid != current.getId()) 
   // 獲取當前執行緒對應的計數器
    cachedHoldCounter = rh = readHolds.get();
  else if (rh.count == 0)
   // 加入到readHolds中
    readHolds.set(rh);
   //計數+1
  rh.count++; 
}

這裡為什麼要搞一個firstRead、firstReaderHoldCount呢?而不是直接使用else那段程式碼?
這是為了一個效率問題,firstReader是不會放入到readHolds中的,如果讀鎖僅有一個的情況下就會避免查詢readHolds。我們先看firstReader、firstReaderHoldCount的定義:

private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

這兩個變數比較簡單,一個表示執行緒,一個是firstReader的計數。
HoldCounter的定義:(文章中第二次提到該程式碼)

HoldCounter主要有兩個屬性,count和tid,其中count表示某個讀執行緒重入的次數,tid表示該執行緒的tid欄位的值,該欄位可以用來唯一標識一個執行緒

static final class HoldCounter {
    int count = 0;
    final long tid = Thread.currentThread().getId();
}

在HoldCounter中僅有count和tid兩個變數。但是如果要將一個物件和執行緒繫結起來僅記錄tid肯定不夠的,而且HoldCounter根本不能起到繫結物件的作用,只是記錄執行緒tid而已。
在java中,我們知道如果要將一個執行緒和物件繫結在一起只有ThreadLocal才能實現。所以如下:

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

ThreadLocalHoldCounter繼承ThreadLocal,並且重寫了initialValue方法
所以HoldCounter應該就是繫結執行緒上的一個計數器,而ThradLocalHoldCounter則是執行緒繫結的ThreadLocal。從上面我們可以看到ThreadLocal將HoldCounter繫結到當前執行緒上,同時HoldCounter也持有執行緒Id,這樣在釋放鎖的時候才能知道ReadWriteLock裡面快取的上一個讀取執行緒(cachedHoldCounter)是否是當前執行緒。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter繫結執行緒id而不繫結執行緒物件的原因是避免HoldCounter和ThreadLocal互相繫結而GC難以釋放它們,所以其實這樣做只是為了幫助GC快速回收物件而已。

7.總結

以上便是ReentrantReadWriteLock中讀鎖的分析,下一篇文章將是寫鎖的分析,如有錯誤之處,幫忙指出及時更正,謝謝,如果喜歡謝謝點贊加收藏加轉發(轉發註明出處謝謝!!!)