JAVA互斥鎖(synchronized&Lock):行為分析及原始碼
JVM中有這樣一段註釋:
// The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provides park(), unpark(), etc., and // is abstract -- that is, a PlatformEvent should never be instantiated except // as part of a ParkEvent. // Equivalently we could have defined a platform-independent base-class that // exported Allocate(), Release(), etc. The platform-specific class would extend // that base-class, adding park(), unpark(), etc. // // A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark. // // We'll want to eventually merge these redundant facilities and use ParkEvent.
這段註釋是說:ParkEvent用於Java語言級別的關鍵字synchronized(即"monitor"同步);Parker則用於由JSR166發展而來的Java併發類庫(java.util.concurrent)中的park/unpark。 註釋還指出這兩個東西功能非常類似,希望將來能合併,統一使用ParkEvent。 那麼它們究竟有什麼區別呢? 我們先看看這兩個類的大概介面樣子: (ParkEvent)
class ParkEvent : public os::PlatformEvent { private: ParkEvent * FreeNext ; // Current association Thread * AssociatedWith ; intptr_t RawThreadIdentity ; // LWPID etc volatile int Incarnation ; class PlatformEvent : public CHeapObj<mtInternal> { // Use caution with reset() and fired() -- they may require MEMBARs void reset() { _Event = 0 ; } int fired() { return _Event; } void park () ; void unpark () ; int TryPark () ; int park (jlong millis) ; // relative timed-wait only
class Parker : public os::PlatformParker { public: // For simplicity of interface with Java, all forms of park (indefinite, // relative, and absolute) are multiplexed into one call. void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ;
可以看到它們提供一致的相同介面,park和unpark。從而支撐Java中併發控制的功能。 它們究竟有什麼不同呢?我們首先來執行2段類似的程式碼。 阻塞執行緒獲取鎖的順序完全相反 首先是使用synchronized提供的鎖機制,我們隨便用一個Object lock = new Object()作為鎖關聯的物件,程式碼如下,它的功能是讓10個執行緒進入阻塞狀態,然後釋放鎖,觀察隨後執行緒獲取鎖的順序: (可執行程式碼)
package com.psly.testLocks;

/**
 * Demo of monitor ({@code synchronized}) hand-off order.
 *
 * <p>The main thread holds the monitor of {@code lock} while it starts N threads,
 * 200 ms apart, so every started thread has to block on that monitor. Once main
 * leaves its synchronized block, each thread prints its name when it finally
 * owns the monitor, which makes the acquisition order visible on stdout.
 */
public class TestLockSynchronized {
    // final: a monitor object must never be reassigned, otherwise different
    // threads could end up synchronizing on different objects.
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        for (int i = 0; i < N; ++i) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock) {
                        System.out.println(Thread.currentThread().getName() + " get synch lock!");
                        try {
                            // Hold the monitor for a while so the hand-off order is observable.
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt status instead of swallowing it.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }

        synchronized (lock) {
            for (int i = 0; i < N; ++i) {
                threads[i].start();
                // Give the new thread time to block on the monitor before starting the next one.
                Thread.sleep(200);
            }
        }

        for (int i = 0; i < N; ++i) {
            threads[i].join();
        }
    }
}
我們在每次啟動一個執行緒之後休眠0.2秒,從而讓先建立的執行緒能夠先進入阻塞狀態,輸出為:
Thread-9 get synch lock! Thread-8 get synch lock! Thread-7 get synch lock! Thread-6 get synch lock! Thread-5 get synch lock! Thread-4 get synch lock! Thread-3 get synch lock! Thread-2 get synch lock! Thread-1 get synch lock! Thread-0 get synch lock!
這有點奇怪,先嘗試獲取鎖的執行緒竟然最後才獲得鎖! 先不管這個, 我們把這個例子改為JSR166的Lock重做一遍: (可執行程式碼)
package com.psly.testLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demo of {@link ReentrantLock} hand-off order.
 *
 * <p>The main thread holds {@code lock} while it starts N threads, 200 ms apart,
 * so every started thread has to block in {@code lock()}. Once main unlocks,
 * each thread prints its name when it acquires the lock, which makes the
 * acquisition order visible on stdout.
 */
public class TestLockSynchronized {
    private static final Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        for (int i = 0; i < N; ++i) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
                        // Hold the lock for a while so the hand-off order is observable.
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status instead of swallowing it.
                        Thread.currentThread().interrupt();
                    } finally {
                        // Always release, even if the body throws.
                        lock.unlock();
                    }
                }
            });
        }

        lock.lock();
        try {
            for (int i = 0; i < N; ++i) {
                threads[i].start();
                // Give the new thread time to block in lock() before starting the next one.
                Thread.sleep(200);
            }
        } finally {
            // main's sleep may throw InterruptedException; the lock must still be released.
            lock.unlock();
        }

        for (int i = 0; i < N; ++i) {
            threads[i].join();
        }
    }
}
輸出為:
Thread-0 get JSR166 lock! Thread-1 get JSR166 lock! Thread-2 get JSR166 lock! Thread-3 get JSR166 lock! Thread-4 get JSR166 lock! Thread-5 get JSR166 lock! Thread-6 get JSR166 lock! Thread-7 get JSR166 lock! Thread-8 get JSR166 lock! Thread-9 get JSR166 lock!
這個輸出比較符合我們的預期,畢竟先嘗試獲取鎖的執行緒的確先獲取了鎖。 為什麼這兩種實現有這樣的差異呢?我們來看下它們各自的阻塞佇列實現,首先是Java層面(JSR166的ReentrantLock)的:
public void lock() { sync.lock(); } final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
我們這裡重點檢視addWaiter(Node mode);可以看出來,執行緒構造的阻塞節點是通過tail欄位加入佇列的,並且成為原tail的next節點。這是個先進先出的雙向佇列。 所以當鎖被釋放時,阻塞執行緒獲取鎖的順序與進入阻塞佇列的順序是一致的。 我們接著看下synchronized的實現,這裡涉及到JVM中系統程式設計的原始碼,這裡也只貼出跟進入阻塞佇列相關的程式碼:
class ObjectMonitor; class ObjectSynchronizer : AllStatic { static void fast_enter (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS); static void slow_enter (Handle obj, BasicLock* lock, TRAPS); void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) { if (UseBiasedLocking) { if (!SafepointSynchronize::is_at_safepoint()) { BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { return; } } else { assert(!attempt_rebias, "can not rebias toward VM thread"); BiasedLocking::revoke_at_safepoint(obj); } assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } slow_enter (obj, lock, THREAD) ; } void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) { markOop mark = obj->mark(); assert(!mark->has_bias_pattern(), "should not see bias pattern here"); ................ ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD); } void ATTR ObjectMonitor::enter(TRAPS) { // The following code is ordered to check the most common cases first // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors. Thread * const Self = THREAD ; void * cur ; ········· for (;;) { jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() // or java_suspend_self() EnterI (THREAD) ; if (!ExitSuspendEquivalent(jt)) break ; // // We have acquired the contended monitor, but while we were // waiting another thread suspended us. We don't want to enter // the monitor while suspended because that would surprise the // thread that suspended us. // _recursions = 0 ; ······ } void ATTR ObjectMonitor::EnterI (TRAPS) { Thread * Self = THREAD ; assert (Self->is_Java_thread(), "invariant") ; assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ; // Try the lock - TATAS if (TryLock (Self) > 0) { ...... } if (TrySpin (Self) > 0) { ...... 
} ObjectWaiter node(Self) ; Self->_ParkEvent->reset() ; node._prev = (ObjectWaiter *) 0xBAD ; node.TState = ObjectWaiter::TS_CXQ ; // Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt ; for (;;) { node._next = nxt = _cxq ; if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } }
這裡的重點是,ObjectWaiter node(Self)
ObjectWaiter node(Self) ; ........ for (;;) { node._next = nxt = _cxq ; if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } }
_cxq,我們採用比較並交換的原子指令修改了_cxq,修改之前將_cxq的舊值填入node的next欄位,這樣一來我們就在_cxq上構造了個stack,也就是後進先出的結構。於是下次當我們索取_cxq時,自然就取得了最後填入的值。這解釋了我們上面的執行示例:阻塞執行緒獲取鎖的順序與進佇列的順序完全相反。 我們接著看下再複雜點的例子:依次啟動10個執行緒,依次獲取鎖,獲得鎖的同時列印自身資訊,然後主動呼叫wait語義的方法陷入等待狀態。等到這10個執行緒都進入等待之後主執行緒獲取鎖,接著再啟動10個無等待執行緒,這些執行緒唯一做的事情就是依次獲取鎖,它們會按照我們上面所說的方式進入阻塞佇列。接著主執行緒依次傳送4次notify語義的訊號(注意時間間隔),然後釋放鎖。我們感興趣的是這幾個收到通知的執行緒,它們相對已經在阻塞佇列中的執行緒,誰會先獲取鎖?它們的排列又是怎麼樣的呢? 我們先執行JSR166的版本,程式碼如下: (可執行程式碼)
package com.psly.testLocks;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Demo of how signalled {@link Condition} waiters are ordered relative to
 * threads that are already blocked in {@code lock()}.
 *
 * <p>Sequence: N "waiter" threads each take the lock, print, and call
 * {@code await()}. Main then takes the lock, starts N "no-wait" threads (which
 * block in {@code lock()}), calls {@code signal()} four times and unlocks.
 * The order of the printed lines shows who acquires the lock next.
 */
public class TestLockSynchronized {
    private static final Lock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        Thread[] threadsForWaits = new Thread[N];

        // Threads that only acquire the lock, print, and release it.
        for (int i = 0; i < N; ++i) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() + " nowait get lock");
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status instead of swallowing it.
                        Thread.currentThread().interrupt();
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }

        // Threads that acquire the lock, wait on the condition, and print again once resumed.
        for (int i = 0; i < N; ++i) {
            threadsForWaits[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() + " wait first get lock");
                        condition.await();
                        System.out.println(Thread.currentThread().getName() + " wait second get lock");
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }

        for (int i = 0; i < N; ++i) {
            threadsForWaits[i].start();
            // Let each waiter reach await() before the next one starts.
            Thread.sleep(200);
        }

        lock.lock();
        try {
            for (int i = 0; i < N; ++i) {
                threads[i].start();
                // Let each no-wait thread block in lock() before the next one starts.
                Thread.sleep(200);
            }
            for (int i = 0; i < 4; ++i) {
                condition.signal();
            }
            Thread.sleep(200);
        } finally {
            lock.unlock();
        }

        for (int i = 0; i < N; ++i) {
            threads[i].join();
        }

        // Only 4 of the N waiters were signalled above. Without this step the
        // remaining waiters would stay in await() and the joins below would
        // never return. They are released only after every no-wait thread has
        // finished; each released waiter still prints its
        // "wait second get lock" line.
        lock.lock();
        try {
            condition.signalAll();
        } finally {
            lock.unlock();
        }

        for (int i = 0; i < N; ++i) {
            threadsForWaits[i].join();
        }
    }
}
Thread-10到Thread-19為主動呼叫wait阻塞的執行緒,Thread-0到Thread-9為只獲取鎖的執行緒。 輸出為:
Thread-10 wait first get lock Thread-11 wait first get lock Thread-12 wait first get lock Thread-13 wait first get lock Thread-14 wait first get lock Thread-15 wait first get lock Thread-16 wait first get lock Thread-17 wait first get lock Thread-18 wait first get lock Thread-19 wait first get lock Thread-0 nowait get lock Thread-1 nowait get lock Thread-2 nowait get lock Thread-3 nowait get lock Thread-4 nowait get lock Thread-5 nowait get lock Thread-6 nowait get lock Thread-7 nowait get lock Thread-8 nowait get lock Thread-9 nowait get lock Thread-10 wait second get lock Thread-11 wait second get lock Thread-12 wait second get lock Thread-13 wait second get lock
可以看到JSR166的實現依然滿足先進先出,即使Thread-10到Thread-13是先獲取鎖之後才陷入等待的。我們接著看下這是如何做到的, 注意JSR166的實現是在Java層面完成的。 主要是三個呼叫:await,signal,unlock。 await:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
這裡的addConditionWaiter嘗試新增等待佇列的節點。acquireQueued用於將來被喚醒之後的再次嘗試獲取鎖。 我們來看addConditionWaiter,
/** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
是將新節點作為lastWaiter的next節點,並且本身成為lastWaiter節點。那麼這裡說明這構造的是一個先進先出的佇列。(這裡是在已經獲取鎖的情況下,所以不需同步) 我們接著看 signal
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
以上已經拿到了等待佇列第一個節點,接著enq讓他轉移(transfer)到鎖的阻塞佇列
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
這樣一來我們就完成了將等待執行緒從處於wait的狀態,轉移到了未獲得鎖處於阻塞的狀態。 最後看下當主執行緒釋放鎖時的操作: unlock
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
可以看到,呼叫unlock的執行緒喚醒了阻塞佇列head中的第一個執行緒。 (OVER) 因為阻塞佇列跟等待佇列都是先進先出,這樣子能夠得到一個比較好的行為。 從而導致了我們之前的輸出,看上去比較符合預期。 最後我們來看看採用synchronized,這個示例下的輸出是什麼,程式碼如下: (可執行程式碼)
package com.psly.testLocks;

/**
 * Demo of how notified {@code wait()}-ers are ordered relative to threads that
 * are already blocked on the same monitor.
 *
 * <p>Sequence: N "waiter" threads each enter the monitor, print, and call
 * {@code wait()}. Main then enters the monitor, starts N "no-wait" threads
 * (which block on the monitor), calls {@code notify()} four times and leaves
 * the monitor. The order of the printed lines shows who gets the monitor next.
 */
public class TestLockSynchronized {
    // final: a monitor object must never be reassigned.
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        int N = 10;
        Thread[] threads = new Thread[N];
        Thread[] threadsForWaits = new Thread[N];

        // Threads that only enter the monitor, print, and leave.
        for (int i = 0; i < N; ++i) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock) {
                        System.out.println(Thread.currentThread().getName() + " nowait get lock");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            // Preserve the interrupt status instead of swallowing it.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }

        // Threads that enter the monitor, wait on it, and print again once resumed.
        for (int i = 0; i < N; ++i) {
            threadsForWaits[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (lock) {
                        System.out.println(Thread.currentThread().getName() + " wait first get lock");
                        try {
                            lock.wait();
                            System.out.println(Thread.currentThread().getName() + " wait second get lock");
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            });
        }

        for (int i = 0; i < N; ++i) {
            threadsForWaits[i].start();
            // Let each waiter reach wait() before the next one starts.
            Thread.sleep(200);
        }

        synchronized (lock) {
            for (int i = 0; i < N; ++i) {
                threads[i].start();
                // Let each no-wait thread block on the monitor before the next one starts.
                Thread.sleep(200);
            }
            for (int i = 0; i < 4; ++i) {
                lock.notify();
            }
            Thread.sleep(200);
        }

        for (int i = 0; i < N; ++i) {
            threads[i].join();
        }

        // Only 4 of the N waiters were notified above. Without this step the
        // remaining waiters would stay in wait() and the joins below would
        // never return. They are released only after every no-wait thread has
        // finished; each released waiter still prints its
        // "wait second get lock" line. If every waiter has already been
        // notified, this call has nothing left to wake.
        synchronized (lock) {
            lock.notifyAll();
        }

        for (int i = 0; i < N; ++i) {
            threadsForWaits[i].join();
        }
    }
}
輸出如下:
Thread-10 wait first get lock Thread-11 wait first get lock Thread-12 wait first get lock Thread-13 wait first get lock Thread-14 wait first get lock Thread-15 wait first get lock Thread-16 wait first get lock Thread-17 wait first get lock Thread-18 wait first get lock Thread-19 wait first get lock Thread-10 wait second get lock Thread-13 wait second get lock Thread-12 wait second get lock Thread-11 wait second get lock Thread-9 nowait get lock Thread-8 nowait get lock Thread-7 nowait get lock Thread-6 nowait get lock Thread-5 nowait get lock Thread-4 nowait get lock Thread-3 nowait get lock Thread-2 nowait get lock Thread-1 nowait get lock Thread-0 nowait get lock 這個結果很奇怪,最奇怪在於居然是連續輸出Thread-10,Thread-13,Thread-12,Thread-11: Thread-10 wait second get lock Thread-13 wait second get lock Thread-12 wait second get lock Thread-11 wait second get lock
我們調整一下發送notify的數量,給出所有等待執行緒數量的呼叫N。
for(int i = 0; i < N ; ++i){ lock.notify(); }
輸出為:
Thread-10 wait first get lock Thread-11 wait first get lock Thread-12 wait first get lock Thread-13 wait first get lock Thread-14 wait first get lock Thread-15 wait first get lock Thread-16 wait first get lock Thread-17 wait first get lock Thread-18 wait first get lock Thread-19 wait first get lock Thread-10 wait second get lock Thread-19 wait second get lock Thread-18 wait second get lock Thread-17 wait second get lock Thread-16 wait second get lock Thread-15 wait second get lock Thread-14 wait second get lock Thread-13 wait second get lock Thread-12 wait second get lock Thread-11 wait second get lock Thread-9 nowait get lock Thread-8 nowait get lock Thread-7 nowait get lock Thread-6 nowait get lock Thread-5 nowait get lock Thread-4 nowait get lock Thread-3 nowait get lock Thread-2 nowait get lock Thread-1 nowait get lock Thread-0 nowait get lock
依然是Thread-10莫名其妙出現在最前,後面緊接著Thread-19到Thread-11倒序。 我們再嘗試換下呼叫方式,採用notifyAll();
synchronized(lock){ for(int i = 0; i < N; ++i){ threads[i].start(); Thread.sleep(200); } lock.notifyAll(); Thread.sleep(200); }
輸出為:
Thread-10 wait first get lock Thread-11 wait first get lock Thread-12 wait first get lock Thread-13 wait first get lock Thread-14 wait first get lock Thread-15 wait first get lock Thread-16 wait first get lock Thread-17 wait first get lock Thread-18 wait first get lock Thread-19 wait first get lock Thread-19 wait second get lock Thread-18 wait second get lock Thread-17 wait second get lock Thread-16 wait second get lock Thread-15 wait second get lock Thread-14 wait second get lock Thread-13 wait second get lock Thread-12 wait second get lock Thread-11 wait second get lock Thread-10 wait second get lock Thread-9 nowait get lock Thread-8 nowait get lock Thread-7 nowait get lock Thread-6 nowait get lock Thread-5 nowait get lock Thread-4 nowait get lock Thread-3 nowait get lock Thread-2 nowait get lock Thread-1 nowait get lock Thread-0 nowait get lock
這下子又變了,Thread-10變為最後,完全逆序來獲取鎖了。 我們嘗試進入JVM去看下這一切是怎麼回事。與之前的過程類似,首先我們來看看等待之後執行緒節點如何組織的: 經研究,應該是下面這片程式碼: WAIT:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { Thread * const Self = THREAD ; TEVENT (Wait) ; assert (Self->_Stalled == 0, "invariant") ; Self->_Stalled = intptr_t(this) ; jt->set_current_waiting_monitor(this); ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; Self->_ParkEvent->reset() ; OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag // Enter the waiting queue, which is a circular doubly linked list in this case // but it could be a priority queue or any data structure. // _WaitSetLock protects the wait queue. Normally the wait queue is accessed only // by the the owner of the monitor *except* in the case where park() // returns because of a timeout of interrupt. Contention is exceptionally rare // so we use a simple spin-lock instead of a heavier-weight blocking lock. Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ; inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev == NULL, "node already in list"); assert(node->_next == NULL, "node already in list"); // put node at end of queue (circular doubly linked list) if (_WaitSet == NULL) { _WaitSet = node; node->_prev = node; node->_next = node; } else { ObjectWaiter* head = _WaitSet ; ObjectWaiter* tail = head->_prev; assert(tail->_next == head, "invariant check"); tail->_next = node; head->_prev = node; node->_next = head; node->_prev = tail; } }
如果_WaitSet為空,則把_WaitSet設為該節點,並且節點的前驅和後繼都指向它自己。 如果_WaitSet不為空,則把節點接到tail(即head->_prev)之後,成為新的佇列尾,並讓它的next指回head(這是個環形雙向連結串列)。 所以這裡是個先進先出的佇列。 notify
void ObjectMonitor::notify(TRAPS) { CHECK_OWNER(); if (_WaitSet == NULL) { TEVENT (Empty-Notify) ; return ; } DTRACE_MONITOR_PROBE(notify, this, object(), THREAD); int Policy = Knob_MoveNotifyee ; Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ; ObjectWaiter * iterator = DequeueWaiter() ; inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; } inline ObjectWaiter* ObjectMonitor::DequeueWaiter() { // dequeue the very first waiter ObjectWaiter* waiter = _WaitSet; if (waiter) { DequeueSpecificWaiter(waiter); } return waiter; } inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) { assert(node != NULL, "should not dequeue NULL node"); assert(node->_prev != NULL, "node already removed from list"); assert(node->_next != NULL, "node already removed from list"); // when the waiter has woken up because of interrupt, // timeout or other spurious wake-up, dequeue the // waiter from waiting list ObjectWaiter* next = node->_next; if (next == node) { assert(node->_prev == node, "invariant check"); _WaitSet = NULL; } else { ObjectWaiter* prev = node->_prev; assert(prev->_next == node, "invariant check"); assert(next->_prev == node, "invariant check"); next->_prev = prev; prev->_next = next; if (_WaitSet == node) { _WaitSet = next; } } node->_next = NULL; node->_prev = NULL; } static int Knob_MoveNotifyee = 2 ; // notify() - disposition of notifyee if (Policy == 2) { // prepend to cxq // prepend to cxq if (List == NULL) { iterator->_next = iterator->_prev = NULL ; _EntryList = iterator ; } else { iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } } else if (Policy == 3) { // append to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Tail ; Tail = _cxq ; if (Tail == NULL) {
這裡的DequeueWaiter呼叫DequeueSpecificWaiter,效果是從佇列頭取出一個元素,_WaitSet的next成為新的_WaitSet。這裡有_EntryList、_cxq兩個資料結構。 接著我們走Policy==2分支,注意這裡並不是全部放入cxq(儘管註釋如此):當_EntryList==NULL的時候,直接將我們的節點放入_EntryList;否則,將我們的節點新增到_cxq這個stack的前面。想象一下,假如第一個節點進來,發現_EntryList為空,_EntryList就設定為它自己。從第二個節點開始,所有節點都是進stack,這樣的話取出時,第二個往後的節點是不是都顛倒了呢?假如我們取節點的方式是先取_EntryList,然後再取stack中的元素,就會發生示例中Thread-10提前的亂序情況。 但是注意,之前的notifyAll並沒有產生這種效果。所以我們來看下notifyAll的程式碼:
if (Policy == 2) { // prepend to cxq // prepend to cxq iterator->TState = ObjectWaiter::TS_CXQ ; for (;;) { ObjectWaiter * Front = _cxq ; iterator->_next = Front ; if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) { break ; } } } else
果然如此!notifyAll的邏輯跟notify大部分一樣,除了它將所有節點都加入cxq。所以我們才會觀察到notifyAll呼叫之後的節點獲取鎖順序是逆序。 unlock 我們接著看看unlock的時候,是不是如我們猜測的那樣先取_EntryList的元素,再來看cxq。
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) { fast_exit (object, lock, THREAD) ; } void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) { assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here"); // if displaced header is null, the previous enter is recursive enter, no-op ......... ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ; } void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) { ................... for (;;) { assert (THREAD == _owner, "invariant") ; if (Knob_ExitPolicy == 0) { ......... } else { .......... } else { TEVENT (Inflated exit - complex egress) ; } } guarantee (_owner == THREAD, "invariant") ; ObjectWaiter * w = NULL ; int QMode = Knob_QMode ; w = _EntryList ; if (w != NULL) { assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ; ExitEpilog (Self, w) ; return ; } w = _cxq ; if (w == NULL) continue ; for (;;) { assert (w != NULL, "Invariant") ; ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } if (QMode == 1) { .............. } else { _EntryList = w ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } } if (_succ != NULL) continue; w = _EntryList ; if (w != NULL) { guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ; ExitEpilog (Self, w) ; return ; } } static int Knob_QMode = 0 ; // EntryList-cxq policy - queue discipline static int Knob_ExitPolicy = 0 ;
這裡是先取_EntryList,假如有就呼叫ExitEpilog並返回,否則採用原子操作取_cxq,然後將這個值再次給_EntryList,並呼叫ExitEpilog。 總之這裡最終都是將資料給_EntryList,只不過假如_EntryList原本就有值,那麼我們會先使用它,之後再使用_cxq。 我們看下ExitEpilog完成了什麼事:
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) { assert (_owner == Self, "invariant") ; // Exit protocol: // 1. ST _succ = wakee // 2. membar #loadstore|#storestore; // 2. ST _owner = NULL // 3. unpark(wakee) _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ; ParkEvent * Trigger = Wakee->_event ; Wakee = NULL ; // Drop the lock OrderAccess::release_store_ptr (&_owner, NULL) ; OrderAccess::fence() ; // ST _owner vs LD in unpark() if (SafepointSynchronize::do_call_back()) { TEVENT (unpark before SAFEPOINT) ; } Trigger->unpark() ; // Maintain stats and report events to JVMTI if (ObjectMonitor::_sync_Parks != NULL) { ObjectMonitor::_sync_Parks->inc() ; } }
果然這裡最後呼叫了unpark,從而喚醒了相應的那個執行緒。這裡的_EntryList的值會如何變化?我們最後看下,當等待執行緒從wait中醒過來會做什麼: // Note: a subset of changes to ObjectMonitor::wait() // will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) { .......... ObjectWaiter node(Self); node.TState = ObjectWaiter::TS_WAIT ; ............. Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ; AddWaiter (&node) ; Thread::SpinRelease (&_WaitSetLock) ; ............ if (node.TState == ObjectWaiter::TS_WAIT) { Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ; if (node.TState == ObjectWaiter::TS_WAIT) { DequeueSpecificWaiter (&node) ; // unlink from WaitSet assert(node._notified == 0, "invariant"); node.TState = ObjectWaiter::TS_RUN ; } .............. assert (_owner != Self, "invariant") ; ObjectWaiter::TStates v = node.TState ; if (v == ObjectWaiter::TS_RUN) { enter (Self) ; } else {
進入了enter(Self),
void ATTR ObjectMonitor::enter(TRAPS) { for (;;) { jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() // or java_suspend_self() EnterI (THREAD) ; 進入EnterI(THREAD), void ATTR ObjectMonitor::EnterI (TRAPS) { Thread * Self = THREAD ; assert (Self->is_Java_thread(), "invariant") ; assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ; if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } DeferredInitialize () ; if (TrySpin (Self) > 0) { assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } // The Spin failed -- Enqueue and park the thread ... assert (_succ != Self , "invariant") ; assert (_owner != Self , "invariant") ; assert (_Responsible != Self , "invariant") ; ObjectWaiter node(Self) ; Self->_ParkEvent->reset() ; node._prev = (ObjectWaiter *) 0xBAD ; node.TState = ObjectWaiter::TS_CXQ ; ObjectWaiter * nxt ; for (;;) { node._next = nxt = _cxq ; if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } } if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) { // Try to assume the role of responsible thread for the monitor. 
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self } Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } TEVENT (Inflated enter - Contention) ; int nWakeups = 0 ; int RecheckInterval = 1 ; for (;;) { if (TryLock (Self) > 0) break ; assert (_owner != Self, "invariant") ; if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } if (_Responsible == Self || (SyncFlags & 1)) { TEVENT (Inflated enter - park TIMED) ; Self->_ParkEvent->park ((jlong) RecheckInterval) ; RecheckInterval *= 8 ; if (RecheckInterval > 1000) RecheckInterval = 1000 ; } else { TEVENT (Inflated enter - park UNTIMED) ; Self->_ParkEvent->park() ; } if (TryLock(Self) > 0) break ; TEVENT (Inflated enter - Futile wakeup) ; if (ObjectMonitor::_sync_FutileWakeups != NULL) { ObjectMonitor::_sync_FutileWakeups->inc() ; } ++ nWakeups ; if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) { Self->_ParkEvent->reset() ; OrderAccess::fence() ; } if (_succ == Self) _succ = NULL ; // Invariant: after clearing _succ a thread *must* retry _owner before parking. OrderAccess::fence() ; } assert (_owner == Self , "invariant") ; assert (object() != NULL , "invariant") ; // I'd like to write: // guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; // but as we're at a safepoint that's not safe. UnlinkAfterAcquire (Self, &node) ; if (_succ == Self) _succ = NULL ; assert (_succ != Self, "invariant") ; if (_Responsible == Self) { _Responsible = NULL ; OrderAccess::fence(); // Dekker pivot-point .........
這裡的重點是UnlinkAfterAcquire,
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode) { assert (_owner == Self, "invariant") ; assert (SelfNode->_thread == Self, "invariant") ; if (SelfNode->TState == ObjectWaiter::TS_ENTER) { // Normal case: remove Self from the DLL EntryList . // This is a constant-time operation. ObjectWaiter * nxt = SelfNode->_next ; ObjectWaiter * prv = SelfNode->_prev ; if (nxt != NULL) nxt->_prev = prv ; if (prv != NULL) prv->_next = nxt ; if (SelfNode == _EntryList ) _EntryList = nxt ; assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ; assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ; TEVENT (Unlink from EntryList) ; } else {
它會將_EntryList的值做更新,從而讓鎖的獲取繼續下去,保證不會出錯。 到這裡為止,我們終於大致走完了一遍synchronized鎖與Lock鎖分別在JVM和JUC中的實現。 那麼有個問題,Linux中pthread鎖的實現,行為模式又是怎麼樣的呢? 我們嘗試用pthread來重做這兩個例子: 鎖獲取程式碼: 編譯生成執行檔案testLock:gcc -pthread testLock.c -o testLock 執行:./testLock
#include <stdint.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

#define N 10

/*
 * Thread body: acquire the mutex, print the thread's index, hold the mutex
 * for 100 ms, then release it.
 *
 * pm carries the thread index packed into the pointer value (not a real pointer).
 */
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    /* Go through intptr_t so the pointer -> int conversion is well defined. */
    printf("%d get lock\n", (int)(intptr_t)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/*
 * Holds the mutex while starting N threads 100 ms apart, so that each thread
 * blocks in pthread_mutex_lock(); then releases the mutex and waits for all
 * threads, whose output shows the order in which they obtained the mutex.
 */
int main(){
    pthread_t threads[N];
    int i;

    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        int rc = pthread_create(&threads[i], NULL, runTask, (void*)(intptr_t)i);
        if(rc != 0){
            /* Joining a pthread_t that was never created is undefined; bail out. */
            fprintf(stderr, "pthread_create failed: %d\n", rc);
            return 1;
        }
        /* Give the new thread time to block on the mutex before starting the next. */
        usleep(100000);
    }
    pthread_mutex_unlock(&mutex);

    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    return 0;
}
輸出:
0 get lock 1 get lock 2 get lock 3 get lock 4 get lock 5 get lock 6 get lock 7 get lock 8 get lock 9 get lock
鎖獲取+阻塞等待程式碼:
#include <stdint.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

#define N 10

/*
 * "No-wait" thread body: acquire the mutex, print the thread's index, hold
 * the mutex for 100 ms, then release it.
 *
 * pm carries the thread index packed into the pointer value (not a real pointer).
 */
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    /* Go through intptr_t so the pointer -> int conversion is well defined. */
    printf("%d get lock\n", (int)(intptr_t)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/*
 * "Waiter" thread body: acquire the mutex, print, wait on the condition
 * variable, then print again once it has been woken and owns the mutex again.
 *
 * The wait is deliberately not wrapped in a predicate loop: the demo only
 * observes wake-up order.
 */
void* runTaskWithWait(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d wait first get lock\n", (int)(intptr_t)pm);
    pthread_cond_wait(&cond, &mutex);
    printf("%d wait second get lock\n", (int)(intptr_t)pm);
    usleep(300000);
    pthread_mutex_unlock(&mutex);
    /* The start routine is declared to return void*; it must return a value. */
    return NULL;
}

/*
 * Starts N waiters (each ends up in pthread_cond_wait), then takes the mutex,
 * starts N no-wait threads (each blocks on the mutex), signals the condition
 * N times and releases the mutex. The printed lines show the resulting
 * acquisition order.
 */
int main(){
    pthread_t threads[N];
    pthread_t threadsForWaits[N];
    int i;
    int rc;

    for(i = 0; i < N; ++i){
        rc = pthread_create(&threadsForWaits[i], NULL, runTaskWithWait, (void*)(intptr_t)i);
        if(rc != 0){
            /* Joining a pthread_t that was never created is undefined; bail out. */
            fprintf(stderr, "pthread_create failed: %d\n", rc);
            return 1;
        }
        /* Let each waiter reach pthread_cond_wait() before the next one starts. */
        usleep(100000);
    }

    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        rc = pthread_create(&threads[i], NULL, runTask, (void*)(intptr_t)i);
        if(rc != 0){
            fprintf(stderr, "pthread_create failed: %d\n", rc);
            return 1;
        }
        /* Let each no-wait thread block on the mutex before the next one starts. */
        usleep(100000);
    }
    /* Alternative to the loop below: pthread_cond_broadcast(&cond); */
    for(i = 0; i < N; ++i)
        pthread_cond_signal(&cond);
    usleep(100000);
    pthread_mutex_unlock(&mutex);

    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    for(i = 0; i < N; ++i){
        pthread_join(threadsForWaits[i], NULL);
    }
    return 0;
}
輸出:
0 wait first get lock 1 wait first get lock 2 wait first get lock 3 wait first get lock 4 wait first get lock 5 wait first get lock 6 wait first get lock 7 wait first get lock 8 wait first get lock 9 wait first get lock 0 get lock 1 get lock 2 get lock 3 get lock 4 get lock 5 get lock 6 get lock 7 get lock 8 get lock 9 get lock 1 wait second get lock 2 wait second get lock 3 wait second get lock 0 wait second get lock 4 wait second get lock 5 wait second get lock 6 wait second get lock 7 wait second get lock 8 wait second get lock 9 wait second get lock
只能說等待執行緒轉移到阻塞執行緒之後的排列,看起來是沒啥規律。