Java同步器之StampedLock原始碼解析
一、簡介
StampedLock
是java8
中新增的類,它是一個更加高效的讀寫鎖的實現,而且它不是基於AQS
來實現的,它的內部自成一派邏輯。
StampedLock
具有三種模式:寫模式、讀模式、樂觀讀模式。
ReentrantReadWriteLock
中的讀和寫都是一種悲觀鎖的體現,StampedLock
加入了一種新的模式——樂觀讀,它是指當樂觀讀時假定沒有其它執行緒修改資料,讀取完成後再檢查下版本號有沒有變化,沒有變化就讀取成功了,這種模式更適用於讀多寫少的場景。
二、使用方法
讓我們通過下面的例子瞭解一下StampedLock
三種模式的使用方法:
class Point { private double x, y; private final StampedLock sl = new StampedLock(); void move(double deltaX, double deltaY) { // 獲取寫鎖,返回一個版本號(戳) long stamp = sl.writeLock(); try { x += deltaX; y += deltaY; } finally { // 釋放寫鎖,需要傳入上面獲取的版本號 sl.unlockWrite(stamp); } } double distanceFromOrigin() { // 樂觀讀 long stamp = sl.tryOptimisticRead(); double currentX = x, currentY = y; // 驗證版本號是否有變化 if (!sl.validate(stamp)) { // 版本號變了,樂觀讀轉悲觀讀 stamp = sl.readLock(); try { // 重新讀取x、y的值 currentX = x; currentY = y; } finally { // 釋放讀鎖,需要傳入上面獲取的版本號 sl.unlockRead(stamp); } } return Math.sqrt(currentX * currentX + currentY * currentY); } void moveIfAtOrigin(double newX, double newY) { // 獲取悲觀讀鎖 long stamp = sl.readLock(); try { while (x == 0.0 && y == 0.0) { // 轉為寫鎖 long ws = sl.tryConvertToWriteLock(stamp); // 轉換成功 if (ws != 0L) { stamp = ws; x = newX; y = newY; break; } else { // 轉換失敗 sl.unlockRead(stamp); // 獲取寫鎖 stamp = sl.writeLock(); } } } finally { // 釋放鎖 sl.unlock(stamp); } } }
從上面的例子我們可以與ReentrantReadWriteLock
進行對比:
(1)寫鎖的使用方式基本一對待;
(2)讀鎖(悲觀)的使用方式可以進行升級,通過tryConvertToWriteLock()
方式可以升級為寫鎖;
(3)樂觀讀鎖是一種全新的方式,它假定資料沒有改變,樂觀讀之後處理完業務邏輯再判斷版本號是否有改變,如果沒改變則樂觀讀成功,如果有改變則轉化為悲觀讀鎖重試;
三、原始碼分析
3.1 內部類
static final class WNode { // 前一個節點 volatile WNode prev; // 後一個節點 volatile WNode next; // 讀執行緒所用的連結串列(實際是一個棧結果) volatile WNode cowait; // list of linked readers // 阻塞的執行緒 volatile Thread thread; // non-null while possibly parked // 狀態 volatile int status; // 0, WAITING, or CANCELLED // 讀模式還是寫模式 final int mode; // RMODE or WMODE WNode(int m, WNode p) { mode = m; prev = p; } }
佇列中的節點,類似於AQS
佇列中的節點,可以看到它組成了一個雙向連結串列,內部維護著阻塞的執行緒。
3.2 主要屬性
// 一堆常量 // 讀執行緒的個數佔有低7位 private static final int LG_READERS = 7; // 讀執行緒個數每次增加的單位 private static final long RUNIT = 1L; // 寫執行緒個數所在的位置 private static final long WBIT = 1L << LG_READERS; // 128 = 1000 0000 // 讀執行緒個數所在的位置 private static final long RBITS = WBIT - 1L; // 127 = 111 1111 // 最大讀執行緒個數 private static final long RFULL = RBITS - 1L; // 126 = 111 1110 // 讀執行緒個數和寫執行緒個數的掩碼 private static final long ABITS = RBITS | WBIT; // 255 = 1111 1111 // 讀執行緒個數的反數,高25位全部為1 private static final long SBITS = ~RBITS; // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000 // state的初始值 private static final long ORIGIN = WBIT << 1; // 256 = 1 0000 0000 // 佇列的頭節點 private transient volatile WNode whead; // 佇列的尾節點 private transient volatile WNode wtail; // 儲存著當前的版本號,類似於AQS的狀態變數state private transient volatile long state;
通過屬性可以看到,這是一個類似於AQS
的結構,內部同樣維護著一個狀態變數state
和一個CLH
佇列。
3.3 構造方法
public StampedLock() {
state = ORIGIN;
}
state
的初始值為ORIGIN(256)
,它的二進位制是1 0000 0000
,也就是初始版本號。
3.4 writeLock()方法
獲取寫鎖。
public long writeLock() {
long s, next;
// ABITS = 255 = 1111 1111
// WBITS = 128 = 1000 0000
// state與ABITS如果等於0,嘗試原子更新state的值加WBITS
// 如果成功則返回更新的值,如果失敗呼叫acquireWrite()方法
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
以state
等於初始值為例,則state & ABITS
的結果為:
此時state
為初始狀態,與ABITS
與運算後的值為0
,所以執行後面的CAS
方法,s + WBITS
的值為384 = 1 1000 0000
。
到這裡我們大膽猜測:state
的高24
位儲存的是版本號,低8
位儲存的是是否有加鎖,第8
位儲存的是寫鎖,低7
位儲存的是讀鎖被獲取的次數,而且如果只有第8
位儲存寫鎖的話,那麼寫鎖只能被獲取一次,也就不可能重入了。
到底我們猜測的對不對呢,接著來分析acquireWrite()
方法:
private long acquireWrite(boolean interruptible, long deadline) {
// node為新增節點,p為尾節點(即將成為node的前置節點)
WNode node = null, p;
// 第一次自旋——入隊
for (int spins = -1;;) { // spin while enqueuing
long m, s, ns;
// 再次嘗試獲取寫鎖
if ((m = (s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
return ns;
} else if (spins < 0)
// 如果自旋次數小於0,則計算自旋的次數
// 如果當前有寫鎖獨佔且佇列無元素,說明快輪到自己了
// 就自旋就行了,如果自旋完了還沒輪到自己才入隊
// 則自旋次數為SPINS常量
// 否則自旋次數為0
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
else if (spins > 0) {
// 當自旋次數大於0時,當前這次自旋隨機減一次自旋次數
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
} else if ((p = wtail) == null) {
// 如果佇列未初始化,新建一個空節點並初始化頭節點和尾節點
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
} else if (node == null)
// 如果新增節點還未初始化,則新建之,並賦值其前置節點為尾節點
node = new WNode(WMODE, p);
else if (node.prev != p)
// 如果尾節點有變化,則更新新增節點的前置節點為新的尾節點
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
// 嘗試更新新增節點為新的尾節點成功,則退出迴圈
p.next = node;
break;
}
}
// 第二次自旋——阻塞並等待喚醒
for (int spins = -1;;) {
// h為頭節點,np為新增節點的前置節點,pp為前前置節點,ps為前置節點的狀態
WNode h, np, pp; int ps;
// 如果頭節點等於前置節點,說明快輪到自己了
if ((h = whead) == p) {
if (spins < 0)
// 初始化自旋次數
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
// 增加自旋次數
spins <<= 1;
// 第三次自旋,不斷嘗試獲取寫鎖
for (int k = spins;;) { // spin at head
long s, ns;
if (((s = state) & ABITS) == 0L) {
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// 嘗試獲取寫鎖成功,將node設定為新頭節點並清除其前置節點(gc)
whead = node;
node.prev = null;
return ns;
}
}
// 隨機立減自旋次數,當自旋次數減為0時跳出迴圈再重試
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
break;
}
} else if (h != null) { // help release stale waiters
// 這段程式碼很難進來,是用於協助喚醒讀節點的
// 我是這麼除錯進來的:
// 起三個寫執行緒,兩個讀執行緒
// 寫執行緒1獲取鎖不要釋放
// 讀執行緒1獲取鎖,讀執行緒2獲取鎖(會阻塞)
// 寫執行緒2獲取鎖(會阻塞)
// 寫執行緒1釋放鎖,此時會喚醒讀執行緒1
// 在讀執行緒1裡面先不要喚醒讀執行緒2
// 寫執行緒3獲取鎖,此時就會走到這裡來了
WNode c; Thread w;
// 如果頭節點的cowait連結串列(棧)不為空,喚醒裡面的所有節點
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 如果頭節點沒有變化
if (whead == h) {
// 如果尾節點有變化,則更新
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
} else if ((ps = p.status) == 0)
// 如果尾節點狀態為0,則更新成WAITING
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
// 如果尾節點狀態為取消,則把它從連結串列中刪除
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
} else {
// 有超時時間的處理
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 已超時,剔除當前節點
return cancelWaiter(node, node, false);
// 當前執行緒
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
// 把node的執行緒指向當前執行緒
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
// 阻塞當前執行緒
U.park(false, time); // 等同於LockSupport.park()
// 當前節點被喚醒後,清除執行緒
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// 如果中斷了,取消當前節點
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
這裡對acquireWrite()
方法做一個總結,這個方法裡面有三段自旋邏輯:
第一段自旋——入隊:
(1)如果頭節點等於尾節點,說明沒有其它執行緒排隊,那就多自旋一會,看能不能嘗試獲取到寫鎖;
(2)否則,自旋次數為0,直接讓其入隊;
第二段自旋——阻塞並等待被喚醒 + 第三段自旋——不斷嘗試獲取寫鎖:
(1)第三段自旋在第二段自旋內部;
(2)如果頭節點等於前置節點,那就進入第三段自旋,不斷嘗試獲取寫鎖;
(3)否則,嘗試喚醒頭節點中等待著的讀執行緒;
(4)最後,如果當前執行緒一直都沒有獲取到寫鎖,就阻塞當前執行緒並等待被喚醒;
這麼一大段邏輯看著比較鬧心,其實真正分解下來還是比較簡單的,無非就是自旋,把很多狀態的處理都糅合到一個for迴圈裡面處理了。
3.5 unlockWrite()方法
釋放寫鎖。
public void unlockWrite(long stamp) {
WNode h;
// 檢查版本號對不對
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
// 這行程式碼實際有兩個作用:
// 1. 更新版本號加1
// 2. 釋放寫鎖
// stamp + WBIT實際會把state的第8位置為0,也就相當於釋放了寫鎖
// 同時會進1,也就是高24位整體加1了
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
// 如果頭節點不為空,並且狀態不為0,呼叫release方法喚醒它的下一個節點
if ((h = whead) != null && h.status != 0)
release(h);
}
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
// 將其狀態改為0
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
// 如果頭節點的下一個節點為空或者其狀態為已取消
if ((q = h.next) == null || q.status == CANCELLED) {
// 從尾節點向前遍歷找到一個可用的節點
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 喚醒q節點所在的執行緒
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
寫鎖的釋放過程比較簡單:
(1)更改state
的值,釋放寫鎖;
(2)版本號加1
;
(3)喚醒下一個等待著的節點;
3.6 readLock()方法
獲取讀鎖。
public long readLock() {
long s = state, next; // bypass acquireRead on common uncontended case
// 沒有寫鎖佔用,並且讀鎖被獲取的次數未達到最大值
// 嘗試原子更新讀鎖被獲取的次數加1
// 如果成功直接返回,如果失敗呼叫acquireRead()方法
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
獲取讀鎖的時候先看看現在有沒有其它執行緒佔用著寫鎖,如果沒有的話再檢測讀鎖被獲取的次數有沒有達到最大,如果沒有的話直接嘗試獲取一次讀鎖,如果成功了直接返回版本號,如果沒成功就呼叫acquireRead()
排隊。
下面我們一起來看看acquireRead()
方法,這又是一個巨長無比的方法,請保持耐心,我們一步步來分解:
private long acquireRead(boolean interruptible, long deadline) {
// node為新增節點,p為尾節點
WNode node = null, p;
// 第一段自旋——入隊
for (int spins = -1;;) {
// 頭節點
WNode h;
// 如果頭節點等於尾節點
// 說明沒有排隊的執行緒了,快輪到自己了,直接自旋不斷嘗試獲取讀鎖
if ((h = whead) == (p = wtail)) {
// 第二段自旋——不斷嘗試獲取讀鎖
for (long m, s, ns;;) {
// 嘗試獲取讀鎖,如果成功了直接返回版本號
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
// 如果讀執行緒個數達到了最大值,會溢位,返回的是0
return ns;
else if (m >= WBIT) {
// m >= WBIT表示有其它執行緒先一步獲取了寫鎖
if (spins > 0) {
// 隨機立減自旋次數
if (LockSupport.nextSecondarySeed() >= 0)
--spins;
}
else {
// 如果自旋次數為0了,看看是否要跳出迴圈
if (spins == 0) {
WNode nh = whead, np = wtail;
if ((nh == h && np == p) || (h = nh) != (p = np))
break;
}
// 設定自旋次數
spins = SPINS;
}
}
}
}
// 如果尾節點為空,初始化頭節點和尾節點
if (p == null) { // initialize queue
WNode hd = new WNode(WMODE, null);
if (U.compareAndSwapObject(this, WHEAD, null, hd))
wtail = hd;
}
else if (node == null)
// 如果新增節點為空,初始化之
node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {
// 如果頭節點等於尾節點或者尾節點不是讀模式
// 當前節點入隊
if (node.prev != p)
node.prev = p;
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
break;
}
}
else if (!U.compareAndSwapObject(p, WCOWAIT,
node.cowait = p.cowait, node))
// 接著上一個elseif,這裡肯定是尾節點為讀模式了
// 將當前節點加入到尾節點的cowait中,這是一個棧
// 上面的CAS成功了是不會進入到這裡來的
node.cowait = null;
else {
// 第三段自旋——阻塞當前執行緒並等待被喚醒
for (;;) {
WNode pp, c; Thread w;
// 如果頭節點不為空且其cowait不為空,協助喚醒其中等待的讀執行緒
if ((h = whead) != null && (c = h.cowait) != null &&
U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null) // help release
U.unpark(w);
// 如果頭節點等待前前置節點或者等於前置節點或者前前置節點為空
// 這同樣說明快輪到自己了
if (h == (pp = p.prev) || h == p || pp == null) {
long m, s, ns;
// 第四段自旋——又是不斷嘗試獲取鎖
do {
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s,
ns = s + RUNIT) :
(m < WBIT &&
(ns = tryIncReaderOverflow(s)) != 0L))
return ns;
} while (m < WBIT); // 只有當前時刻沒有其它執行緒佔有寫鎖就不斷嘗試
}
// 如果頭節點未曾改變且前前置節點也未曾改
// 阻塞當前執行緒
if (whead == h && p.prev == pp) {
long time;
// 如果前前置節點為空,或者頭節點等於前置節點,或者前置節點已取消
// 從第一個for自旋開始重試
if (pp == null || h == p || p.status > 0) {
node = null; // throw away
break;
}
// 超時檢測
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 如果超時了,取消當前節點
return cancelWaiter(node, p, false);
// 當前執行緒
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
// 設定進node中
node.thread = wt;
// 檢測之前的條件未曾改變
if ((h != pp || (state & ABITS) == WBIT) &&
whead == h && p.prev == pp)
// 阻塞當前執行緒並等待被喚醒
U.park(false, time);
// 喚醒之後清除執行緒
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// 如果中斷了,取消當前節點
if (interruptible && Thread.interrupted())
return cancelWaiter(node, p, true);
}
}
}
}
// 只有第一個讀執行緒會走到下面的for迴圈處,參考上面第一段自旋中有一個break,當第一個讀執行緒入隊的時候break出來的
// 第五段自旋——跟上面的邏輯差不多,只不過這裡單獨搞一個自旋針對第一個讀執行緒
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 如果頭節點等於尾節點,說明快輪到自己了
// 不斷嘗試獲取讀鎖
if ((h = whead) == p) {
// 設定自旋次數
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
// 第六段自旋——不斷嘗試獲取讀鎖
for (int k = spins;;) { // spin at head
long m, s, ns;
// 不斷嘗試獲取讀鎖
if ((m = (s = state) & ABITS) < RFULL ?
U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
(m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
// 獲取到了讀鎖
WNode c; Thread w;
whead = node;
node.prev = null;
// 喚醒當前節點中所有等待著的讀執行緒
// 因為當前節點是第一個讀節點,所以它是在佇列中的,其它讀節點都是掛這個節點的cowait棧中的
while ((c = node.cowait) != null) {
if (U.compareAndSwapObject(node, WCOWAIT,
c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
// 返回版本號
return ns;
}
// 如果當前有其它執行緒佔有著寫鎖,並且沒有自旋次數了,跳出當前迴圈
else if (m >= WBIT &&
LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
break;
}
} else if (h != null) {
// 如果頭節點不等待尾節點且不為空且其為讀模式,協助喚醒裡面的讀執行緒
WNode c; Thread w;
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 如果頭節點未曾變化
if (whead == h) {
// 更新前置節點及其狀態等
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node; // stale
}
else if ((ps = p.status) == 0)
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
// 第一個讀節點即將進入阻塞
long time;
// 超時設定
if (deadline == 0L)
time = 0L;
else if ((time = deadline - System.nanoTime()) <= 0L)
// 如果超時了取消當前節點
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
node.thread = wt;
if (p.status < 0 &&
(p != h || (state & ABITS) == WBIT) &&
whead == h && node.prev == p)
// 阻塞第一個讀節點並等待被喚醒
U.park(false, time);
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
讀鎖的獲取過程比較艱辛,一共有六段自旋,讓我們來大致地分解一下:
- 讀節點進來都是先判斷是頭節點如果等於尾節點,說明快輪到自己了,就不斷地嘗試獲取讀鎖,如果成功了就返回;
- 如果頭節點不等於尾節點,這裡就會讓當前節點入隊,這裡入隊又分成了兩種;
- 一種是首個讀節點入隊,它是會排隊到整個佇列的尾部,然後跳出第一段自旋;
- 另一種是非第一個讀節點入隊,它是進入到首個讀節點的
cowait
棧中,所以更確切地說應該是入棧; - 不管是入隊還入棧後,都會再次檢測頭節點是不是等於尾節點了,如果相等,則會再次不斷嘗試獲取讀鎖;
- 如果頭節點不等於尾節點,那麼才會真正地阻塞當前執行緒並等待被喚醒;
- 上面說的首個讀節點其實是連續的讀執行緒中的首個,如果是兩個讀執行緒中間夾了一個寫執行緒,還是老老實實的排隊。
3.7 unlockRead()方法
釋放讀鎖。
public void unlockRead(long stamp) {
long s, m; WNode h;
for (;;) {
// 檢查版本號
if (((s = state) & SBITS) != (stamp & SBITS) ||
(stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
throw new IllegalMonitorStateException();
// 讀執行緒個數正常
if (m < RFULL) {
// 釋放一次讀鎖
if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
// 如果讀鎖全部都釋放了,且頭節點不為空且狀態不為0,喚醒它的下一個節點
if (m == RUNIT && (h = whead) != null && h.status != 0)
release(h);
break;
}
}
else if (tryDecReaderOverflow(s) != 0L)
// 讀執行緒個數溢位檢測
break;
}
}
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
// 將其狀態改為0
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
// 如果頭節點的下一個節點為空或者其狀態為已取消
if ((q = h.next) == null || q.status == CANCELLED) {
// 從尾節點向前遍歷找到一個可用的節點
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
// 喚醒q節點所在的執行緒
if (q != null && (w = q.thread) != null)
U.unpark(w);
}
}
讀鎖釋放的過程就比較簡單了,將state
的低7
位減1
,當減為0
的時候說明完全釋放了讀鎖,就喚醒下一個排隊的執行緒。
3.8 tryOptimisticRead()方法
樂觀讀。
public long tryOptimisticRead() {
long s;
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
如果沒有寫鎖,就返回state
的高25
位,這裡把寫所在位置一起返回了,是為了後面檢測資料有沒有被寫過。
3.9 validate()方法
檢測樂觀讀版本號是否變化。
public boolean validate(long stamp) {
// 強制加入記憶體屏障,重新整理資料
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
檢測兩者的版本號是否一致,與SBITS
與操作保證不受讀操作的影響。
四、變異的CLH佇列
StampedLock
中的佇列是一種變異的CLH
佇列,圖解如下:
五、總結
-
StampedLock
也是一種讀寫鎖,它不是基於AQS
實現的; -
StampedLock
相較於ReentrantReadWriteLock
多了一種樂觀讀的模式,以及讀鎖轉化為寫鎖的方法; -
StampedLock
的state
儲存的是版本號,確切地說是高24
位儲存的是版本號,寫鎖的釋放會增加其版本號,讀鎖不會; -
StampedLock
的低7
位儲存的讀鎖被獲取的次數,第8
位儲存的是寫鎖被獲取的次數; -
StampedLock
不是可重入鎖,因為只有第8
位標識寫鎖被獲取了,並不能重複獲取; -
StampedLock
中獲取鎖的過程使用了大量的自旋操作,對於短任務的執行會比較高效,長任務的執行會浪費大量CPU
; -
StampedLock
不能實現條件鎖;
六、拓展:與ReentrantReadWriteLock的對比
StampedLock
與ReentrantReadWriteLock
作為兩種不同的讀寫鎖方式,歸納了它們的大致異同點:
- 兩者都有獲取讀鎖、獲取寫鎖、釋放讀鎖、釋放寫鎖的方法,這是相同點;
- 兩者的結構基本類似,都是使用
state + CLH
佇列; - 前者的
state
分成三段,高24
位儲存版本號、低7
位儲存讀鎖被獲取的次數、第8
位儲存寫鎖被獲取的次數; - 後者的
state
分成兩段,高16
位儲存讀鎖被獲取的次數,低16
位儲存寫鎖被獲取的次數; - 前者的
CLH
佇列可以看成是變異的CLH
佇列,連續的讀執行緒只有首個節點儲存在佇列中,其它的節點儲存的首個節點的cowait
棧中; - 後者的
CLH
佇列是正常的CLH
佇列,所有的節點都在這個佇列中; - 前者獲取鎖的過程中有判斷首尾節點是否相同,也就是是不是快輪到自己了,如果是則不斷自旋,所以適合執行短任務;
- 後者獲取鎖的過程中非公平模式下會做有限次嘗試;
- 前者只有非公平模式,一上來就嘗試獲取鎖;
- 前者喚醒讀鎖是一次性喚醒連續的讀鎖的,而且其它執行緒還會協助喚醒;
- 後者是一個接著一個地喚醒的;
- 前者有樂觀讀的模式,樂觀讀的實現是通過判斷
state
的高25
位是否有變化來實現的; - 前者各種模式可以互轉,類似
tryConvertToXxx()
方法; - 前者寫鎖不可重入,後者寫鎖可重入;
- 前者無法實現條件鎖,後者可以實現條件鎖;