1. 程式人生 > >AbstractQueuedSynchronizer類原始碼解析

AbstractQueuedSynchronizer類原始碼解析

1、transient是Java語言的關鍵字,用來表示一個域不是該物件序列化的一部分。當一個物件被序列化的時候,transient型變數的值不包括在序列化的表示中,然而非transient型的變數是被包括進去的.
即transient修飾的欄位不會通過Serializable進行網路傳輸。
2 該同步器即可以作為排他模式也可以作為共享模式,當它被定義為一個排他模式時,其他執行緒對其的獲取就被阻止,而共享模式對於多個執行緒獲取都可以成功。
3 同步器的實現依賴於一個FIFO佇列

AbstractOwnableSynchronizer exclusiveOwnerThread排他執行緒

2、Node

Node SHARED = new Node() 表示Node處於共享模式
Node EXCLUSIVE = NULL 表示Node處於獨佔模式
int CANCELLED = 1 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收
int SIGNAL = -1 表示當前節點的後繼節點包含的執行緒需要執行,也就是unpark
int CONDITION = -2 表示當前節點在等待condition,也就是在condition佇列中
int PROPAGATE = -3 表示當前場景下後續的acquireShared能夠得以執行;
0,表示當前節點在sync佇列中,等待著獲取鎖

Node prev 前驅節點
Node next 後繼節點
Node nextWaiter 儲存condition佇列中的後繼節點
Thread thread 入佇列時的當前執行緒

3、AbstractQueuedSynchronizer類
exclusiveOwnerThread 正在執行的執行緒,不為空則不允許其他執行緒獲得鎖資源
Node head 頭節點
Node tail 尾節點
int state 狀態
1:處於佔用狀態, 其他執行緒等待該資源釋放鎖。
0:空閒狀態,其他執行緒可以獲得資源鎖

void unparkSuccessor(Node node) 如果存在的話,喚醒node的繼任者
void doReleaseShared() 將head為waitStatus=-1變為0,並且喚醒head的繼任者。 如果為waitStatus=0則變為-3。
void cancelAcquire(Node node) 取消正在進行的Node獲取鎖的嘗試,將節點狀態設定為取消狀態,後續GC會回收該節點
boolean shouldParkAfterFailedAcquire(Node pred, Node node) 在嘗試獲取鎖失敗後將當前Node的waitStatus置為-1。
void selfInterrupt() 中斷當前執行緒本身
boolean parkAndCheckInterrupt() 禁用當前執行緒,進入等待狀態並中斷執行緒本身
boolean acquireQueued(final Node node, int arg) 自旋,直到當前節點滿足成為頭結點的繼任者,是則獲取鎖,不是等待佇列前面的節點執行緒執行完.

ConditionObject類
Node firstWaiter 條件佇列頭節點
Node lastWaiter 條件佇列尾節點

void unlinkCancelledWaiters() 將waitStatus != Node.CONDITION的所有節點移除
Node addConditionWaiter() 往條件佇列裡添加當前執行緒節點
void awaitUninterruptibly() 非中斷等待,除了singal(),否則一直處於阻塞狀態
void signal() 喚醒condition佇列的頭節點,使其進入sync佇列
void signalAll() 喚醒condition佇列中所有節點,使其進入sync佇列排隊獲得鎖資源
void awaitUninterruptibly() 阻塞當前執行緒,建立節點放入condition佇列尾部,當前執行緒被呼叫interrupt()方法後提前結束,丟擲異常
void await() throws InterruptedException 阻塞當前執行緒,釋放鎖資源,從sync佇列進入condition佇列尾部

LockSupport函式列表
// 返回提供給最近一次尚未解除阻塞的 park 方法呼叫的 blocker 物件,如果該呼叫不受阻塞,則返回 null。
static Object getBlocker(Thread t)
// 為了執行緒排程,禁用當前執行緒,除非許可可用。
static void park()
// 為了執行緒排程,在許可可用之前禁用當前執行緒。
static void park(Object blocker)
// 為了執行緒排程禁用當前執行緒,最多等待指定的等待時間,除非許可可用。
static void parkNanos(long nanos)
// 為了執行緒排程,在許可可用前禁用當前執行緒,並最多等待指定的等待時間。
static void parkNanos(Object blocker, long nanos)
// 為了執行緒排程,在指定的時限前禁用當前執行緒,除非許可可用。
static void parkUntil(long deadline)
// 為了執行緒排程,在指定的時限前禁用當前執行緒,除非許可可用。
static void parkUntil(Object blocker, long deadline)
// 如果給定執行緒的許可尚不可用,則使其可用。
static void unpark(Thread thread)

ReentrantLock類(重入鎖)

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer 定義了內部抽象靜態類Sync
static final class NonfairSync extends Sync 非公平鎖
static final class FairSync extends Sync 公平鎖

new ReentrantLock() 定義的是非公平鎖, 公平鎖要比不公平鎖的吞吐率更低
public ReentrantLock(true) 定義的是公平鎖
void lock()
非公平鎖立即佔用鎖或者一直競爭鎖資源
公平鎖進入佇列排隊等待獲取鎖資源,開銷很大
void lockInterruptibly()
類似lock()方法,但當其他執行緒呼叫了該執行緒的interrupt()方法會丟擲異常。
boolean tryLock()
鎖資源沒有被其他執行緒持有則持有鎖資源並返回true,否則返回false。
boolean tryLock(long timeout, TimeUnit unit)
單元時間內獲得鎖資源立即返回true,否則超時返回false.
void unlock()
釋放持有的鎖資源
Condition newCondition()
建立Condition物件
boolean isHeldByCurrentThread()
是否當前執行緒持有鎖資源,監控使用
boolean isLocked()
資源是否被某個執行緒持有,監控使用
boolean isFair()
是否是公平鎖
Thread getOwner()
獲取正持有鎖資源的執行緒
boolean hasQueuedThreads()
佇列裡面是否有執行緒等待獲取鎖資源
boolean hasQueuedThread(Thread thread)
thread是否在佇列中等待獲取資源
int getQueueLength()
返回佇列長度
Collection getQueuedThreads()
返回佇列中所有執行緒的集合
boolean hasWaiters(Condition condition)
condition佇列中是否有等待被喚醒的執行緒
getWaitQueueLength(Condition condition)
返回condition佇列中所有等待被喚醒(waitState=-2)執行緒的個數
Collection getWaitingThreads(Condition condition)
返回condition佇列中所有等待被喚醒(waitState=-2)執行緒的集合

ReentrantLock與synchronize的區別
synchronize
受保護的程式碼丟擲異常,JVM 將確保鎖會獲得自動釋放
它無法中斷一個正在等候獲得鎖的執行緒,也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖
ReentrantLock:
ReentrantLock 類實現了 Lock ,它擁有與 synchronized 相同的併發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性
激烈爭用情況下更佳的效能(當許多執行緒都想訪問共享資源時,JVM 可以花更少的時候來排程執行緒,把更多時間用在執行執行緒上)
lock 必須在 finally 塊中釋放,受保護的程式碼丟擲異常,不會釋放鎖資源
Lock包含了Condition物件,每個condition都有自己的佇列,所以通過不同的condition喚醒特定的執行緒。執行condition之前必須先持有鎖資源。

吞吐率:Lock非公平鎖 > synchronize > Lock公平鎖

使用鎖,如果一個執行緒沒有獲取到鎖資源,那麼它是不是一直在競爭鎖資源,

每個condition都包含了一個condition佇列

有2個佇列,一個是waitStatus=-2的條件佇列,另一個是waitStatus!=-2的sync佇列

m<

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() { }

    static final class Node {
        /**標記Node處於共享模式 */
        static final Node SHARED = new Node();
        /** 標記Node處於獨佔模式 */
        static final Node EXCLUSIVE = null;

        /** 因為超時或者中斷,Node被設定為取消狀態,被取消的Node不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處於這種狀態的Node會被踢出佇列,被GC回收 */
        static final int CANCELLED =  1;
        /** 表示當前節點的後繼節點包含的執行緒需要執行,也就是unpark */
        static final int SIGNAL    = -1;
        /** 表示當前節點在等待condition,也就是在condition佇列中  */
        static final int CONDITION = -2;
        /**
         * 表示當前場景下後續的acquireShared能夠得以執行;
         */
        static final int PROPAGATE = -3;

        /**
         * 下列值中的一個:
         *   SIGNAL:     繼任者處於阻塞狀態, 所以當前節點被釋放或者取消後應該喚醒繼任者節點執行緒, acquire 方法必須首先指定為signal,
         *               然後重試原子獲取鎖。該狀態節點在佇列中等待獲取鎖資源
         *   CANCELLED:  因為超時或者interrupt處於該狀態.該節點狀態永遠不會被改變,一直到被移除.
         *   CONDITION:  該節點處於 condition佇列中.當被喚醒後置為0進入同步佇列
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          其他,初始化值
         */
        volatile int waitStatus;

        volatile Node prev;
        volatile Node next;

         // 入佇列時的當前執行緒
        volatile Thread thread;

         // 儲存condition佇列中的後繼節點,或者特殊的共享節點
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前驅節點
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    //頭結點
    private transient volatile Node head;

    //尾節點
    private transient volatile Node tail;

    //鎖狀態
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * CAS原子操作修改狀態值
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /**
     * 在park執行緒之前,先自旋spinForTimeoutThreshold的時間,一個粗略的估計足以在非常短的超時中提高響應能力
     */
    static final long spinForTimeoutThreshold = 1000L;

    /**
     * 入佇列
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //為空初始化佇列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /**
     * 為當前執行緒給定模式並建立和成為佇列尾節點。
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /**
     * 設定頭結點
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    //喚醒繼承者節點執行緒
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 喚醒繼承者節點執行緒,如果next節點為null或者為取消狀態,則從尾節點依次往前查詢繼任者節點
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /**
     * 為共享模式釋放操作——通知繼任者,並確保傳播。(注意:對於獨佔模式,如果需要訊號的話,釋放就相當於呼叫unparkSuccessor需要通知的頭節點。)
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

    /**
     * 將當期節點狀態設定為取消狀態,並從佇列中移除當前節點
     */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /**
     * 檢查並更新狀態
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            /*
             * 前驅為取消狀態,則找到非取消狀態的前驅者,並重試獲取鎖
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 狀態為0或PROPAGATE,表示當前節點需要執行,重試獲取鎖
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /**
     * 停止當前執行緒執行,執行緒被中斷返回true
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }


    /**
     * 自旋判斷是否該自己執行,直到獲得鎖成功
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //當前驅節點是頭結點並且能夠獲取狀態,代表該當前節點佔有鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果沒有輪到當前節點執行,那麼將當前執行緒從執行緒排程器上摘下,也就是進入等待狀態。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        //在獲取共享狀態失敗後,當前時刻有可能是獨佔鎖被其他執行緒所把持,那麼將當前執行緒構造成為節點(共享模式)加入到sync佇列中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //判斷後繼節點是否是共享模式,如果是共享模式,那麼就直接對其進行喚醒操作,也就是同時激發多個執行緒併發的執行。
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //判斷後繼節點是否是共享模式,如果是共享模式,那麼就直接對其進行喚醒操作,也就是同時激發多個執行緒併發的執行。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //通過使用LockSupport將當前執行緒從執行緒排程器上摘下,進入休眠狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // Main exported methods

    /**
     * 由繼承類實現
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 由繼承類實現
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 子類實現,獲取共享鎖,如共享讀鎖
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 子類實現,釋放共享鎖
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 子類實現,判斷是否為獨佔鎖
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

    /**
     * 獲取獨佔模式鎖
     */
    public final void acquire(int arg) {
        //嘗試獲取(呼叫tryAcquire更改狀態,需要保證原子性)
        //如果獲取不到,將當前執行緒構造成節點Node並加入sync佇列;
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    /**
     * 同acquire()方法,但當Thread.interrupt()時丟擲異常,並取消當前執行緒的執行
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

    /**
     * 1 單位時間內獲得鎖成功則返回true,
     * 2 超時返回false
     * 3 中斷丟擲異常
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

    /**
     * 釋放獨佔鎖
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /**
     * ReentrantReadWriteLock中的讀鎖會呼叫
     * 當tryAcquireShared()方法小於0時,那麼會執行doAcquireShared方法將該執行緒加入到等待佇列中
     */
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    /**
     * 參考acquireShared
     */
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

    /**
     * 釋放一個共享鎖
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }


    /**
     * 判斷當前節點是否在Sysn佇列中
     * 1 waitStatus=CONDITION在 condition佇列中,返回false
     * 2 node.prev == null返回false
     * 3 node.next != null返回true
     * 4 從佇列尾節點一直找到頭結點,找到當前節點返回true
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;

        return findNodeFromTail(node);
    }

    /**
     * 從佇列尾節點一直找到頭結點,找到當前節點返回true
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

    /**
     * 將節點從condition佇列轉移到sync佇列
     */
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

    /**
     * 釋放當前狀態的鎖
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }


    /**
     * 條件實體,包含自己的佇列
     */
    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        public ConditionObject() { }

        /**
         * 將當前執行緒加入該condition佇列
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * 移除condition佇列中被喚醒的節點
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         *