併發程式設計 - StampedLock (待續)
阿新 • • 發佈:2021-01-03
技術標籤:多執行緒
StampedLock
原始碼解析
屬性
// 讀執行緒的個數佔有低7位
private static final int LG_READERS = 7;
// 讀執行緒個數每次增加的單位
private static final long RUNIT = 1L;
// 寫鎖標記位
private static final long WBIT = 1L << LG_READERS; // 128 = 1000 0000
// 讀鎖標記位
private static final long RBITS = WBIT - 1L; // 127 = 111 1111
// 最大讀執行緒個數
private static final long RFULL = RBITS - 1L; // 126 = 111 1110
// 用於獲取讀寫狀態
private static final long ABITS = RBITS | WBIT; // 255 = 1111 1111
// 讀執行緒個數的反數,高25位全部為1
private static final long SBITS = ~RBITS; // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000
// state的初始值
private static final long ORIGIN = WBIT << 1; // 256 = 1 0000 0000
// 佇列的頭節點
private transient volatile WNode whead;
// 佇列的尾節點
private transient volatile WNode wtail;
// 同步狀態
private transient volatile long state;
writeLock
搶佔寫鎖。
過程概述: 第一次自旋: 如果此刻沒有持有鎖,可以直接搶佔鎖。否則插入到佇列尾部。 第二次自旋: 如果當前節點的前驅節點是頭節點,進入第三次自旋。 如果頭節點的cowait連結串列不為空,喚醒裡面等待的讀執行緒。 如果當前節點一直沒有獲取到寫鎖,阻塞當前執行緒。 第三次自旋: 不斷嘗試獲取寫鎖。
public long writeLock() {
long s, next;
// 如果沒有持有任何鎖
// CAS操作,如果成功,則標記持有寫鎖
// 如果失敗,則執行 acquireWrite 方法
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
private long acquireWrite(boolean interruptible, long deadline) {
WNode node = null, p;
// 第一次自旋 - 入隊
for (int spins = -1;;) {
long m, s, ns;
// 如果不持有任何鎖
if ((m = (s = state) & ABITS) == 0L) {
// CAS操作,標記持有寫鎖
if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
// 直接返回
return ns;
}
// 如果持有鎖
// 如果自旋次數 < 0
else if (spins < 0)
// 修改自旋次數
// - 如果持有寫鎖 && 佇列中只有一個元素 -> SPINS
// - 否則 -> 0
spins = (m == WBIT && wtail == whead) ? SPINS : 0;
// 如果自旋次數 > 0
else if (spins > 0) {
if (LockSupport.nextSecondarySeed() >= 0)
// 自旋次數遞減
--spins;
}
// 如果佇列沒有初始化,則初始化佇列
else if ((p = wtail) == null) {
WNode hd = new WNode(WMODE, null);
// 加入頭節點
if (U.compareAndSwapObject(this, WHEAD, null, hd))
// 加入尾節點
wtail = hd;
}
// 初始化新節點
else if (node == null)
node = new WNode(WMODE, p);
// 將新節點插入到佇列尾部
else if (node.prev != p)
node.prev = p;
// 重新標記尾節點
else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
p.next = node;
// 結束自旋
break;
}
}
// 第二次自旋 - 阻塞並等待喚醒
for (int spins = -1;;) {
WNode h, np, pp; int ps;
// 如果當前節點的前驅節點是頭節點
if ((h = whead) == p) {
if (spins < 0)
spins = HEAD_SPINS;
else if (spins < MAX_HEAD_SPINS)
spins <<= 1;
// 第三次自旋 - 嘗試獲取寫鎖
for (int k = spins;;) {
long s, ns;
// 如果當前不持有任何鎖
if (((s = state) & ABITS) == 0L) {
// 如果CAS操作成功,標記持有寫鎖
if (U.compareAndSwapLong(this, STATE, s,
ns = s + WBIT)) {
// 將當前節點設定為頭節點
whead = node;
node.prev = null;
// 直接返回
return ns;
}
}
// 如果當前持有鎖
// 自旋次數遞減
else if (LockSupport.nextSecondarySeed() >= 0 &&
--k <= 0)
// 如果自旋次數<=0,跳出第三次自旋過程
break;
}
}
// 如果當前節點的前驅節點不是頭節點
else if (h != null) { // help release stale waiters
WNode c; Thread w;
// 如果頭節點的cowait連結串列不為空,喚醒裡面等待的讀執行緒。
while ((c = h.cowait) != null) {
if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
(w = c.thread) != null)
U.unpark(w);
}
}
// 如果頭節點沒有發生變化
if (whead == h) {
// 如果尾節點發生變化,則更新
if ((np = node.prev) != p) {
if (np != null)
(p = np).next = node;
}
// 如果尾節點的狀態為0
else if ((ps = p.status) == 0)
// CAS操作,修改狀態為-1
U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
// 如果尾節點的狀態為取消狀態,則將其從連結串列中刪除
else if (ps == CANCELLED) {
if ((pp = p.prev) != null) {
node.prev = pp;
pp.next = node;
}
}
else {
long time; // 0 argument to park means no timeout
if (deadline == 0L)
time = 0L;
// 如果超時
else if ((time = deadline - System.nanoTime()) <= 0L)
// 剔除當前節點
return cancelWaiter(node, node, false);
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this);
// 將節點node的執行緒設定為當前執行緒
node.thread = wt;
if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
whead == h && node.prev == p)
// 阻塞當前執行緒
U.park(false, time); // emulate LockSupport.park
// 將節點node的執行緒設定為null
node.thread = null;
U.putObject(wt, PARKBLOCKER, null);
// 如果中斷,則移除節點node
if (interruptible && Thread.interrupted())
return cancelWaiter(node, node, true);
}
}
}
}
unlockWrite
釋放寫鎖。
public void unlockWrite(long stamp) {
WNode h;
// 如果同步狀態不正確 或者 沒有持有寫鎖,則直接丟擲異常。
if (state != stamp || (stamp & WBIT) == 0L)
throw new IllegalMonitorStateException();
// 同步狀態修改為初始值
state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
// 如果頭節點不為空 && 狀態不為0
if ((h = whead) != null && h.status != 0)
// 喚醒後繼節點
release(h);
}
private void release(WNode h) {
if (h != null) {
WNode q; Thread w;
// 修改狀態為0
U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
// 如果後繼節點為空 或者 後繼節點的狀態為取消狀態
if ((q = h.next) == null || q.status == CANCELLED) {
// 從尾節點開始找合適的節點
for (WNode t = wtail; t != null && t != h; t = t.prev)
if (t.status <= 0)
q = t;
}
if (q != null && (w = q.thread) != null)
// 喚醒節點所在的執行緒
U.unpark(w);
}
}