1. 程式人生 > 其它 >Java 併發程式設計(五)讀寫鎖

Java 併發程式設計(五)讀寫鎖

本文使用的 JDK 版本為 JDK 8


JUC 中關於讀寫鎖的介面定義如下:

// java.util.concurrent.locks.ReadWriteLock
public interface ReadWriteLock {
    // 返回一個讀鎖
    Lock readLock();
    
    // 返回一個寫鎖
    Lock writeLock();
}

JUC 中,常用的具體實現為 ReentrantReadWriteLock,因此,在這裡以 ReentrantReadWriteLock 為例來介紹讀寫鎖的相關內容。


基本使用

讀寫鎖的一個常用的使用場景就是對於資料的讀取操作,在大部分的業務場景下,發生讀的情況要比發生寫的概率要高很多。在這種情況,可以針對熱點資料進行快取,從而提高系統的響應效能。

使用示例如下:

// 該程式碼來源於 JDK 的官方文件,稍作了一點修改
class CachedData {
    Object data;
    volatile boolean cacheValid;
    final DataAccess access;
    final Order order;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    CachedData(DataAccess access, Order order) {
        this.access = access;
        this.order = order;
    }

    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) { // 當前快取已經失效了,即已經發生了寫事件
            // 在獲取寫鎖之前必須釋放讀鎖,否則會造成死鎖
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                /*
                	重新判斷快取是否是失效的,
                	因為在這個過程中可能已經有其它的執行緒對這個快取的資料進行修改了
                */
                if (!cacheValid) {
                    data = access.queryData();
                    cacheValid = true;
                }
                /*
                	獲取讀鎖,在持有寫鎖的情況下,可以獲得讀鎖,這也被稱為 “鎖降級”
                */
                rwl.readLock().lock();
            } finally {
                // 釋放寫鎖,此時依舊持有讀鎖
                rwl.writeLock().unlock();
            }
        }

        try {
            order.useData(data);
        } finally {
            // 注意最後一定要釋放鎖
            rwl.readLock().unlock();
        }
    }

    interface DataAccess {
        Object queryData();
    }

    interface Order {
        void useData(Object data);
    }
}

原始碼解析


建構函式

首先,檢視 ReentrantReadWriteLock 例項物件的屬性

public class ReentrantReadWriteLock
    implements ReadWriteLock, java.io.Serializable {
    // ReentrantReadWriteLock 的靜態內部類,為讀鎖
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // ReentrantReadWriteLock 的靜態內部類,為寫鎖
    private final ReentrantReadWriteLock.WriteLock writerLock;

    // 同步工具類,為 AQS 的具體子類
    final Sync sync;
    
    public ReentrantReadWriteLock() {
        this(false);
    }
    
    // 建構函式,初始化 ReentrantReadWriteLock,預設情況選擇使用非公平的同步工具 
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
}

再檢視 ReadLockWriteLock 的相關定義,首先檢視 ReadLock 相關的原始碼:

public static class ReadLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync; // 注意這裡的 sync
    }
    // 省略其它一些不是特別重要的程式碼
}

再檢視 WriteLock 相關的原始碼:

public static class WriteLock implements Lock, java.io.Serializable {
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync; // 注意這裡的 sync
    }
    // 省略其它一些不是特別重要的程式碼
}

可以看到,ReadLockWriteLock 都使用了同一個 Sync 例項物件來維持自身的同步需求,這點很關鍵


原理

ReadLock 中關於獲取鎖和釋放鎖的原始碼:

// 獲取鎖
public void lock() {
    sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// 釋放鎖
public void unlock() {
    sync.releaseShared(1);
}

WriteLock 中關於獲取鎖和釋放鎖的原始碼:

// 獲取鎖
public void lock() {
    sync.acquire(1);
}

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

// 釋放鎖
public void unlock() {
    sync.release(1);
}

通過對比 ReadLockWriteLock 中獲取鎖和釋放鎖的原始碼,很明顯,ReadLock 是以 “共享模式” 的方式獲取和釋放鎖,而 WriteLock 則是通過以獨佔的方式來獲取和釋放鎖。這兩種獲取和釋放鎖的實現都在 AQS 中定義,在此不做過多的詳細介紹

再結合上文關於 ReadLockWriteLock 的建構函式,可以發現它們是使用了同一個 AQS 子類例項物件,也就是說,在 ReentrantReadWriteLock 中的 AQS 的具體子類既使用了“共享模式”,也使用了“獨佔模式”

更一般地來講,回憶一下 AQS 關於 “共享模式” 和 “獨佔模式” 對於 state 變數的使用,“共享模式” 將 state 共享,每個執行緒都能訪問 state;“獨佔模式” 下,state 被視作是獲取到鎖的狀態,0 表示還沒有執行緒獲取該鎖,大於 0 則表示執行緒獲取鎖的重入次數

為了能夠實現 ReentrantReadWriteLock 中的兩個模式的共用的功能,ReentrantReadWriteLockSync 類對 state 進行了如下的處理:

ReentrantReadWriteLock 使用了一個 16 位的狀態來表示寫入鎖的計數,並且使用了另外一個 16 位的狀態來表示讀鎖的計數

就是說,state 變數已經被拆分成了兩部分,由於 state 是一個 32 位的整數,現在 state 的前 16 位用於單獨處理“共享模式”,而後 16 位則用於處理 “獨佔模式”


Sync

核心部分就是分析 Sync 的原始碼,在這裡定義了對 state 變數的修改以及獲取鎖和釋放鎖的邏輯

首先檢視 Sync 相關欄位屬性:

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 這幾個欄位的作用就是將 state 劃分為兩部分,前 16 位為共享模式,後 16 位為獨佔模式
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // -1 的目的值為了得到最大值
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1; 
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
    // 取 c 的前 16 位, 只需要右移即可
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    // 取 c 的後 16 位,只要與對應的掩碼按位與即可
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    // 該類的作用是記錄每個執行緒持有的讀鎖的數量
    static final class HoldCounter {
        // 執行緒持有的讀鎖的數量
        int count = 0;
        // 執行緒的 ID
        final long tid = getThreadId(Thread.currentThread());
    }
    
    // ThreadLocal 的子類
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }
    
    /*
    	用於記錄執行緒持有的讀鎖資訊
    */
    private transient ThreadLocalHoldCounter readHolds;

    /*
    	用於快取,用於記錄 “最後一個獲取讀鎖” 的執行緒的讀鎖的重入次數
    */
    private transient HoldCounter cachedHoldCounter;

    /*
    	第一個獲取讀鎖的執行緒(並且未釋放讀鎖)
    */
    private transient Thread firstReader = null;
    // 第一個獲取讀鎖的執行緒持有的讀鎖的數量(重入次數)
    private transient int firstReaderHoldCount;

    Sync() {
        // 初始化 readHolds
        readHolds = new ThreadLocalHoldCounter();
        // 確保 readHolds 的可見性
        setState(getState()); // ensures visibility of readHolds
    }
}

讀鎖

  • 讀鎖的獲取

    再回到 ReadLock 部分,獲取鎖的原始碼如下:

    public void lock() {
        sync.acquireShared(1);
    }
    
    // 與之對應的 AQS 的程式碼
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    重點在於 tryAcquireShared(arg) 方法,該方法在 Sync 中定義:

    protected final int tryAcquireShared(int unused) {
      Thread current = Thread.currentThread();
      int c = getState();
    
      /*
        	exclusiveCount(c) != 0 說明當前有執行緒持有寫鎖,在這種情況下就不能直接獲取讀鎖
        	但是如果持有寫鎖的執行緒時當前執行緒,那麼就可以繼續獲取讀鎖
        */
      if (exclusiveCount(c) != 0 &&
          getExclusiveOwnerThread() != current)
        return -1;
    
      int r = sharedCount(c); // 讀鎖的獲取次數
    
      if (!readerShouldBlock() && // 讀鎖是否需要阻塞
          r < MAX_COUNT && // 判斷獲取鎖的次數是否溢位(2^16 - 1)
          compareAndSetState(c, c + SHARED_UNIT)) { // 將讀鎖的獲取次數 +1
    
        // 此時已經獲取到了讀鎖
    
        // r == 0 說明執行緒是第一個獲取讀鎖的,或者前面獲取讀鎖的執行緒都已經釋放了讀鎖
        if (r == 0) {
          firstReader = current;
          firstReaderHoldCount = 1;
        } else if (firstReader == current) { // 是否重入
          firstReaderHoldCount++;
        } else {
          // 更新快取,即最後一個獲取讀鎖的執行緒
          HoldCounter rh = cachedHoldCounter;
          if (rh == null || rh.tid != getThreadId(current))
            cachedHoldCounter = rh = readHolds.get();
          else if (rh.count == 0)
            // 快取當前的執行緒持有的讀鎖的數量
            readHolds.set(rh);
          rh.count++;
        }
        
        // 返回一個大於 0 的數,表示已經獲取到了讀鎖
        return 1;
      }
      
      return fullTryAcquireShared(current);
    }
    

    如果要走到最後一個 return 語句,可能有以下幾種情況:

    • readerShouldBlock() 返回 true,這可能有兩種情況:在 FairSync 中的 hasQueuedPredecessors() 方法返回 true,即阻塞佇列中存在其它元素在等待鎖;在 NoFairSync 中的 apparentlyFirstQueuedIsExclusive() 方法返回 true,即判斷阻塞佇列中 head 的後繼節點是否是用來獲取寫鎖的,如果是的話,那麼讓這個鎖先來,避免寫鎖飢餓
    • 持有寫鎖的數量超過最大值(2^16 - 1)
    • CAS 失敗,即存在競爭關係,可能是多個執行緒爭奪一個讀鎖,或者多個執行緒爭奪一個寫鎖

    如果是發生了以上幾種情況,那麼就需要呼叫 fullTryAcquireShared 再次嘗試


    fullTryAcquireShared(current) 方法對應的原始碼:

    /*
    	引入這個方法的目的是為了減少鎖競爭
    */
    final int fullTryAcquireShared(Thread current) {
      HoldCounter rh = null;
      for (;;) { // 永真迴圈避免由於 CAS 失敗直接退出的情況
        int c = getState();
        // 如果其它執行緒持有了寫鎖,自然是獲取不到鎖了,因此需要進入到阻塞佇列
        if (exclusiveCount(c) != 0) {
          if (getExclusiveOwnerThread() != current)
            return -1;
          // else we hold the exclusive lock; blocking here
          // would cause deadlock.
        } else if (readerShouldBlock()) {
          // 處理重入
          if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
          } else {
            if (rh == null) {
              rh = cachedHoldCounter;
              if (rh == null || rh.tid != getThreadId(current)) {
                /*
                	cachedHoldCounter 快取的不是當前的執行緒,那麼到 ThreadLocal 中獲取當前執行緒的 HolderCounter,
                	如果執行緒從來沒有初始化過 ThreadLocal 的值,那麼 get() 方法將會執行初始化
                */
                rh = readHolds.get();
                
                /*
                	rh.count == 0 說明上一行程式碼只是單純地初始化,那麼它依舊是需要去排隊的
                */
                if (rh.count == 0)
                  readHolds.remove();
              }
            }
            if (rh.count == 0)
              return -1;
          }
        }
        
        if (sharedCount(c) == MAX_COUNT)
          throw new Error("Maximum lock count exceeded");
        
        if (compareAndSetState(c, c + SHARED_UNIT)) {
          /*
          	如果在這裡已經 CAS 成功了,那麼久意味著成功獲取讀鎖了,
          	下面要做的就是設定 firstReader 或 cachedHoldCounter
          */
          
          if (sharedCount(c) == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
          } else if (firstReader == current) {
            firstReaderHoldCount++;
          } else {
            // 設定 cachedHoldCounter 為當前的執行緒
            if (rh == null)
              rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
              rh = readHolds.get();
            else if (rh.count == 0)
              readHolds.set(rh);
            rh.count++;
            cachedHoldCounter = rh; // cache for release
          }
          
          return 1; // 大於 0 表示獲取到了鎖
        }
      }
    }
    
  • 讀鎖的釋放

    ReadLock 釋放鎖的程式碼如下:

    public void unlock() {
      sync.releaseShared(1);
    }
    

    這個方法位於 AQS 中,具體的定義如下:

    // 就是一般的釋放共享變數的邏輯,具體的模版方法為 tryReleaseShared,需要子類去具體實現
    public final boolean releaseShared(int arg) {
      if (tryReleaseShared(arg)) {
        doReleaseShared(); // 這裡是 AQS 的相關知識,再次不做過多的介紹
        return true;
      }
      return false;
    }
    

    Sync 中對於 tryReleaseShared 的具體實現如下:

    protected final boolean tryReleaseShared(int unused) {
      Thread current = Thread.currentThread();
      if (firstReader == current) {
        if (firstReaderHoldCount == 1)
          /*
          	如果 firstReaderHoldCount == 1,那麼將 firstReader 置為 null
          	這是為了給後續的執行緒使用
          */
          firstReader = null;
        else
          firstReaderHoldCount--;
      } else {
        // 判斷 cachedHoldCounter 是否快取的是當前的執行緒,如果不是的話,那麼久需要從 ThreadLocal 中獲取
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
          rh = readHolds.get();
        int count = rh.count;
        
        if (count <= 1) {
          // 這一步將 ThreadLocal 移除掉,這是為了避免記憶體洩露,因此當前執行緒已經不再持有讀鎖了
          readHolds.remove();
          if (count <= 0)
            // unlock() 次數太多了
            throw unmatchedUnlockException();
        }
        --rh.count;
      }
      
      for (;;) {
        int c = getState();
        // nextc 是 state 高 16 位 -1 後的值
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
          /*
          	如果 nextc == 0,那就是 state 全部 32 位都為 0,即讀鎖和寫鎖都已經全部被釋放了
          	此時在這裡返回 true 的話,其實是幫助喚醒後繼節點中獲取鎖的執行緒
          */
          return nextc == 0;
      }
    }
    

寫鎖

  • 寫鎖的獲取

    寫鎖是獨佔鎖,因此如果已經有執行緒獲取到了讀鎖,那麼寫鎖需要進入到阻塞佇列中等待

    寫鎖加鎖的原始碼:

    public void lock() {
      sync.acquire(1); // 標準的 AQS 寫法
    }
    

    重點在於 Sync 類對於 tryAcquire 的實現,具體的原始碼如下:

    protected final boolean tryAcquire(int acquires) {
      
      Thread current = Thread.currentThread();
      int c = getState();
      int w = exclusiveCount(c);
      if (c != 0) {
        /*
        	c != 0 && w == 0: 寫鎖可用,但是有執行緒持有讀鎖(也可能是自己持有)
        	c != 0 && w != 0 && current != getExclusiveOwnerThread(): 其它執行緒持有寫鎖
        	也就是說,只要有讀鎖或者寫鎖被佔用,這次就不能獲取到寫鎖
        */
        if (w == 0 || current != getExclusiveOwnerThread())
          return false;
        
        if (w + exclusiveCount(acquires) > MAX_COUNT)
          throw new Error("Maximum lock count exceeded");
        
        // 這裡不需要 CAS,因為能夠走到這的只可能是寫鎖重入
        setState(c + acquires);
        return true;
      }
      
      // 如果寫鎖獲取不需要阻塞,那麼則執行 CAS,成功則代表獲取到了寫鎖
      if (writerShouldBlock() ||
          !compareAndSetState(c, c + acquires))
        return false;
      setExclusiveOwnerThread(current);
      return true;
    }
    
  • 寫鎖的釋放

    寫鎖釋放的原始碼如下:

    public void unlock() {
      sync.release(1); // 標準的 AQS 的使用
    }
    

    與之密切相關的就是 Sync 對於 tryRelease 方法的具體實現,具體的實現程式碼如下所示:

    // 簡單來講就是將 state 中關於寫鎖的持有的數量 -1
    protected final boolean tryRelease(int releases) {
      if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
      
      int nextc = getState() - releases;
      boolean free = exclusiveCount(nextc) == 0;
      
      if (free)
        setExclusiveOwnerThread(null);
      setState(nextc);
      
      return free;
    }
    

鎖降級

持有寫鎖的執行緒,去獲取讀鎖的這個過程被稱為鎖降級,這樣的話,一個執行緒可能既持有寫鎖,也持有讀鎖。但是,是不存在鎖升級這個情況的,因為如果一個持有讀鎖的執行緒,再去嘗試獲取寫鎖,這種情況下就有可能會發生死鎖


參考:

[1] https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html

[2] https://javadoop.com/post/reentrant-read-write-lock