1. 程式人生 > 其它 >Java同步器之StampedLock原始碼解析

Java同步器之StampedLock原始碼解析

一、簡介

StampedLockjava8中新增的類,它是一個更加高效的讀寫鎖的實現,而且它不是基於AQS來實現的,它的內部自成一派邏輯。

StampedLock具有三種模式:寫模式、讀模式、樂觀讀模式。

ReentrantReadWriteLock中的讀和寫都是一種悲觀鎖的體現,StampedLock加入了一種新的模式——樂觀讀,它是指當樂觀讀時假定沒有其它執行緒修改資料,讀取完成後再檢查下版本號有沒有變化,沒有變化就讀取成功了,這種模式更適用於讀多寫少的場景。

二、使用方法

讓我們通過下面的例子瞭解一下StampedLock三種模式的使用方法:

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        // 獲取寫鎖,返回一個版本號(戳)
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            // 釋放寫鎖,需要傳入上面獲取的版本號
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        // 樂觀讀
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 驗證版本號是否有變化
        if (!sl.validate(stamp)) {
            // 版本號變了,樂觀讀轉悲觀讀
            stamp = sl.readLock();
            try {
                // 重新讀取x、y的值
                currentX = x;
                currentY = y;
            } finally {
                // 釋放讀鎖,需要傳入上面獲取的版本號
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) {
        // 獲取悲觀讀鎖
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // 轉為寫鎖
                long ws = sl.tryConvertToWriteLock(stamp);
                // 轉換成功
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    // 轉換失敗
                    sl.unlockRead(stamp);
                    // 獲取寫鎖
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 釋放鎖
            sl.unlock(stamp);
        }
    }
}

從上面的例子我們可以與ReentrantReadWriteLock進行對比:

(1)寫鎖的使用方式基本一對待;
(2)讀鎖(悲觀)的使用方式可以進行升級,通過tryConvertToWriteLock()方式可以升級為寫鎖;
(3)樂觀讀鎖是一種全新的方式,它假定資料沒有改變,樂觀讀之後處理完業務邏輯再判斷版本號是否有改變,如果沒改變則樂觀讀成功,如果有改變則轉化為悲觀讀鎖重試;

三、原始碼分析

3.1 內部類

static final class WNode {
    // 前一個節點
    volatile WNode prev;
    // 後一個節點
    volatile WNode next;
    // 讀執行緒所用的連結串列(實際是一個棧結果)
    volatile WNode cowait;    // list of linked readers
    // 阻塞的執行緒
    volatile Thread thread;   // non-null while possibly parked
    // 狀態
    volatile int status;      // 0, WAITING, or CANCELLED
    // 讀模式還是寫模式
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}

佇列中的節點,類似於AQS佇列中的節點,可以看到它組成了一個雙向連結串列,內部維護著阻塞的執行緒。

3.2 主要屬性

// 一堆常量
// 讀執行緒的個數佔有低7位
private static final int LG_READERS = 7;
// 讀執行緒個數每次增加的單位
private static final long RUNIT = 1L;
// 寫執行緒個數所在的位置
private static final long WBIT  = 1L << LG_READERS;  // 128 = 1000 0000
// 讀執行緒個數所在的位置
private static final long RBITS = WBIT - 1L;  // 127 = 111 1111
// 最大讀執行緒個數
private static final long RFULL = RBITS - 1L;  // 126 = 111 1110
// 讀執行緒個數和寫執行緒個數的掩碼
private static final long ABITS = RBITS | WBIT;  // 255 = 1111 1111
// 讀執行緒個數的反數,高25位全部為1
private static final long SBITS = ~RBITS;  // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000

// state的初始值
private static final long ORIGIN = WBIT << 1;  // 256 = 1 0000 0000
// 佇列的頭節點
private transient volatile WNode whead;
// 佇列的尾節點
private transient volatile WNode wtail;
// 儲存著當前的版本號,類似於AQS的狀態變數state
private transient volatile long state;

通過屬性可以看到,這是一個類似於AQS的結構,內部同樣維護著一個狀態變數state和一個CLH佇列。

3.3 構造方法

public StampedLock() {
    state = ORIGIN;
}

state的初始值為ORIGIN(256),它的二進位制是1 0000 0000,也就是初始版本號。

3.4 writeLock()方法

獲取寫鎖。

public long writeLock() {
    long s, next;
    // ABITS = 255 = 1111 1111
    // WBITS = 128 = 1000 0000
    // state與ABITS如果等於0,嘗試原子更新state的值加WBITS
    // 如果成功則返回更新的值,如果失敗呼叫acquireWrite()方法
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

state等於初始值為例,則state & ABITS的結果為:

此時state為初始狀態,與ABITS與運算後的值為0,所以執行後面的CAS方法,s + WBITS的值為384 = 1 1000 0000

到這裡我們大膽猜測:state的高24位儲存的是版本號,低8位儲存的是是否有加鎖,第8位儲存的是寫鎖,低7位儲存的是讀鎖被獲取的次數,而且如果只有第8位儲存寫鎖的話,那麼寫鎖只能被獲取一次,也就不可能重入了。

到底我們猜測的對不對呢,接著來分析acquireWrite()方法:

private long acquireWrite(boolean interruptible, long deadline) {
    // node為新增節點,p為尾節點(即將成為node的前置節點)
    WNode node = null, p;
    
    // 第一次自旋——入隊
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 再次嘗試獲取寫鎖
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        } else if (spins < 0)
            // 如果自旋次數小於0,則計算自旋的次數
            // 如果當前有寫鎖獨佔且佇列無元素,說明快輪到自己了
            // 就自旋就行了,如果自旋完了還沒輪到自己才入隊
            // 則自旋次數為SPINS常量
            // 否則自旋次數為0
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 當自旋次數大於0時,當前這次自旋隨機減一次自旋次數
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) {
            // 如果佇列未初始化,新建一個空節點並初始化頭節點和尾節點
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null)
            // 如果新增節點還未初始化,則新建之,並賦值其前置節點為尾節點
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            // 如果尾節點有變化,則更新新增節點的前置節點為新的尾節點
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            // 嘗試更新新增節點為新的尾節點成功,則退出迴圈
            p.next = node;
            break;
        }
    }

    // 第二次自旋——阻塞並等待喚醒
    for (int spins = -1;;) {
        // h為頭節點,np為新增節點的前置節點,pp為前前置節點,ps為前置節點的狀態
        WNode h, np, pp; int ps;
        // 如果頭節點等於前置節點,說明快輪到自己了
        if ((h = whead) == p) {
            if (spins < 0)
                // 初始化自旋次數
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                // 增加自旋次數
                spins <<= 1;
            
            // 第三次自旋,不斷嘗試獲取寫鎖
            for (int k = spins;;) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 嘗試獲取寫鎖成功,將node設定為新頭節點並清除其前置節點(gc)
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                // 隨機立減自旋次數,當自旋次數減為0時跳出迴圈再重試
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        } else if (h != null) { // help release stale waiters
            // 這段程式碼很難進來,是用於協助喚醒讀節點的
            // 我是這麼除錯進來的:
            // 起三個寫執行緒,兩個讀執行緒
            // 寫執行緒1獲取鎖不要釋放
            // 讀執行緒1獲取鎖,讀執行緒2獲取鎖(會阻塞)
            // 寫執行緒2獲取鎖(會阻塞)
            // 寫執行緒1釋放鎖,此時會喚醒讀執行緒1
            // 在讀執行緒1裡面先不要喚醒讀執行緒2
            // 寫執行緒3獲取鎖,此時就會走到這裡來了
            WNode c; Thread w;
            // 如果頭節點的cowait連結串列(棧)不為空,喚醒裡面的所有節點
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 如果頭節點沒有變化
        if (whead == h) {
            // 如果尾節點有變化,則更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)
                // 如果尾節點狀態為0,則更新成WAITING
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // 如果尾節點狀態為取消,則把它從連結串列中刪除
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {
                // 有超時時間的處理
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 已超時,剔除當前節點
                    return cancelWaiter(node, node, false);
                // 當前執行緒
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                // 把node的執行緒指向當前執行緒
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    // 阻塞當前執行緒
                    U.park(false, time);  // 等同於LockSupport.park()
                    
                // 當前節點被喚醒後,清除執行緒
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                // 如果中斷了,取消當前節點
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

這裡對acquireWrite()方法做一個總結,這個方法裡面有三段自旋邏輯:

第一段自旋——入隊:

(1)如果頭節點等於尾節點,說明沒有其它執行緒排隊,那就多自旋一會,看能不能嘗試獲取到寫鎖;
(2)否則,自旋次數為0,直接讓其入隊;

第二段自旋——阻塞並等待被喚醒 + 第三段自旋——不斷嘗試獲取寫鎖:

(1)第三段自旋在第二段自旋內部;
(2)如果頭節點等於前置節點,那就進入第三段自旋,不斷嘗試獲取寫鎖;
(3)否則,嘗試喚醒頭節點中等待著的讀執行緒;
(4)最後,如果當前執行緒一直都沒有獲取到寫鎖,就阻塞當前執行緒並等待被喚醒;

這麼一大段邏輯看著比較鬧心,其實真正分解下來還是比較簡單的,無非就是自旋,把很多狀態的處理都糅合到一個for迴圈裡面處理了。

3.5 unlockWrite()方法

釋放寫鎖。

public void unlockWrite(long stamp) {
    WNode h;
    // 檢查版本號對不對
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 這行程式碼實際有兩個作用:
    // 1. 更新版本號加1
    // 2. 釋放寫鎖
    // stamp + WBIT實際會把state的第8位置為0,也就相當於釋放了寫鎖
    // 同時會進1,也就是高24位整體加1了
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 如果頭節點不為空,並且狀態不為0,呼叫release方法喚醒它的下一個節點
    if ((h = whead) != null && h.status != 0)
        release(h);
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 將其狀態改為0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果頭節點的下一個節點為空或者其狀態為已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 從尾節點向前遍歷找到一個可用的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 喚醒q節點所在的執行緒
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

寫鎖的釋放過程比較簡單:

(1)更改state的值,釋放寫鎖;
(2)版本號加1
(3)喚醒下一個等待著的節點;

3.6 readLock()方法

獲取讀鎖。

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    // 沒有寫鎖佔用,並且讀鎖被獲取的次數未達到最大值
    // 嘗試原子更新讀鎖被獲取的次數加1
    // 如果成功直接返回,如果失敗呼叫acquireRead()方法
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

獲取讀鎖的時候先看看現在有沒有其它執行緒佔用著寫鎖,如果沒有的話再檢測讀鎖被獲取的次數有沒有達到最大,如果沒有的話直接嘗試獲取一次讀鎖,如果成功了直接返回版本號,如果沒成功就呼叫acquireRead()排隊。

下面我們一起來看看acquireRead()方法,這又是一個巨長無比的方法,請保持耐心,我們一步步來分解:

private long acquireRead(boolean interruptible, long deadline) {
    // node為新增節點,p為尾節點
    WNode node = null, p;
    // 第一段自旋——入隊
    for (int spins = -1;;) {
        // 頭節點
        WNode h;
        // 如果頭節點等於尾節點
        // 說明沒有排隊的執行緒了,快輪到自己了,直接自旋不斷嘗試獲取讀鎖
        if ((h = whead) == (p = wtail)) {
            // 第二段自旋——不斷嘗試獲取讀鎖
            for (long m, s, ns;;) {
                // 嘗試獲取讀鎖,如果成功了直接返回版本號
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    // 如果讀執行緒個數達到了最大值,會溢位,返回的是0
                    return ns;
                else if (m >= WBIT) {
                    // m >= WBIT表示有其它執行緒先一步獲取了寫鎖
                    if (spins > 0) {
                        // 隨機立減自旋次數
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        // 如果自旋次數為0了,看看是否要跳出迴圈
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 設定自旋次數
                        spins = SPINS;
                    }
                }
            }
        }
        // 如果尾節點為空,初始化頭節點和尾節點
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 如果新增節點為空,初始化之
            node = new WNode(RMODE, p);
        else if (h == p || p.mode != RMODE) {
            // 如果頭節點等於尾節點或者尾節點不是讀模式
            // 當前節點入隊
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            // 接著上一個elseif,這裡肯定是尾節點為讀模式了
            // 將當前節點加入到尾節點的cowait中,這是一個棧
            // 上面的CAS成功了是不會進入到這裡來的
            node.cowait = null;
        else {
            // 第三段自旋——阻塞當前執行緒並等待被喚醒
            for (;;) {
                WNode pp, c; Thread w;
                // 如果頭節點不為空且其cowait不為空,協助喚醒其中等待的讀執行緒
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                // 如果頭節點等待前前置節點或者等於前置節點或者前前置節點為空
                // 這同樣說明快輪到自己了
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    // 第四段自旋——又是不斷嘗試獲取鎖
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT); // 只有當前時刻沒有其它執行緒佔有寫鎖就不斷嘗試
                }
                // 如果頭節點未曾改變且前前置節點也未曾改
                // 阻塞當前執行緒
                if (whead == h && p.prev == pp) {
                    long time;
                    // 如果前前置節點為空,或者頭節點等於前置節點,或者前置節點已取消
                    // 從第一個for自旋開始重試
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 超時檢測
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 如果超時了,取消當前節點
                        return cancelWaiter(node, p, false);
                    
                    // 當前執行緒
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    // 設定進node中
                    node.thread = wt;
                    // 檢測之前的條件未曾改變
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        // 阻塞當前執行緒並等待被喚醒
                        U.park(false, time);
                    
                    // 喚醒之後清除執行緒
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    // 如果中斷了,取消當前節點
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }
    
    // 只有第一個讀執行緒會走到下面的for迴圈處,參考上面第一段自旋中有一個break,當第一個讀執行緒入隊的時候break出來的
    
    // 第五段自旋——跟上面的邏輯差不多,只不過這裡單獨搞一個自旋針對第一個讀執行緒
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 如果頭節點等於尾節點,說明快輪到自己了
        // 不斷嘗試獲取讀鎖
        if ((h = whead) == p) {
            // 設定自旋次數
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
                
            // 第六段自旋——不斷嘗試獲取讀鎖
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                // 不斷嘗試獲取讀鎖
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    // 獲取到了讀鎖
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    // 喚醒當前節點中所有等待著的讀執行緒
                    // 因為當前節點是第一個讀節點,所以它是在佇列中的,其它讀節點都是掛這個節點的cowait棧中的
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    // 返回版本號
                    return ns;
                }
                // 如果當前有其它執行緒佔有著寫鎖,並且沒有自旋次數了,跳出當前迴圈
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        } else if (h != null) {
            // 如果頭節點不等待尾節點且不為空且其為讀模式,協助喚醒裡面的讀執行緒
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 如果頭節點未曾變化
        if (whead == h) {
            // 更新前置節點及其狀態等
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 第一個讀節點即將進入阻塞
                long time;
                // 超時設定
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 如果超時了取消當前節點
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    // 阻塞第一個讀節點並等待被喚醒
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

讀鎖的獲取過程比較艱辛,一共有六段自旋,讓我們來大致地分解一下:

  1. 讀節點進來都是先判斷是頭節點如果等於尾節點,說明快輪到自己了,就不斷地嘗試獲取讀鎖,如果成功了就返回;
  2. 如果頭節點不等於尾節點,這裡就會讓當前節點入隊,這裡入隊又分成了兩種;
  3. 一種是首個讀節點入隊,它是會排隊到整個佇列的尾部,然後跳出第一段自旋;
  4. 另一種是非第一個讀節點入隊,它是進入到首個讀節點的cowait棧中,所以更確切地說應該是入棧;
  5. 不管是入隊還入棧後,都會再次檢測頭節點是不是等於尾節點了,如果相等,則會再次不斷嘗試獲取讀鎖;
  6. 如果頭節點不等於尾節點,那麼才會真正地阻塞當前執行緒並等待被喚醒;
  7. 上面說的首個讀節點其實是連續的讀執行緒中的首個,如果是兩個讀執行緒中間夾了一個寫執行緒,還是老老實實的排隊。

3.7 unlockRead()方法

釋放讀鎖。

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 檢查版本號
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 讀執行緒個數正常
        if (m < RFULL) {
            // 釋放一次讀鎖
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 如果讀鎖全部都釋放了,且頭節點不為空且狀態不為0,喚醒它的下一個節點
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            // 讀執行緒個數溢位檢測
            break;
    }
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 將其狀態改為0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 如果頭節點的下一個節點為空或者其狀態為已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 從尾節點向前遍歷找到一個可用的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 喚醒q節點所在的執行緒
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}

讀鎖釋放的過程就比較簡單了,將state的低7位減1,當減為0的時候說明完全釋放了讀鎖,就喚醒下一個排隊的執行緒。

3.8 tryOptimisticRead()方法

樂觀讀。

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

如果沒有寫鎖,就返回state的高25位,這裡把寫所在位置一起返回了,是為了後面檢測資料有沒有被寫過。

3.9 validate()方法

檢測樂觀讀版本號是否變化。

public boolean validate(long stamp) {
    // 強制加入記憶體屏障,重新整理資料
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}

檢測兩者的版本號是否一致,與SBITS與操作保證不受讀操作的影響。

四、變異的CLH佇列

StampedLock中的佇列是一種變異的CLH佇列,圖解如下:

五、總結

  1. StampedLock也是一種讀寫鎖,它不是基於AQS實現的;
  2. StampedLock相較於ReentrantReadWriteLock多了一種樂觀讀的模式,以及讀鎖轉化為寫鎖的方法;
  3. StampedLockstate儲存的是版本號,確切地說是高24位儲存的是版本號,寫鎖的釋放會增加其版本號,讀鎖不會;
  4. StampedLock的低7位儲存的讀鎖被獲取的次數,第8位儲存的是寫鎖被獲取的次數;
  5. StampedLock不是可重入鎖,因為只有第8位標識寫鎖被獲取了,並不能重複獲取;
  6. StampedLock中獲取鎖的過程使用了大量的自旋操作,對於短任務的執行會比較高效,長任務的執行會浪費大量CPU
  7. StampedLock不能實現條件鎖;

六、拓展:與ReentrantReadWriteLock的對比

StampedLockReentrantReadWriteLock作為兩種不同的讀寫鎖方式,歸納了它們的大致異同點:

  1. 兩者都有獲取讀鎖、獲取寫鎖、釋放讀鎖、釋放寫鎖的方法,這是相同點;
  2. 兩者的結構基本類似,都是使用state + CLH佇列;
  3. 前者的state分成三段,高24位儲存版本號、低7位儲存讀鎖被獲取的次數、第8位儲存寫鎖被獲取的次數;
  4. 後者的state分成兩段,高16位儲存讀鎖被獲取的次數,低16位儲存寫鎖被獲取的次數;
  5. 前者的CLH佇列可以看成是變異的CLH佇列,連續的讀執行緒只有首個節點儲存在佇列中,其它的節點儲存的首個節點的cowait棧中;
  6. 後者的CLH佇列是正常的CLH佇列,所有的節點都在這個佇列中;
  7. 前者獲取鎖的過程中有判斷首尾節點是否相同,也就是是不是快輪到自己了,如果是則不斷自旋,所以適合執行短任務;
  8. 後者獲取鎖的過程中非公平模式下會做有限次嘗試;
  9. 前者只有非公平模式,一上來就嘗試獲取鎖;
  10. 前者喚醒讀鎖是一次性喚醒連續的讀鎖的,而且其它執行緒還會協助喚醒;
  11. 後者是一個接著一個地喚醒的;
  12. 前者有樂觀讀的模式,樂觀讀的實現是通過判斷state的高25位是否有變化來實現的;
  13. 前者各種模式可以互轉,類似tryConvertToXxx()方法;
  14. 前者寫鎖不可重入,後者寫鎖可重入;
  15. 前者無法實現條件鎖,後者可以實現條件鎖;