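Synchronization Locks: Basic Principles and Implementation
To make full use of machine performance, people invented multithreading. That brought thread-safety problems with it, so people then invented synchronization locks.
Everyone knows this much, but do you really understand synchronization locks, or do you only know how to call lock and unlock?
Today, let's take a deeper look at how synchronization locks work and how they are implemented.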
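I. The job of a synchronization lock
A synchronization lock really has one job: restricting how a resource is used (thread safety is one consequence of that).
It generally provides at least two operations: 1. lock the resource; 2. unlock the resource. It usually also offers wait/notify.
Typical use cases: several threads operate on one transaction at the same time and correctness must be preserved; a resource may be accessed by only one thread at a time; a resource may admit at most k concurrent accesses; accesses must happen in a guaranteed order.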
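The "at most k concurrent accesses" case can be sketched with java.util.concurrent.Semaphore. This is only an illustration under assumptions: the class LimitedResource, its fields, and the demo method are hypothetical names invented for this example, not something from the code discussed in this article.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class: admits at most k threads into use() at once.
class LimitedResource {
    private final Semaphore permits;
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    LimitedResource(int k) {
        this.permits = new Semaphore(k);
    }

    void use(Runnable work) throws InterruptedException {
        permits.acquire();                          // blocks once k threads are inside
        try {
            int now = active.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);  // remember the highest concurrency seen
            work.run();
        } finally {
            active.decrementAndGet();
            permits.release();
        }
    }

    // Runs `threads` workers against a limit of k and returns the peak concurrency observed.
    static int demo(int k, int threads) {
        LimitedResource resource = new LimitedResource(k);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    resource.use(() -> {
                        try {
                            Thread.sleep(5);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return resource.peak.get();
    }
}
```

Calling LimitedResource.demo(2, 8) should never report a peak above 2, however the eight threads are scheduled.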
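Ways to implement a synchronization lock: relying on operating-system scheduling; implementing it in the application itself; CAS spinning.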
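As a rough illustration of the CAS-spinning approach, here is a minimal sketch. SpinLock and SpinLockDemo are hypothetical names for this example; the sketch is non-reentrant and unfair, and it deliberately burns CPU while waiting, which is exactly the trade-off against a blocking lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical minimal spin lock: non-reentrant, unfair, for illustration only.
class SpinLock {
    private final AtomicBoolean held = new AtomicBoolean(false);

    void lock() {
        // Busy-wait until the CAS from false to true succeeds.
        while (!held.compareAndSet(false, true)) {
            Thread.yield();
        }
    }

    void unlock() {
        held.set(false);
    }
}

class SpinLockDemo {
    // Each thread increments a plain int under the spin lock; returns the final value.
    static int run(int threads, int incrementsPerThread) {
        SpinLock lock = new SpinLock();
        int[] counter = {0}; // guarded by lock
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

With the lock in place, SpinLockDemo.run(4, 10000) returns 40000; without it, increments would be lost.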
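A few questions about synchronization locks:
Why can a lock guarantee thread safety?
Does waiting for a lock consume CPU?
Why does performance drop so sharply once locks are used?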
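II. Lock implementation, part one: lock/unlock
For application code, the vast majority of lock usage is just lock/unlock, and that pair is also the core of any lock.
AQS (AbstractQueuedSynchronizer) is the foundation of many lock implementations in Java: it hides a lot of tedious low-level blocking logic and exposes an easy-to-use interface to the layers above.
We will use AQS as our springboard and look at the locking path first. To avoid getting lost in the business logic of a specific lock, we start with the simplest one, CountDownLatch.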
// First, CountDownLatch's basic data structure. It could hardly be simpler:
// it extends AQS and overrides a few required methods.
// java.util.concurrent.CountDownLatch.Sync
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        // Acquisition succeeds in exactly one case: state == 0
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            // The initial count is fixed at construction time; each release decrements it by one
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                // Returns true in exactly one case: this release brought state to 0
                return nextc == 0;
        }
    }
}

private final Sync sync;
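To see how little a synchronizer has to supply on top of AQS, here is a sketch of a one-shot gate in shared mode, modelled loosely on the latch example in the AbstractQueuedSynchronizer Javadoc. OneShotGate and its demo method are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical one-shot gate: closed until open() is called, then open forever.
class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() != 0;
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            return isOpen() ? 1 : -1;   // negative means "queue up and park"
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);                // open the gate
            return true;                // tell AQS to wake the waiters
        }
    }

    private final Sync sync = new Sync();

    void open() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Starts `waiters` threads blocked on the gate, opens it, and counts how many got through.
    static int demo(int waiters) {
        OneShotGate gate = new OneShotGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return passed.get();
    }
}
```

The queueing, parking, and waking all come from AQS; the subclass only decides what "acquired" and "released" mean in terms of state.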
Key point 1. The locking path, i.e. the call to await().
public void await() throws InterruptedException {
    // Delegates to AQS, which implements the skeleton of the locking logic
    sync.acquireSharedInterruptibly(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
/**
 * Acquires in shared mode, aborting if interrupted.  Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success.  Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 * @param arg the acquire argument.
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Try to acquire first; on success there is no need to block.
    // As the logic above shows, this attempt is trivial, so acquiring an
    // uncontended lock costs very little.
    // If the attempt fails, the thread queues up and retries later; that part
    // is the intricate one.
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // First append the current thread to the tail of the queue; this is done
    // thread-safely, as we will see shortly
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // Look at the predecessor; if it is the head node, the current
            // thread may try to acquire again
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // Check whether we should block, then block; blocking is provided
            // by LockSupport underneath.
            // Once parked, the thread normally resumes only when a release
            // unparks it (or when it is interrupted).
            // So this is where a waiting acquirer ultimately sits; after waking
            // it loops and retries, and leaves only once the acquire succeeds.
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// How is the current thread added to the tail thread-safely? CAS guarantees it.
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// Decide whether the thread needs to block
/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops.  Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // Park only when the predecessor is already in SIGNAL state. If it is
        // not, the branch below sets it, and the caller's next loop iteration
        // ends up here.
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// The park (blocking) implementation
/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // The AQS instance is passed as the "blocker". It is recorded for
    // monitoring and diagnostic tools only; it is not itself a lock that the
    // waiting threads contend on.
    LockSupport.park(this);
    return Thread.interrupted();
}
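As shown above, the locking path is fairly straightforward: the thread is appended to a queue and then parked, so the operating system stops scheduling it. (How the operating system actually takes a thread off the CPU is left for interested readers to explore.)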
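The blocking primitive underneath is LockSupport.park/unpark. Here is a minimal sketch of using it directly; ParkDemo is a hypothetical name, and the flag-in-a-loop pattern is there because park is documented as being allowed to return without an unpark.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo of the primitive that AQS blocks on.
class ParkDemo {
    // Returns true if the waiter resumed only after being permitted and unparked.
    static boolean run() {
        AtomicBoolean permitted = new AtomicBoolean(false);
        AtomicBoolean resumed = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            // park may return spuriously, so always re-check the condition in a loop
            while (!permitted.get()) {
                LockSupport.park(ParkDemo.class); // the argument is only a diagnostic "blocker"
            }
            resumed.set(true);
        });
        waiter.start();

        // A parked thread shows up as WAITING and uses no CPU while it sits there.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        boolean stillBlocked = !resumed.get();

        permitted.set(true);
        LockSupport.unpark(waiter);
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return stillBlocked && resumed.get();
    }
}
```

This also speaks to the earlier question about CPU cost: a parked thread is not scheduled, so waiting this way costs context switches rather than spinning.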
Key point 2. The unlocking path, i.e. the call to countDown().
public void countDown() {
    // Likewise delegates to AQS, which implements the skeleton of the release logic
    sync.releaseShared(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    // Call the subclass's release logic; if it succeeds, perform the low-level
    // release: waking queued threads and so on.
    // In CountDownLatch this succeeds only when state reaches 0, so the
    // low-level release runs only once.
    // That is one reason CountDownLatch appears to let many threads proceed
    // at the same time.
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        // Release only if the queue is non-empty
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // From the locking logic above: if the successor is parked,
            // head's status must be Node.SIGNAL
            if (ws == Node.SIGNAL) {
                // Wake up only after the status CAS succeeds; the CAS is what
                // makes this thread-safe. On failure, loop and recheck.
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // Wake the node after head, if necessary
                unparkSuccessor(h);
            }
            // What exactly is being "propagated" here?
            // And why can the other threads move when only one was woken?
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // Wake the next node.
    // If the next node has been cancelled, wake the nearest non-cancelled node
    // instead (found by scanning backwards from tail).
    // Only a single thread is woken here.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
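Key point 3. How does the release propagate?
In the previous section we saw that when the user calls countDown, only the thread in the node after head (or the nearest non-cancelled node) is woken. So how do all the waiting threads end up acquiring the lock and running?
The answer lies in a detail of the acquire path that we have not looked at closely yet: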
// java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
// (the uninterruptible variant; doAcquireSharedInterruptibly, which await() uses,
// has the same structure and also calls setHeadAndPropagate)
/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // After countDown triggers a release, the thread in the node
                // right after head is woken and retries here
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Lock acquired: make this node the new head and propagate
                    // the wake-up. This is where further threads get woken,
                    // see below.
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        // If needed, wake the next thread.
        // countDown() does not reach this code directly; this is internal
        // propagation along the acquire path.
        Node s = node.next;
        if (s == null || s.isShared())
            // Same release logic as above; in short, another wake-up is triggered
            doReleaseShared();
    }
}
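Now we understand how a single release lets every waiting thread through. It also settles our guess that all threads start running at exactly the same moment: they do not. Wake-up propagation wakes the waiting threads one after another, so in absolute time there is always an order. Let's not casually say they run "simultaneously" any more.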
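From the caller's side, the behaviour traced above looks like this. The sketch uses the real java.util.concurrent.CountDownLatch; LatchDemo and its run method are hypothetical names for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: waiters pass only after the count reaches zero, then all of them pass.
class LatchDemo {
    // Returns {waiters through before the final countDown, waiters through after it}.
    static int[] run(int count, int waiters) {
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();            // parks while state > 0
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < count - 1; i++) {
            latch.countDown();                // tryReleaseShared returns false: nobody is woken
        }
        int before = passed.get();            // state is still > 0, so no waiter can have passed
        latch.countDown();                    // state hits 0: one wake-up, then propagation
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return new int[] {before, passed.get()};
    }
}
```

Only the last countDown() makes tryReleaseShared return true, and every waiter still gets through because each woken thread wakes its successor in turn.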
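III. Switching on the lock: wait/notify
As seen above, a single lock/unlock cycle is quite simple: the operating system does the heavy lifting and there isn't much implementation code.
Slightly more demanding scenarios need conditional operations, however. For example: a thread holds a lock while executing some code, discovers that a condition is not satisfied, and must wait until it becomes true rather than simply return. This is the so-called wait operation.
At first glance wait/notify looks a lot like lock/unlock, but it is not. The main difference is that lock/unlock guards a whole section of code, whereas wait/notify concerns one specific condition: holding the lock does not mean the condition is true, but once the condition is true it can only be acted on safely while holding the lock.
So is wait/notify just as simple to implement? Even Java's most basic class, Object, provides wait/notify.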
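For reference, here is what the Object-based form looks like in use: a one-slot mailbox built on synchronized plus wait/notifyAll. Mailbox and demo are hypothetical names for this sketch. Note the while loops: being woken does not by itself mean the condition holds.

```java
// Hypothetical one-slot mailbox using the built-in monitor of `this`.
class Mailbox {
    private String message; // guarded by this; null means "empty"

    synchronized void put(String m) throws InterruptedException {
        while (message != null) {   // condition not met: release the monitor and wait
            wait();
        }
        message = m;
        notifyAll();                // the "not empty" condition may now hold
    }

    synchronized String take() throws InterruptedException {
        while (message == null) {
            wait();
        }
        String m = message;
        message = null;
        notifyAll();                // the "not full" condition may now hold
        return m;
    }

    // One producer sends "a", "b", "c"; the caller receives them in order.
    static String demo() {
        Mailbox box = new Mailbox();
        Thread producer = new Thread(() -> {
            try {
                for (String s : new String[] {"a", "b", "c"}) {
                    box.put(s);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 0; i < 3; i++) {
                out.append(box.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return out.toString();
    }
}
```

Because a monitor has a single wait set, both conditions share notifyAll here; the Condition objects examined in this section let each condition have its own queue.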
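Since we want to get to the bottom of this, let's again build on the implementation in the concurrency package; Java is our home turf, after all.
This time we start from ArrayBlockingQueue#put/take to see how this scenario is handled.
The contract of ArrayBlockingQueue's put/take is: put blocks while the queue is full and proceeds only once a slot is free; take likewise blocks while the queue is empty and proceeds only once there is data. A plain lock is awkward here, because this is a condition check. put/take look like this: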
// java.util.concurrent.ArrayBlockingQueue#put
/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Keep waiting while the queue is full
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

// java.util.concurrent.ArrayBlockingQueue#take
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // Keep waiting while the queue is empty
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
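It looks quite simple and matches human intuition. But two variables control the flow here, notFull and notEmpty. How are they related?
Before answering that, we need to complete the example above: once a thread is blocked in notFull.await() or notEmpty.await(), when does it get to run again? It is enough to signal after each enqueue and each dequeue completes, as follows.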
// Counterpart of put: once an element is enqueued the queue is no longer empty,
// so signalling notEmpty is all that is needed
/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // An element has been added, so the queue is not empty
    notEmpty.signal();
}

// Counterpart of take: once an element is dequeued the queue cannot be full;
// there is at least one free slot
/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // An element has been removed, so the queue is not full; producers may continue
    notFull.signal();
    return x;
}
Easy to follow, isn't it? But our goal is not to see how ArrayBlockingQueue is implemented; it is to work out how wait/notify is implemented, which is more involved than a plain lock.
// How the three objects relate: notEmpty and notFull are both Conditions
// created from the ReentrantLock, so they are bound to that one lock
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

// What does lock.newCondition() return? A ConditionObject, implemented inside AQS
// java.util.concurrent.locks.ReentrantLock#newCondition
public Condition newCondition() {
    return sync.newCondition();
}

// java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {
    // Defined in AQS
    return new ConditionObject();
}
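Pulling the pieces together, the pattern of one lock plus two conditions can be sketched as a self-contained bounded buffer, in the spirit of the example in the Condition Javadoc. BoundedBuffer and demo are hypothetical names, and the element type is int to keep the sketch short; this is not the ArrayBlockingQueue source.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer: one lock, two conditions bound to it.
class BoundedBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();
    private final int[] items;
    private int putIndex, takeIndex, count; // all guarded by lock

    BoundedBuffer(int capacity) {
        items = new int[capacity];
    }

    void put(int x) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();            // releases the lock while waiting
            }
            items[putIndex] = x;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            int x = items[takeIndex];
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    // A producer puts 0..9 into a buffer of capacity 2; the caller sums what it takes.
    static int demo() {
        BoundedBuffer buffer = new BoundedBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 10; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum;
    }
}
```

With capacity 2 and ten elements, the producer is forced to wait on notFull several times, yet BoundedBuffer.demo() still returns 45.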
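Next, let's examine the Condition object with a few questions in mind:
1. How are its wait/notify implemented?
2. How is it tied to the lock that created it?
3. Why must wait/notify be called only after the outer lock has been acquired?
4. How is it similar to, and different from, Object's wait/notify?
If you can answer these questions, you understand most of its principles and implementation.
Key point 1. How is wait/notify implemented?
As seen above, it is done by calling await()/signal(). Let's see what they actually do.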
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
/**
 * Implements interruptible condition wait.
 * <ol>
 * <li> If current thread is interrupted, throw InterruptedException.
 * <li> Save lock state returned by {@link #getState}.
 * <li> Invoke {@link #release} with saved state as argument,
 *      throwing IllegalMonitorStateException if it fails.
 * <li> Block until signalled or interrupted.
 * <li> Reacquire by invoking specialized version of
 *      {@link #acquire} with saved state as argument.
 * <li> If interrupted while blocked in step 4, throw InterruptedException.
 * </ol>
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Add the current thread to the condition (wait) queue, which is
    // maintained through firstWaiter/lastWaiter
    Node node = addConditionWaiter();
    // Release the lock currently held; details below
    int savedState = fullyRelease(node);
    // From here on this thread no longer holds the lock, so nothing it does is
    // protected; other threads will compete for the lock
    int interruptMode = 0;
    // Loop: as long as this node is not on the sync queue, keep parking.
    // That is why the on-sync-queue check matters.
    while (!isOnSyncQueue(node)) {
        // Not on the sync queue yet: block
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Once signalled, the thread must re-compete for the lock; details below.
    // After it wins the lock, it returns to the point where await was called
    // and continues with the business logic.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // Cleanup and interrupt reporting follow; not our focus here
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 * @param node the condition node for this wait
 * @return previous sync state
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // Release is expected to succeed; if it fails, the current thread did
        // not hold the lock, hence the exception
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

/**
 * Releases in exclusive mode.  Implemented by unblocking one or
 * more threads if {@link #tryRelease} returns true.
 * This method can be used to implement method {@link Lock#unlock}.
 *
 * @param arg the release argument.  This value is conveyed to
 *        {@link #tryRelease} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @return the value returned from {@link #tryRelease}
 */
public final boolean release(int arg) {
    // tryRelease is implemented by the concrete synchronizer
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// How do we tell whether the current node has already been moved to the sync queue?
/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    // Status still CONDITION, or no prev link yet: the node is still on the
    // condition queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // If the node already has a successor, it must have been moved to the sync queue
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    // Last resort: search the sync queue from the tail; if found, the node is on it
    return findNodeFromTail(node);
}

/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * Called only when needed by isOnSyncQueue.
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

// Once signalled, the thread must re-acquire the lock to restore the caller's
// lock semantics, because it released the lock itself earlier.
// This is exactly the acquire loop used by lock(); nothing new here.
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
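To summarize the logic of wait:
1. Precondition: the thread already holds the outer lock;
2. The current thread is added to the condition (wait) queue;
3. The lock it holds is released;
4. It repeatedly checks and parks until its node has been moved to the sync queue;
5. Once signalled, it re-competes for the outer lock; on success it returns, otherwise it keeps blocking (it is the same outer lock, which is why the condition and the lock must be bound to each other);
6. The thread holds the lock before wait and holds it again when wait returns; from the caller's point of view the lock state is unchanged.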
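Step 3 can be observed from the outside: while one thread sits in await(), another thread can take the very same lock. A minimal sketch follows; AwaitReleasesLock is a hypothetical name, and the thread-state polling is just a simple way to wait until the waiter has parked.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: a thread blocked in await() does not hold the lock.
class AwaitReleasesLock {
    // Returns true if the caller could take the lock while the waiter was inside await().
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.awaitUninterruptibly(); // releases the lock, parks, re-acquires on return
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        // Wait until the waiter is parked; nobody else contends for the lock,
        // so WAITING here means it is parked inside await.
        while (waiter.getState() != Thread.State.WAITING) {
            Thread.yield();
        }

        boolean lockWasFree = lock.tryLock(); // succeeds because await released the lock
        if (!lockWasFree) {
            lock.lock();                       // fallback so the demo always terminates
        }
        try {
            flag[0] = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return lockWasFree;
    }
}
```

The waiter's code reads as if it held the lock across the whole try block, which is the point of step 6: the release and re-acquire happen entirely inside await.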
Key point 2. With wait sorted out, let's look at how signal() implements notification:
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    // Only the thread holding the lock may signal; otherwise nothing would
    // guarantee thread safety
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    // Notify firstWaiter
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    // Transfers at most one node
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

// Move a node from the wait queue to the sync queue so that it can take part
// in the next round of competition for the lock.
// Returns true only if the node really was transferred.
// Note: the current thread is the one holding the lock.
// java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 * @param node the node
 * @return true if successfully transferred (else the node was
 * cancelled before signal)
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // The sync queue is maintained through the head/tail pointers
    Node p = enq(node);
    int ws = p.waitStatus;
    // Note: in the normal case the waiting thread is NOT woken here; its node
    // is only moved from one queue to the other.
    // The signalling thread is still inside its locked region; when it
    // unlocks, the release wakes the waiter. Waking it earlier would gain
    // nothing, since it could not acquire the lock until the signaller is done.
    // The unpark below happens only if the predecessor was cancelled or its
    // status could not be set to SIGNAL.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
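To summarize, notify works as follows:
1. Precondition: the thread already holds the outer lock;
2. The next node on the wait queue is transferred to the sync queue;
3. If that node has been cancelled, the next one is tried, and so on until the queue is empty; at most one node is transferred;
4. In the normal case no thread is woken.
So the key to implementing wait/notify is maintaining two queues, the wait (condition) queue and the sync queue, both of which are manipulated only under the protection of the outer lock.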
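The queue transfer can be made visible with ReentrantLock's monitoring methods getWaitQueueLength(Condition) and getQueueLength(). The sketch below is an illustration only: SignalTransfer is a hypothetical name, and both methods are documented as estimates intended for monitoring, although with a single waiter and no other contention the values should be stable.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: signal() moves a node from the condition queue to the sync queue.
class SignalTransfer {
    // Returns "<waiting>,<queued> -> <waiting>,<queued>" sampled just before and just after signal().
    static String run() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!go[0]) {
                    cond.awaitUninterruptibly();
                }
            } finally {
                lock.unlock();
            }
        });
        waiter.start();

        String result = null;
        while (result == null) {
            lock.lock();
            try {
                if (lock.hasWaiters(cond)) {
                    String before = lock.getWaitQueueLength(cond) + "," + lock.getQueueLength();
                    go[0] = true;
                    cond.signal();   // still holding the lock: the waiter cannot run yet
                    String after = lock.getWaitQueueLength(cond) + "," + lock.getQueueLength();
                    result = before + " -> " + after;
                }
            } finally {
                lock.unlock();       // this release is what actually lets the waiter proceed
            }
            if (result == null) {
                LockSupport.parkNanos(1_000_000L); // brief pause before polling again
            }
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result;
    }
}
```

The expected reading is one node on the condition queue and none on the sync queue before the signal, and the reverse afterwards, all while the signalling thread still owns the lock.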
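We can now also answer the question: why must wait/notify run while holding the lock?
Because wait waits for a condition to become true, there is necessarily contention that needs protection; it must release the lock itself so that other threads can make the condition true, and it must restore the lock afterwards. And after notify there may be further work that must remain safe. A condition is simply a part of its lock.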
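The precondition is enforced at run time. For conditions created by ReentrantLock, calling a waiting or signalling method without holding the lock throws IllegalMonitorStateException, as this small sketch shows (NoLockNoCondition is a hypothetical name).

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: Condition methods refuse to run without the owning lock.
class NoLockNoCondition {
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.signal();          // lock is not held by this thread
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    static boolean awaitWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        try {
            cond.await();           // nothing to release, so the call is rejected
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Object's wait/notify behave the same way when called outside a synchronized block on that object.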
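IV. Notifying all threads: notifyAll
Sometimes, once a condition holds, all waiting threads may proceed, and notifyAll is the right tool. How does it manage to notify everyone? All at once, or in some other way?
Here is the implementation in AQS: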
// java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
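As you can see, it walks all the nodes and transfers them one by one from the wait queue to the sync queue (that transfer is the notification); nobody can do several things at once!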
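The same monitoring methods can show signalAll() emptying the condition queue into the sync queue. As before this is a sketch under assumptions: SignalAllDemo is a hypothetical name and the queue lengths are documented as estimates, though with all waiters already parked on the condition they should be stable.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo: signalAll() transfers every waiter to the sync queue.
class SignalAllDemo {
    // Returns "<waiting> waiting, <queued> queued", sampled right after signalAll().
    static String run(int waiters) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] go = {false}; // guarded by lock

        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                lock.lock();
                try {
                    while (!go[0]) {
                        cond.awaitUninterruptibly();
                    }
                } finally {
                    lock.unlock();
                }
            });
            threads[i].start();
        }

        String result = null;
        while (result == null) {
            lock.lock();
            try {
                // Proceed only once every waiter has reached the condition queue.
                if (lock.getWaitQueueLength(cond) == waiters) {
                    go[0] = true;
                    cond.signalAll();
                    result = lock.getWaitQueueLength(cond) + " waiting, "
                            + lock.getQueueLength() + " queued";
                }
            } finally {
                lock.unlock();   // waiters now acquire the lock one after another
            }
            if (result == null) {
                LockSupport.parkNanos(1_000_000L); // brief pause before polling again
            }
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return result;
    }
}
```

After signalAll the waiters still leave one at a time, because each of them has to re-acquire the exclusive lock before returning from await.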
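This article dissects synchronization locks from the angle of the Java implementation, but the ideas are not limited to Java. The principles carry over; it is just that a heavyweight like the operating system can do the job more directly, for example by simply not scheduling a thread onto the CPU at all.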