1. 程式人生 > 其它 >併發程式設計 - StampedLock (待續)

併發程式設計 - StampedLock (待續)

技術標籤:多執行緒

StampedLock

原始碼解析

屬性

// 讀執行緒的個數佔有低7位
private static final int LG_READERS = 7;
// 讀執行緒個數每次增加的單位
private static final long RUNIT = 1L;
// 寫鎖標記位
private static final long WBIT  = 1L << LG_READERS;  // 128 = 1000 0000
// 讀鎖標記位
private static final long RBITS = WBIT - 1L;  // 127 = 111 1111
// 最大讀執行緒個數
private
static final long RFULL = RBITS - 1L; // 126 = 111 1110 // 用於獲取讀寫狀態 private static final long ABITS = RBITS | WBIT; // 255 = 1111 1111 // 讀執行緒個數的反數,高25位全部為1 private static final long SBITS = ~RBITS; // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000 // state的初始值 private static
final long ORIGIN = WBIT << 1; // 256 = 1 0000 0000 // 佇列的頭節點 private transient volatile WNode whead; // 佇列的尾節點 private transient volatile WNode wtail; // 同步狀態 private transient volatile long state;

writeLock

搶佔寫鎖。

過程概述:

	第一次自旋:

		如果此刻沒有持有鎖,可以直接搶佔鎖。否則插入到佇列尾部。

	第二次自旋:

		如果當前節點的前驅節點是頭節點,進入第三次自旋。
		如果頭節點的cowait連結串列不為空,喚醒裡面等待的讀執行緒。
		如果當前節點一直沒有獲取到寫鎖,阻塞當前執行緒。
		
	第三次自旋:

		不斷嘗試獲取寫鎖。
public long writeLock() {
    long s, next;  
    // 如果沒有持有任何鎖
    // CAS操作,如果成功,則標記持有寫鎖
    // 如果失敗,則執行 acquireWrite 方法
    return ((((s = state) & ABITS) == 0L && 
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;

	// 第一次自旋 - 入隊
    for (int spins = -1;;) { 
        long m, s, ns;
        // 如果不持有任何鎖
        if ((m = (s = state) & ABITS) == 0L) {
        	// CAS操作,標記持有寫鎖
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
            	// 直接返回
                return ns;
        }
        
        // 如果持有鎖
        
        // 如果自旋次數 < 0
        else if (spins < 0)
        	// 修改自旋次數
        	// - 如果持有寫鎖 && 佇列中只有一個元素 -> SPINS
			// - 否則 -> 0 
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            
        // 如果自旋次數 > 0
        else if (spins > 0) {
            if (LockSupport.nextSecondarySeed() >= 0)
            	// 自旋次數遞減
                --spins;
        }
		
		// 如果佇列沒有初始化,則初始化佇列
        else if ((p = wtail) == null) { 
            WNode hd = new WNode(WMODE, null);
            // 加入頭節點
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
            	// 加入尾節點
                wtail = hd;
        }
        
        // 初始化新節點
        else if (node == null)
            node = new WNode(WMODE, p);
            
        // 將新節點插入到佇列尾部
        else if (node.prev != p)
            node.prev = p;
        
        // 重新標記尾節點
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            p.next = node;
			// 結束自旋
            break;
        }
    }

	// 第二次自旋 - 阻塞並等待喚醒
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果當前節點的前驅節點是頭節點
        if ((h = whead) == p) {
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;

			// 第三次自旋 - 嘗試獲取寫鎖
            for (int k = spins;;) { 
                long s, ns;
                // 如果當前不持有任何鎖
                if (((s = state) & ABITS) == 0L) {
                	// 如果CAS操作成功,標記持有寫鎖
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 將當前節點設定為頭節點
                        whead = node;
                        node.prev = null;
                        // 直接返回
                        return ns;
                    }
                }
                // 如果當前持有鎖 
                // 自旋次數遞減
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    // 如果自旋次數<=0,跳出第三次自旋過程
                    break;
            }
        }
        // 如果當前節點的前驅節點不是頭節點
        else if (h != null) { // help release stale waiters
            WNode c; Thread w;
            // 如果頭節點的cowait連結串列不為空,喚醒裡面等待的讀執行緒。
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)              
                    U.unpark(w);
            }
        }
		
		// 如果頭節點沒有發生變化
        if (whead == h) {
        	// 如果尾節點發生變化,則更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   
            }
            // 如果尾節點的狀態為0
            else if ((ps = p.status) == 0)
            	// CAS操作,修改狀態為-1
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            // 如果尾節點的狀態為取消狀態,則將其從連結串列中刪除
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                // 如果超時
                else if ((time = deadline - System.nanoTime()) <= 0L)
                	// 剔除當前節點
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                // 將節點node的執行緒設定為當前執行緒
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    // 阻塞當前執行緒
                    U.park(false, time);  // emulate LockSupport.park
                // 將節點node的執行緒設定為null
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                // 如果中斷,則移除節點node
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

unlockWrite

釋放寫鎖。

public void unlockWrite(long stamp) {
    WNode h;
    // 如果同步狀態不正確 或者 沒有持有寫鎖,則直接丟擲異常。
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 同步狀態修改為初始值
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 如果頭節點不為空 && 狀態不為0
    if ((h = whead) != null && h.status != 0)
    	// 喚醒後繼節點
        release(h);
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 修改狀態為0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果後繼節點為空 或者 後繼節點的狀態為取消狀態
        if ((q = h.next) == null || q.status == CANCELLED) {
        	// 從尾節點開始找合適的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
        	// 喚醒節點所在的執行緒
            U.unpark(w);
    }
}