ReentrantReadWriteLock實現原理
重入鎖ReentrantLock
是排它鎖,當一個執行緒獲得鎖時,其他執行緒均會處於阻塞狀態。
而對於網際網路產品來說,大多數情況是讀多寫少,不需要每次操作都阻塞,只需要保證在寫的場景下,其他讀鎖處於阻塞狀態,等寫執行緒釋放鎖後,讀請求才能執行;若沒有寫操作的情況下,讀請求不需要阻塞執行緒。為此,JDK1.5
提供了ReentrantReadWriteLock
類。
1.基本使用
public class ReentrantReadWriteLockDemo { static ReadWriteLock rwl = new ReentrantReadWriteLock(); static Lock readLock = rwl.readLock(); static Lock writeLock = rwl.writeLock(); void read() { readLock.lock(); String name = Thread.currentThread().getName(); System.out.printf("執行緒%s獲得讀鎖\n", name); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.printf("執行緒%s釋放讀鎖\n", name); readLock.unlock(); } } void write() { writeLock.lock(); String name = Thread.currentThread().getName(); System.out.printf("執行緒%s獲得寫鎖\n", name); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.printf("執行緒%s釋放寫鎖\n", name); writeLock.unlock(); } } public static void main(String[] args) { ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo(); Thread[] threads = new Thread[20]; for (int i = 0; i < threads.length; i++) { if (i % 2 == 0) { threads[i] = new Thread(demo::read); } else { threads[i] = new Thread(demo::write); } } for (int i = 0; i < threads.length; i++) { threads[i].start(); } } }
通過ReentrantReadWriteLock
將讀鎖和寫鎖隔離開,當沒有寫操作時,讀鎖直接通過共享鎖的方式直接讀取,不會阻塞其他執行緒;只有在有寫入操作的情況下,讀操作才會進行阻塞,等寫操作釋放鎖之後,讀操作才能繼續執行。保證讀的操作不會出現髒讀的現象。
2.實現原理
首先,來看ReentrantReadWriteLock
構造方法,
public ReentrantReadWriteLock() { this(false); } public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); }
由上述原始碼得知:該類也是通過自定義佇列同步器實現的,有公平鎖和非公平鎖兩種實現方式;同時在構造器中初始化了ReadLock
和WriteLock
兩個鎖物件,這兩個物件都是定義在ReentrantReadWriteLock
類中的靜態內部類。
我們從WriteLock#lock()
方法入手,詳細分析下其底層實現
public static class WriteLock implements Lock, java.io.Serializable {
...
public void lock() {
sync.acquire(1);
}
...
}
這裡的acquire
方法是定義在AQS中的獲取鎖的方法,由子類自定義具體獲取鎖的方式
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
}
tryAcquire
方法具體由自定義同步器實現,本文主要分析預設的實現方式,由上述構造器方法可以得知,預設的同步器是非公平的方式,也就是由ReentrantReadWriteLock$NonfairSync
進行實現,這裡的tryAcquire
方法是定義在其父類Sync
類中的:
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
...
}
當執行緒的狀態state
沒有被修改,也就是其值為預設值0,writeShouldBlock
定義在NonfairSync
中,
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // writers can always barge
}
}
因此,若沒有執行緒獲取鎖,則會呼叫CAS方法修改state
的值,然後將當前執行緒記錄到獨佔執行緒的標識中
獲取鎖的執行緒可以直接執行lock
和unlock
之間的程式碼,若此時執行緒A獲取鎖,在還未釋放時,執行緒B再次呼叫lock
方法,則會判斷寫鎖的執行緒數量,也就是exclusiveCount
方法,會判斷出當前寫鎖的標識,因為當前是執行緒A擁有鎖,則該方法返回1,執行緒B執行tryAcquire
最終會返回false。若還是執行緒A執行lock
操作,則直接增加重入次數。
在
Sync
類中,使用32位標識讀寫鎖的狀態,使用低16位標識寫鎖的狀態,使用高16位標識讀鎖的狀態。
而根據AQS中acquire()
方法的定義,若獲取鎖失敗,則會將執行緒加入阻塞佇列中
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
根據AQS的處理邏輯,獲取不到鎖的寫執行緒將會加入到一個雙向連結串列佇列中,如下:
只有當執行緒A釋放鎖之後,也就是呼叫unlock
方法後,才會根據連結串列的順序依次喚醒阻塞佇列中的執行緒,這裡與ReentrantLock實現原理中的實現是一樣的,再次不具體贅述,unlock
方法定義如下:
public static class WriteLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
...
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
}
從上得知,執行緒A釋放鎖後,會執行unparkSuccessor
喚醒處於阻塞佇列中的第一個有效節點,這部分屬於AQS的實現範疇,不再贅述。
再來看看讀請求獲取鎖的情況
public static class ReadLock implements Lock, java.io.Serializable {
...
public void lock() {
sync.acquireShared(1);
}
...
}
同樣,這裡是呼叫的AQS裡面的獲取共享鎖的方法,
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
...
}
tryAcquireShared
方法具體由自定義的同步器進行實現
abstract static class Sync extends AbstractQueuedSynchronizer {
...
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
...
}
假設還是由執行緒A持有鎖,讀操作執行lock
方法時,都會返回-1,根據AQS中acquireShared
方法的定義,當tryAcquireShared
小於0時,就會加入阻塞佇列,佇列方式與寫執行緒類似,也是一個雙向連結串列的方式。
至此,也解釋了存在寫操作時,所有的讀操作被阻塞,直到寫操作釋放鎖後,讀操作才能繼續執行。
若當前沒有執行緒獲得鎖,也就是state
狀態為0,會呼叫sharedCount
方法,該方法在Sync
中使用高16位標識。readerShouldBlock
方法定義如下:
static final class NonfairSync extends Sync {
...
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
...
}
當執行緒1執行讀鎖的lock
方法,會執行CAS操作修改state
的值,當執行緒2再次執行讀鎖的lock
方法,則會將每一個讀執行緒的資訊儲存在ThreadLocalHoldCounter
中,定義如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
...
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transient ThreadLocalHoldCounter readHolds;
...
}
每個執行緒都會有一個ThreadLocal
儲存的變數副本,在沒有寫操作時也不會對執行緒進行阻塞。而一旦有個寫操作,通過CAS修改了state
狀態的值,後續讀操作都會加入到一個阻塞佇列中,直到寫操作釋放鎖後,阻塞佇列中的執行緒才能再次進行執行。
至此,ReentrantReadWriteLock
的基本特性已分析完畢。