1. 程式人生 > 其它 >Java中公平鎖與非公平鎖的tryAcquire()方法分析

Java中公平鎖與非公平鎖的tryAcquire()方法分析

公平鎖與非公平鎖

如果獲取一個鎖是按照請求的順序得到的,那麼就是公平鎖,否則就是非公平鎖。

公平鎖保證一個阻塞的執行緒最終能夠獲得鎖,因為是有序的,所以總是可以按照請求的順序獲得鎖。非公平鎖意味著後請求鎖的執行緒可能在其前面排列的休眠執行緒恢復前拿到鎖,這樣就有可能提高併發的效能。這是因為通常情況下掛起的執行緒重新開始與它真正開始執行,二者之間會產生嚴重的延時。因此非公平鎖就可以利用這段時間完成操作,這是非公平鎖在某些時候比公平鎖效能要好的原因之一。

公平鎖tryAcquire()方法分析

參考的是java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire

/**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         
*/ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 跟非公平鎖的實現相比,這裡多了!hasQueuedPredecessors()的判斷。Predecessor:前任,!hasQueuedPredecessors()顧名思義,沒有排隊著的前任(執行緒),則當前執行緒排第一
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

在這段程式碼中,前面說明對於AQS存在一個state來描述當前有多少執行緒持有鎖。由於AQS支援共享鎖(例如讀寫鎖),所以這裡state>=0,但是由於ReentrantLock是獨佔鎖,所以這裡不妨理解為state >= 0,acquires=1。

來理解一下tryAcquire()方法的實現:

  1. c!=0,則說明有其它執行緒持有當前鎖,進行操作2。否則,如果當前執行緒在AQS佇列頭部,則嘗試將AQS狀態state設為acquires(其值等於1),成功後將AQS獨佔執行緒設為當前執行緒返回true,否則進行2。這裡可以看到compareAndSetState就是使用了CAS操作。
  2. 判斷當前執行緒與AQS的獨佔執行緒是否相同,如果相同,那麼就將當前狀態位加1(這裡+1後結果為負數後面會講,這裡暫且不理它),修改狀態位,返回true,否則進行3。這裡之所以不是將當前狀態位設定為1,而是修改為舊值+1呢?這是因為ReentrantLock是可重入鎖,同一個執行緒每持有一次就+1。
  3. 返回false。

看看java.util.concurrent.locks.AbstractQueuedSynchronizer#getState的實現:

/**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

AbstractQueuedSynchronizer中state的定義如下:

/**
     * The synchronization state.
* state描述的是有多少個執行緒取得了鎖。對於互斥鎖來說,state <= 1
*/ private volatile int state;

這裡再擴充套件以下,講waitStatus,要跟上面的state區分開。AbstractQueuedSynchronizer中有個內部類Node,其中一個屬性是waitStatus:

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
* 非負值標識的節點不需要被通知(喚醒) * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes).
*/ volatile int waitStatus;

waitStatus,節點的等待狀態,一個節點可能處於以下幾種狀態:

  • CANCELLED = 1: 節點操作因為超時或者對應的執行緒被interrupt。節點不應該留在此狀態,一旦達到此狀態將從CHL佇列中踢出。
  • SIGNAL = -1: 節點的繼任節點是(或者將要成為)BLOCKED狀態(例如通過LockSupport.park()操作),因此一個節點一旦被釋放(解鎖)或者取消就需要喚醒(LockSupport.unpack())它的繼任節點。
  • CONDITION = -2:表明節點對應的執行緒因為不滿足一個條件(Condition)而被阻塞。
  • 0: 正常狀態,新生的非CONDITION節點都是此狀態。

上面的tryAcquire()方法中,還涉及到hasQueuedPredecessors()方法,看看相關程式碼:

/**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     *  <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     *  <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

"Queries whether any threads have been waiting to acquire longerthan the current thread." 結合公平鎖的語義,說白了該方法就是用於判斷當前執行緒是不是嘗試獲取鎖的第一個執行緒。用虛擬碼的話,可以用isFirst(current)來代替!hasQueuedPredecessors()。

非公平鎖tryAcquire()方法分析

參考的是java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire

/**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            // 呼叫了java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire方法
            return nonfairTryAcquire(acquires);
        }
    }

可以看到,具體的實現並沒有在NonfairSync類中,而是在java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire:

/**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 跟公平鎖的實現相比,這裡沒有!hasQueuedPredecessors()的判斷
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        // 此處刪去其他無關程式碼
    }

非公平鎖不判斷當前節點是否在佇列頭,因此獲取鎖的順序並不等同於請求鎖的順序。

其他的實現同公平鎖。

參考來源

http://www.blogjava.net/xylz/archive/2010/07/06/325390.html

http://www.blogjava.net/xylz/archive/2010/07/07/325410.html

10年前的文章了,感謝強大的博主。