1. 程式人生 > >Java--讀寫鎖的實現原理

Java--讀寫鎖的實現原理

最近做的一個小專案中有這樣的需求:

整個專案有一份 config.json 儲存著專案的一些配置,是儲存在本地檔案的一個資源,並且應用中存在讀寫(讀>>寫)更新問題。既然讀寫併發操作,那麼就涉及到操作互斥,這裡自然想到了讀寫鎖,也順便對自己讀寫鎖方面的知識做個梳理。

為什麼需要讀寫鎖?

與傳統鎖不同的是讀寫鎖的規則是可以共享讀,但只能一個寫,總結起來為: 讀讀不互斥,讀寫互斥,寫寫互斥 ,而一般的獨佔鎖是: 讀讀互斥,讀寫互斥,寫寫互斥 ,而場景中往往 讀遠遠大於寫 ,讀寫鎖就是為了這種優化而創建出來的一種機制。

注意是 讀遠遠大於寫 ,一般情況下獨佔鎖的效率低來源於高併發下對臨界區的激烈競爭導致執行緒上下文切換。因此當併發不是很高的情況下,讀寫鎖由於需要額外維護讀鎖的狀態,可能還不如獨佔鎖的效率高。因此需要根據實際情況選擇使用。

一個簡單的讀寫鎖實現

根據上面理論可以利用兩個int變數來簡單實現一個讀寫鎖,實現雖然爛,但是原理都是差不多的,值得閱讀下。

public class ReadWriteLock {
  /**
   * 讀鎖持有個數
   */
  private int readCount = 0;
  /**
   * 寫鎖持有個數
   */
  private int writeCount = 0;

  /**
   * 獲取讀鎖,讀鎖在寫鎖不存在的時候才能獲取
   */
  public synchronized void lockRead() throws InterruptedException {
    // 寫鎖存在,需要wait
while (writeCount > 0) { wait(); } readCount++; } /** * 釋放讀鎖 */ public synchronized void unlockRead() { readCount--; notifyAll(); } /** * 獲取寫鎖,當讀鎖存在時需要wait. */ public synchronized void lockWrite() throws InterruptedException { // 先判斷是否有寫請求 while
(writeCount > 0) { wait(); } // 此時已經不存在獲取寫鎖的執行緒了,因此佔坑,防止寫鎖飢餓 writeCount++; // 讀鎖為0時獲取寫鎖 while (readCount > 0) { wait(); } } /** * 釋放讀鎖 */ public synchronized void unlockWrite() { writeCount--; notifyAll(); } }

ReadWriteLock的實現原理

在Java中 ReadWriteLock 的主要實現為 ReentrantReadWriteLock ,其提供了以下特性:

  1. 公平性選擇:支援公平與非公平(預設)的鎖獲取方式,吞吐量非公平優先於公平。
  2. 可重入:讀執行緒獲取讀鎖之後可以再次獲取讀鎖,寫執行緒獲取寫鎖之後可以再次獲取寫鎖
  3. 可降級:寫執行緒獲取寫鎖之後,其還可以再次獲取讀鎖,然後釋放掉寫鎖,那麼此時該執行緒是讀鎖狀態,也就是降級操作。

ReentrantReadWriteLock的結構

ReentrantReadWriteLock 的核心是由一個基於AQS的同步器 Sync 構成,然後由其擴展出 ReadLock (共享鎖), WriteLock (排它鎖)所組成。

並且從 ReentrantReadWriteLock 的建構函式中可以發現 ReadLock 與 WriteLock 使用的是同一個Sync,具體怎麼實現同一個佇列既可以為共享鎖,又可以表示排他鎖下文會具體分析。

清單一:ReentrantReadWriteLock建構函式

public ReentrantReadWriteLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
       readerLock = new ReadLock(this);
       writerLock = new WriteLock(this);
   }

Sync的實現

sync 是讀寫鎖實現的核心, sync 是基於AQS實現的,在AQS中核心是state欄位和雙端佇列,那麼一個一個問題來分析。

Sync是如何同時表示讀鎖與寫鎖?

清單2:讀寫鎖狀態獲取

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

從程式碼中獲取讀寫狀態可以看出其是把 state(int32位) 欄位分成高16位與低16位,其中高16位表示讀鎖個數,低16位表示寫鎖個數,如下圖所示(圖來自 Java併發程式設計藝術 )。

該圖表示當前一個執行緒獲取到了寫鎖,並且重入了兩次,因此低16位是3,並且該執行緒又獲取了讀鎖,並且重入了一次,所以高16位是2,當寫鎖被獲取時如果讀鎖不為0那麼讀鎖一定是獲取寫鎖的這個執行緒。

讀鎖的獲取

讀鎖的獲取主要實現是AQS中的 acquireShared 方法,其呼叫過程如下程式碼。

清單3:讀鎖獲取入口

// ReadLock
public void lock() {
    sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

其中 doAcquireShared(arg) 方法是獲取失敗之後AQS中入隊操作,等待被喚醒後重新獲取,那麼關鍵點就是 tryAcquireShared(arg) 方法,方法有點長,因此先總結出獲取讀鎖所經歷的步驟,獲取的第一部分步驟如下:

  • 操作1:讀寫需要互斥,因此當存在寫鎖並且持有寫鎖的執行緒不是該執行緒時獲取失敗。
  • 操作2:是否存在等待寫鎖的執行緒,存在的話則獲取讀鎖需要等待,避免寫鎖飢餓。(寫鎖優先順序是比較高的)
  • 操作3:CAS獲取讀鎖,實際上是state欄位的高16位自增。
  • 操作4:獲取成功後再ThreadLocal中記錄當前執行緒獲取讀鎖的次數。

清單4:讀鎖獲取的第一部分

protected final int tryAcquireShared(int unused) {
          Thread current = Thread.currentThread();
          int c = getState();
          // 操作1:存在寫鎖,並且寫鎖不是當前執行緒則直接去排隊
          if (exclusiveCount(c) != 0 &&
              getExclusiveOwnerThread() != current)
              return -1;

          int r = sharedCount(c);
          // 操作2:讀鎖是否該阻塞,對於非公平模式下寫鎖獲取優先順序會高,如果存在要獲取寫鎖的執行緒則讀鎖需要讓步,公平模式下則先來先到
          if (!readerShouldBlock() && 
              // 讀鎖使用高16位,因此存在獲取上限為2^16-1
              r < MAX_COUNT &&
              // 操作3:CAS修改讀鎖狀態,實際上是讀鎖狀態+1
              compareAndSetState(c, c + SHARED_UNIT)) {
              // 操作4:執行到這裡說明讀鎖已經獲取成功,因此需要記錄執行緒狀態。
              if (r == 0) {
                  firstReader = current; // firstReader是把讀鎖狀態從0變成1的那個執行緒
                  firstReaderHoldCount = 1;
              } else if (firstReader == current) { 
                  firstReaderHoldCount++;
              } else {
                  // 這些程式碼實際上是從ThreadLocal中獲取當前執行緒重入讀鎖的次數,然後自增下。
                  HoldCounter rh = cachedHoldCounter; // cachedHoldCounter是上一個獲取鎖成功的執行緒
                  if (rh == null || rh.tid != getThreadId(current))
                      cachedHoldCounter = rh = readHolds.get();
                  else if (rh.count == 0)
                      readHolds.set(rh);
                  rh.count++;
              }
              return 1;
          }
          // 當操作2,操作3失敗時執行該邏輯
          return fullTryAcquireShared(current);
      }

當操作2,操作3失敗時會執行 fullTryAcquireShared(current) ,為什麼會這樣寫呢?個人認為是一種補償操作, 操作2與操作3失敗並不代表當前執行緒沒有讀鎖的資格 ,並且這裡的讀鎖是共享鎖,有資格就應該被獲取成功,因此給予補償獲取讀鎖的操作。在 fullTryAcquireShared(current) 中是一個迴圈獲取讀鎖的過程,大致步驟如下:

  • 操作5:等同於操作2,存在寫鎖,且寫鎖執行緒並非當前執行緒則直接返回失敗
  • 操作6:當前執行緒是重入讀鎖,這裡只會偏向第一個獲取讀鎖的執行緒以及最後一個獲取讀鎖的執行緒,其他都需要去AQS中排隊。
  • 操作7:CAS改變讀鎖狀態
  • 操作8:同操作4,獲取成功後再ThreadLocal中記錄當前執行緒獲取讀鎖的次數。

清單5:讀鎖獲取的第二部分

final int fullTryAcquireShared(Thread current) {
           HoldCounter rh = null;
           // 最外層巢狀迴圈
           for (;;) {
               int c = getState();
               // 操作5:存在寫鎖,且寫鎖並非當前執行緒則直接返回失敗
               if (exclusiveCount(c) != 0) {
                   if (getExclusiveOwnerThread() != current)
                       return -1;
                   // else we hold the exclusive lock; blocking here
                   // would cause deadlock.
               // 操作6:如果當前執行緒是重入讀鎖則放行
               } else if (readerShouldBlock()) {
                   // Make sure we're not acquiring read lock reentrantly
                   // 當前是firstReader,則直接放行,說明是已獲取的執行緒重入讀鎖
                   if (firstReader == current) {
                       // assert firstReaderHoldCount > 0;
                   } else {
                       // 執行到這裡說明是其他執行緒,如果是cachedHoldCounter(其count不為0)也就是上一個獲取鎖的執行緒則可以重入,否則進入AQS中排隊
                       // **這裡也是對寫鎖的讓步**,如果佇列中頭結點為寫鎖,那麼當前獲取讀鎖的執行緒要進入佇列中排隊
                       if (rh == null) {
                           rh = cachedHoldCounter;
                           if (rh == null || rh.tid != getThreadId(current)) {
                               rh = readHolds.get();
                               if (rh.count == 0)
                                   readHolds.remove();
                           }
                       }
                       // 說明是上述剛初始化的rh,所以直接去AQS中排隊
                       if (rh.count == 0)
                           return -1;
                   }
               }
               if (sharedCount(c) == MAX_COUNT)
                   throw new Error("Maximum lock count exceeded");
               // 操作7:修改讀鎖狀態,實際上讀鎖自增操作
               if (compareAndSetState(c, c + SHARED_UNIT)) {
                   // 操作8:對ThreadLocal中維護的獲取鎖次數進行更新。
                   if (sharedCount(c) == 0) {
                       firstReader = current;
                       firstReaderHoldCount = 1;
                   } else if (firstReader == current) {
                       firstReaderHoldCount++;
                   } else {
                       if (rh == null)
                           rh = cachedHoldCounter;
                       if (rh == null || rh.tid != getThreadId(current))
                           rh = readHolds.get();
                       else if (rh.count == 0)
                           readHolds.set(rh);
                       rh.count++;
                       cachedHoldCounter = rh; // cache for release
                   }
                   return 1;
               }
           }
       }

讀鎖的釋放

清單6:讀鎖釋放入口

// ReadLock
public void unlock() {
    sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // 這裡實際上是釋放讀鎖後喚醒寫鎖的執行緒操作
        return true;
    }
    return false;
}

讀鎖的釋放主要是 tryReleaseShared(arg) 函式,因此拆解其步驟如下:

  • 操作1:清理ThreadLocal中儲存的獲取鎖數量資訊
  • 操作2:CAS修改讀鎖個數,實際上是自減一

清單7:讀鎖的釋放流程

protected final boolean tryReleaseShared(int unused) {
         Thread current = Thread.currentThread();
         // 操作1:清理ThreadLocal對應的資訊
         if (firstReader == current) {;
             if (firstReaderHoldCount == 1)
                 firstReader = null;
             else
                 firstReaderHoldCount--;
         } else {
             HoldCounter rh = cachedHoldCounter;
             if (rh == null || rh.tid != getThreadId(current))
                 rh = readHolds.get();
             int count = rh.count;
             // 已釋放完的讀鎖的執行緒清空操作
             if (count <= 1) {
                 readHolds.remove();
                 // 如果沒有獲取鎖卻釋放則會報該錯誤
                 if (count <= 0)
                     throw unmatchedUnlockException();
             }
             --rh.count;
         }
         // 操作2:迴圈中利用CAS修改讀鎖狀態
         for (;;) {
             int c = getState();
             int nextc = c - SHARED_UNIT;
             if (compareAndSetState(c, nextc))
                 // Releasing the read lock has no effect on readers,
                 // but it may allow waiting writers to proceed if
                 // both read and write locks are now free.
                 return nextc == 0;
         }
     }

寫鎖的獲取

清單8:寫鎖的獲取入口

    // WriteLock
  public void lock() {
        sync.acquire(1);
    }
// AQS
  public final void acquire(int arg) {
        // 嘗試獲取,獲取失敗後入隊,入隊失敗則interrupt當前執行緒
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

寫鎖的獲取也主要是 tryAcquire(arg) 方法,這裡也拆解步驟:

  • 操作1:如果讀鎖數量不為0或者寫鎖數量不為0,並且不是重入操作,則獲取失敗。
  • 操作2:如果當前鎖的數量為0,也就是不存在操作1的情況,那麼該執行緒是有資格獲取到寫鎖,因此修改狀態,設定獨佔執行緒為當前執行緒

清單9:寫鎖的獲取

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    // 操作1:c != 0,說明存在讀鎖或者寫鎖
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)  
        // 寫鎖為0,讀鎖不為0 或者獲取寫鎖的執行緒並不是當前執行緒,直接失敗
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        // 執行到這裡說明是寫鎖執行緒的重入操作,直接修改狀態,也不需要CAS因為沒有競爭
        setState(c + acquires);
        return true;
    }
    // 操作2:獲取寫鎖,writerShouldBlock對於非公平模式直接返回fasle,對於公平模式則執行緒需要排隊,因此需要阻塞。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

寫鎖的釋放

清單10:寫鎖的釋放入口

// WriteLock
public void unlock() {
        sync.release(1);
    }
// AQS
public final boolean release(int arg) {
    // 釋放鎖成功後喚醒佇列中第一個執行緒
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

寫鎖的釋放主要是 tryRelease(arg) 方法,其邏輯就比較簡單了,註釋很詳細。

清單11:寫鎖的釋放

protected final boolean tryRelease(int releases) {
     // 如果當前執行緒沒有獲取寫鎖卻釋放,則直接拋異常
     if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
     // 狀態變更至nextc
     int nextc = getState() - releases;
     // 因為寫鎖是可以重入,所以在都釋放完畢後要把獨佔標識清空
     boolean free = exclusiveCount(nextc) == 0;
     if (free)
         setExclusiveOwnerThread(null);
     // 修改狀態
     setState(nextc);
     return free;
 }

一些其他問題

鎖降級操作哪裡體現?

鎖降級操作指的是一個執行緒獲取寫鎖之後再獲取讀鎖,然後讀鎖釋放掉寫鎖的過程。在 tryAcquireShared(arg) 獲取讀鎖的程式碼中有如下程式碼。

清單12:寫鎖降級策略

             Thread current = Thread.currentThread();
            // 當前狀態
            int c = getState();
            // 存在寫鎖,並且寫鎖不等於當前執行緒時返回,換句話說等寫鎖為當前執行緒時則可以繼續往下獲取讀鎖。
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                
            
           

相關推薦

JavaReentrantReadWriteLock原理詳解

轉載自:https://blog.csdn.net/fuyuwei2015/article/details/72597192 介紹 ReentrantLock屬於排他鎖,這些鎖在同一時刻只允許一個執行緒進行訪問,而讀寫鎖在同一時刻可以允許多個執行緒訪問,但是在寫執行緒訪問時,所有的讀和其他寫

Java--實現原理

最近做的一個小專案中有這樣的需求:整個專案有一份 config.json 儲存著專案的一些配置,是儲存在本地檔案的一個資源,並且應用中存在讀寫(讀>>寫)更新問題。既然讀寫併發操作,那麼就涉及到操作互斥,這裡自然想到了讀寫鎖,也順便對自己讀寫鎖方面的知識做個梳理。

JAVA使用JDK1.5提供的實現高併發本地快取工具類

package com.study; import java.util.LinkedHashMap; import jav

javaReadWriteLock

blog ner java readwrite pan exce oid rate this package com.java.concurrent; import java.util.concurrent.locks.ReadWriteLock; import jav

MySQL中的事務及實現並發訪問控制

hang dea 執行c 定時 ack 幫助 持久 表操作 查看 一、並發控制中鎖的概念   鎖是並發控制中最核心的概念之一,在MySQL中的鎖分兩大類,一種是讀鎖,一種是寫鎖,讀鎖也可以稱為共享鎖(shared lock),寫鎖也通常稱為排它鎖(exclusive loc

Java --

對於讀多寫少的場景,我們此時應該允許讀鎖的多次重入,提高讀操作的併發性,在這種情況下,我們將讀寫鎖分離。 /** * 執行緒安全的local cache demo */ class LocalCache { private Reentra

java應用在快取系統

package test; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.locks.ReadWriteLock;

Java ReentrantReadWriteLock 原始碼分析

本文內容:讀寫鎖 ReentrantReadWriteLock 的原始碼分析,基於 Java7/Java8。 閱讀建議:雖然我這裡會介紹一些 AQS 的知識,不過如果你完全不瞭解 AQS,看本文就有點吃力了。 使用示例 下面這個例子非常實用,我是 javadoc 的

Elasticsearch文件模型實現原理

   ES系列基於ElasticSearch6.4.x版本。 1、簡介    ElasticSearch,每個索引被分成多個分片(預設每個索引5個主分片primary shard),每個分片又可以有多個

多執行緒學習筆記五之實現分析

目錄 簡介 讀寫狀態 讀鎖計數器 共享鎖的獲取 tryAcquireShared(int unused) doAcquireShared(int arg) 共享鎖的釋放 tryReleaseShared(int unus

一個優先的實現

/* g++ -Wall -o rwlock rwlock.cpp -lpthread * * 一個寫優先讀寫鎖的實現,多執行緒頻繁讀,多執行緒少量寫,同時寫優先,效能極佳。 * 當寫鎖(獨佔鎖)lock成功的必要條件是: * 1. 將寫鎖計數++; * 2

java分離實現

資料庫配置為一個主庫 多個從庫 主庫用於寫操作 從庫只讀操作讀寫分離實現即為配置兩個資料來源,一個用於讀寫 連線主庫 假設為ds_wr,一個用於只讀 連線從庫 假設為ds_r。對資料庫讀操作時,操作ds_r資料來源。對資料來源寫操作時,操作ds_wr資料來源。讀寫分離可以有兩

實現一個快取系統

package cn.itcast.gz;   import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock;   /**  * 用讀寫鎖實現的一個快取系統,讀

圖文深入解析 JAVA ,為什麼會死,反過來卻不會?

一、回顧基本的讀寫鎖 我們知道讀寫鎖 #java.util.concurrent.locks.ReentrantReadWrite

——ReentrantReadWriteLock原理詳解

1.讀寫鎖ReentrantReadWriteLock的原理 解決執行緒安全問題使用ReentrantLock就可以了,

原始碼分析— javaReentrantReadWriteLock

前言 今天看Jraft的時候發現了很多地方都用到了讀寫鎖,所以心血來潮想要分析以下讀寫鎖是怎麼實現的。 先上一個doc裡面的例子: class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWri

從火車站車次公示欄來學Java

Java多執行緒併發之讀寫鎖 本文主要內容:讀寫鎖的理論;通過生活中例子來理解讀寫鎖;讀寫鎖的程式碼演示;讀寫鎖總結。通過理論(總結)-例子-程式碼-然後再次總結,這四個步驟來讓大家對讀寫鎖的深刻理解。 本篇是《凱哥(凱哥Java:kagejava)併發程式設計學習》系列之《Lock系列》教程的第七篇:《Ja

ReentrantReadWriteLock簡單原理案例證明

## ReentrantReadWriteLock存在原因? --- 我們知道List的實現類ArrayList,LinkedList都是非執行緒安全的,Vector類通過用synchronized修飾方法保證了List的多執行緒非安全問題,但是有個缺點:**讀寫同步,效率低下**。於是就出現了CopyO

自旋和順序實現原理

並且 保護 表達 min 返回 create creat rwlock ini 常用的同步原語鎖,到多核處理器時代鎖已經是必不可少的同步方式之一了。無論設計多優秀的多線程數據結構,都避不開有競爭的臨界區,此時高效的鎖顯得至關重要。鎖的顆粒度是框架/程序設計者所關註的,

java併發-Synchronized+CAS方式實現

Synchronized+CAS方式實現讀寫鎖 文章目錄 Synchronized+CAS方式實現讀寫鎖 思路 技術 程式碼 測試 結果 [GitHub主頁](https://gith