JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼
JVM中有這樣一段註釋:
其中是說ParkEvent用於Java語言級別的關鍵字synchronized。// The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provides park(), unpark(), etc., and // is abstract -- that is, a PlatformEvent should never be instantiated except // as part of a ParkEvent. // Equivalently we could have defined a platform-independent base-class that // exported Allocate(), Release(), etc. The platform-specific class would extend // that base-class, adding park(), unpark(), etc. // // A word of caution: The JVM uses 2 very similar constructs: // 1. ParkEvent are used for Java-level "monitor" synchronization. // 2. Parkers are used by JSR166-JUC park-unpark. // // We'll want to eventually merge these redundant facilities and use ParkEvent.
Parkers用於Java類庫中的併發資料集合,該集合是由JSR166發展來的。
這裡說這兩個東西功能類似,將來會統一使用ParkEvent。
那麼它們究竟有什麼區別呢?
我們先看看這兩個類的大概介面樣子:
(ParkEvent)
(Parkers)class ParkEvent : public os::PlatformEvent { private: ParkEvent * FreeNext ; // Current association Thread * AssociatedWith ; intptr_t RawThreadIdentity ; // LWPID etc volatile int Incarnation ; class PlatformEvent : public CHeapObj<mtInternal> { // Use caution with reset() and fired() -- they may require MEMBARs void reset() { _Event = 0 ; } int fired() { return _Event; } void park () ; void unpark () ; int TryPark () ; int park (jlong millis) ; // relative timed-wait only
class Parker : public os::PlatformParker { public: // For simplicity of interface with Java, all forms of park (indefinite, // relative, and absolute) are multiplexed into one call. void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ;
可以看到它們提供一致的相同介面,park和unpark。從而支撐Java中併發控制的功能。
它們究竟有什麼不同呢?我們首先來執行2段類似的程式碼。
阻塞執行緒獲取鎖的順序完全相反
首先是使用synchronized提供的鎖機制,我們隨便用一個Object lock = new Object()作為鎖關聯的物件,程式碼如下,它的功能是讓10個執行緒進入阻塞狀態,然後釋放鎖,觀察隨後執行緒獲取鎖的順序:
package com.psly.testLocks;
public class TestLockSynchronized {
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int N = 10;
Thread[] threads = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
public void run() {
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " get synch lock!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
}
for(int i = 0; i < N; ++i)
threads[i].join();
}
}
我們用一個0.2seconds的時間,從而讓先建立的執行緒能夠先進入阻塞狀態,輸出為:
Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock!
這有點奇怪,先嚐試獲取鎖的執行緒竟然後獲得鎖!
先不管這個, 我們把這個例子改為JSR166的Lock重做一遍:
package com.psly.testLocks;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLockSynchronized {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int N = 10;
Thread[] threads = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
});
}
lock.lock();
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
lock.unlock();
for(int i = 0; i < N; ++i)
threads[i].join();
}
}
輸出為:
Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!
這個輸出比較符合了我們的預期,畢竟先嚐試獲取鎖的的確先獲取了鎖。
為什麼這兩種實現有這樣的差異呢,我們來看下他們分別的阻塞佇列實現,首先是JAVA的:
public void lock() {
sync.lock();
}
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
我們這裡重點檢視addWaiter(Node node);可以看出來,執行緒構造的阻塞節點是通過tail欄位加入進佇列的,並且作為next節點。這是個先進先出雙向佇列。
所以當鎖被釋放時,阻塞執行緒獲取鎖的順序與進阻塞佇列是一致的。
我們接著看下synchronized的實現,這裡涉及到JVM中系統程式設計的原始碼,這裡也只貼出跟進入阻塞佇列相關的程式碼:
class ObjectMonitor;
class ObjectSynchronizer : AllStatic {
static void fast_enter (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
static void slow_enter (Handle obj, BasicLock* lock, TRAPS);
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
................
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
·········
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI (THREAD) ;
if (!ExitSuspendEquivalent(jt)) break ;
//
// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
······
}
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
// Try the lock - TATAS
if (TryLock (Self) > 0) {
......
}
if (TrySpin (Self) > 0) {
......
}
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
這裡的重點是,ObjectWaiter node(Self) ;
ObjectWaiter node(Self) ;
........
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
_cxq,我們採用比較並交換的原子指令,修改了_cxq,修改之前將_cxq的舊值填入node的next欄位,這樣一來我們就在_cxq上構造了個stack,也就是先進後出的佇列。於是下次當我們索取_cxq時候自然就取得了最後填入的值。這解釋了我們上面的執行示例,阻塞執行緒獲取鎖的順序與進佇列完全相反。
我們接著看下再複雜點的例子,依次啟動10個執行緒,依次獲取鎖,獲得鎖的同時列印自身資訊,然後主動呼叫wait語義的方法陷入阻塞狀態。等到這10個執行緒都阻塞之後主執行緒獲取鎖,接著再啟動10個無等待執行緒,這是個執行緒唯一做的事情就是依次獲取鎖,他們會按照我們上面所說的方式進入阻塞佇列。接著主執行緒依次傳送4次notify語義的訊號(注意時間間隔),然後釋放鎖。我們感興趣的是這幾個收到通知的執行緒,他們相對已經在阻塞佇列中的執行緒,誰會先獲取鎖?他們的排列又是怎麼樣的呢?
我們先執行JSR166的版本,程式碼如下:
package com.psly.testLocks;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLockImplWithWaits {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
int N = 10;
Thread[] threads = new Thread[N];
Thread[] threadsForWaits = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName() + " nowait get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i] = new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
lock.lock(); //synchronized(lock){
System.out.println(Thread.currentThread().getName() + " wait first get lock");
try {
condition.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " wait second get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
lock.unlock();
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i].start();
Thread.sleep(200);
}
lock.lock(); //synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
for(int i = 0; i < 4 ; ++i){
condition.signal();
}
Thread.sleep(200);
lock.unlock();
for(int i = 0; i < N; ++i)
threads[i].join();
for(int i = 0; i < N; ++i)
threadsForWaits[i].join();
}
}
Thread-10到Thread-19為主動呼叫wait阻塞的執行緒,Thread-0到Thread-9為只獲取鎖的執行緒。
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get lock
可以看到JSR166的實現依然滿足先進先出,即使Thread-10到Thread-13是先獲取鎖之後陷入wait的。我們接著看下這是如何做到的,
注意JSR166的實現是在JAVA層面完成的。
主要是三個呼叫:wait,notify,unlock。
await:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
這裡的addConditionWaiter嘗試新增等待佇列的節點。acquireQueued用於將來被喚醒之後的再次嘗試獲取鎖。
我們來看addConditionWaiter,
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
是將新節點作為lastWaiter的next節點,並且本身成為lastWaiter節點。那麼這裡說明這構造的是一個先進先出的佇列。(這裡是在已經獲取鎖的情況下,所以不需同步)
我們接著看 signal
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
以上已經拿到了等待佇列第一個節點,接著enq讓他轉移(transfer)到鎖的阻塞佇列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這樣一來我們就完成了將等待執行緒從處於wait的狀態,轉移到了未獲得鎖處於阻塞的狀態。
最後看下當主執行緒釋放鎖時的操作:
unlock
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到,呼叫unlock的執行緒喚醒了阻塞佇列head中的第一個執行緒。 (OVER)
因為阻塞佇列跟等待佇列都是先進先出,這樣子能夠得到一個比較好的行為。
從而導致了我們之前的輸出,看上去比較符合預期。
最後我們來看看採用synchronized,這個示例下的輸出是什麼,程式碼如下:
package com.psly.testLocks;
public class TestLockImplWithWaits {
private static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
int N = 10;
Thread[] threads = new Thread[N];
Thread[] threadsForWaits = new Thread[N];
for(int i = 0; i < N; ++i){
threads[i] = new Thread(new Runnable(){
@Override
public void run() {
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " nowait get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i] = new Thread(new Runnable(){
@Override
public void run() {
// TODO Auto-generated method stub
synchronized(lock){
System.out.println(Thread.currentThread().getName() + " wait first get lock");
try {
lock.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " wait second get lock");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
for(int i = 0; i < N; ++i){
threadsForWaits[i].start();
Thread.sleep(200);
}
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
for(int i = 0; i < 4 ; ++i){
lock.notify();
}
Thread.sleep(200);
}
for(int i = 0; i < N; ++i)
threads[i].join();
for(int i = 0; i < N; ++i)
threadsForWaits[i].join();
}
}
輸出入下:Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
這個結果很奇怪,最奇怪在於居然是連續輸出Thread-10,Thread-13,Thread-12,Thread-11:
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
我們調整一下發送notify的數量,給出所有等待執行緒數量的呼叫N。
for(int i = 0; i < N ; ++i){
lock.notify();
}
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
依然是Thread-10莫名其妙出現在最前,後面緊接著Thread-19到Thread-11倒序。
我們再嘗試換下呼叫方式,採用notifyAll();
synchronized(lock){
for(int i = 0; i < N; ++i){
threads[i].start();
Thread.sleep(200);
}
lock.notifyAll();
Thread.sleep(200);
}
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
這下子又變了,Thread-10變為最後,完全逆序來獲取鎖了。
我們嘗試進入JVM去看下這一切是怎麼回事。與之前的過程類似,首先我們來看看等待之後執行緒節點如何組織的:
經研究,應該是下面這片程式碼:
WAIT:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD ;
TEVENT (Wait) ;
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
如果_WaitSet為空,則設定它,並且前驅和後繼都是它。
如果只有_WaitSet一個,則將節點增加到它的後面。
不為空的情況下統一新增到next節點。
所以這裡是個先進先出的佇列。
notify
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev != NULL, "node already removed from list");
assert(node->_next != NULL, "node already removed from list");
// when the waiter has woken up because of interrupt,
// timeout or other spurious wake-up, dequeue the
// waiter from waiting list
ObjectWaiter* next = node->_next;
if (next == node) {
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}
static int Knob_MoveNotifyee = 2 ; // notify() - disposition of notifyee
if (Policy == 2) { // prepend to cxq
// prepend to cxq
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
這裡的DequeueWaiter呼叫DequeueSpecificWaiter,效果是隊列出一個元素,_WaitSet.next成為_WaitSet。這裡有_EntryList、_cxq兩個資料結構。
接著我們走Policy==2分支,注意這裡並不是全部放入cxq(儘管註釋如此),判斷是_EntryList==NULL的時候,直接將我們的節點放入它。否則,將我們的節點新增到_cxq這個stack前面。想象一個,假如第一個節點進來,發現_EntryList為空,_EntryList設定為它自己。從第二個節點開始,所有節點都是進stack,這樣的話是不是取出時,第二個往後的節點都顛倒了呢。假如我們取節點的方式是先驅_EntryList,然後再取stack中的元素。則就會發生示例中Thread-10提前的亂序情況。
但是注意,之前的notifyAll並沒有產生這種效果。所以我們來看下notifyAll的程式碼:
if (Policy == 2) { // prepend to cxq
// prepend to cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
} else
果然如此!notifyAll的邏輯跟notify大部分一樣,除了它將所有節點都加入cxq。所以我們才會觀察到notifyAll呼叫之後的節點獲取鎖順序是逆序。
unlock
我們接著看看unlock的時候,是不是如我們猜測的那樣先取_EntryList的元素,再來看cxq。
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
.........
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
...................
for (;;) {
assert (THREAD == _owner, "invariant") ;
if (Knob_ExitPolicy == 0) {
.........
} else {
..........
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
guarantee (_owner == THREAD, "invariant") ;
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
w = _cxq ;
if (w == NULL) continue ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
if (QMode == 1) {
..............
} else {
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
static int Knob_QMode = 0 ; // EntryList-cxq policy - queue discipline
static int Knob_ExitPolicy = 0 ;
這裡是先取_EntryList,假如有就呼叫ExitEpilog並返回,否則採用原子操作取_cxq,然後將這個值再次給_EntryList,並呼叫ExitEpilog。
總之這裡最終都是將資料給_EntryList,只不過假如_EntryList原本就有值,那麼我們會先使用它,之後再使用_cxq。
我們看下ExitEpilog完成了什麼事:
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
// Drop the lock
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
Trigger->unpark() ;
// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
果然這裡最後呼叫了unpark,從而喚醒了相應的那個執行緒。這裡的_EntryList的值會如何變化?我們最後看下,當等待執行緒從wait中醒過來會做什麼:
// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
..........
ObjectWaiter node(Self);
node.TState = ObjectWaiter::TS_WAIT ;
.............
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
............
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
..............
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
進入了enter(Self),
void ATTR ObjectMonitor::enter(TRAPS) {
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
EnterI (THREAD) ;
進入EnterI(THREAD),
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
DeferredInitialize () ;
if (TrySpin (Self) > 0) {
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
// The Spin failed -- Enqueue and park the thread ...
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
if (TryLock(Self) > 0) break ;
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ;
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
assert (_owner == Self , "invariant") ;
assert (object() != NULL , "invariant") ;
// I'd like to write:
// guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
// but as we're at a safepoint that's not safe.
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
assert (_succ != Self, "invariant") ;
if (_Responsible == Self) {
_Responsible = NULL ;
OrderAccess::fence(); // Dekker pivot-point
.........
這裡的重點是UnlinkAfterAcquire,
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
assert (_owner == Self, "invariant") ;
assert (SelfNode->_thread == Self, "invariant") ;
if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
// Normal case: remove Self from the DLL EntryList .
// This is a constant-time operation.
ObjectWaiter * nxt = SelfNode->_next ;
ObjectWaiter * prv = SelfNode->_prev ;
if (nxt != NULL) nxt->_prev = prv ;
if (prv != NULL) prv->_next = nxt ;
if (SelfNode == _EntryList ) _EntryList = nxt ;
assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
TEVENT (Unlink from EntryList) ;
} else {
它會將_EntryList的值做更新,從而讓鎖的獲取繼續下去,保證不會出錯。
到這裡為止,我們終於大致走完了一遍synchronized鎖與lock鎖分別在JVM和JUC中的實現。
那麼有個問題,linux中pthread鎖的實現,行為模式又是怎麼樣的呢?
我們嘗試將使用pthread來執行測試這兩個例子:
鎖獲取程式碼:
編譯生成執行檔案testLock:gcc -pthread testLock.c -o testLock
執行:./testLock
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10
void* runTask(void* pm){
pthread_mutex_lock(&mutex);
printf("%d get lock\n", (int)pm);
usleep(100000);
pthread_mutex_unlock(&mutex);
return 0;
}
int main(){
// int N = 10;
pthread_t threads[N];
int i = 0;
pthread_mutex_lock(&mutex);
for(i = 0; i < N; ++i){
pthread_create(&threads[i], 0, runTask, (void*)i);
usleep(100000);
}
pthread_mutex_unlock(&mutex);
for(i = 0; i < N; ++i){
pthread_join(threads[i], NULL);
}
return 0;
}
輸出:
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
鎖獲取+阻塞等待程式碼:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10
void* runTask(void* pm){
pthread_mutex_lock(&mutex);
printf("%d get lock\n", (int)pm);
usleep(100000);
pthread_mutex_unlock(&mutex);
return 0;
}
void* runTaskWithWait(void* pm){
pthread_mutex_lock(&mutex);
printf("%d wait first get lock\n", (int)pm);
pthread_cond_wait(&cond, &mutex);
printf("%d wait second get lock\n", (int)pm);
usleep(300000);
pthread_mutex_unlock(&mutex);
}
int main(){
// int N = 10;
pthread_t threads[N];
pthread_t threadsForWaits[N];
int i = 0;
for(; i < N; ++i){
pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)i);
usleep(100000);
}
pthread_mutex_lock(&mutex);
for(i = 0; i < N; ++i){
pthread_create(&threads[i], 0, runTask, (void*)i);
usleep(100000);
}
//pthread_cond_broadcast(&cond);
for(i = 0; i < N; ++i)
pthread_cond_signal(&cond);
usleep(100000);
pthread_mutex_unlock(&mutex);
for(i = 0; i < N; ++i){
pthread_join(threads[i], NULL);
}
for(i = 0; i < N; ++i){
pthread_join(threadsForWaits[i], NULL);
}
return 0;
}
輸出:
0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock
只能說等待執行緒轉移到阻塞執行緒之後的排列,看起來是沒啥規律 ([email protected][email protected]=)
相關推薦
JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼
JVM中有這樣一段註釋: // The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provid
深度學習助力實現智慧行為分析和事件識別
作者:趙放、杜勇、王洪鬆、吳子豐 行為識別是指通過分析視訊、深度感測器等資料,利用特定的演算法,對行人的行為進行識別、分析的技術。這項技術被廣泛應用在視訊分類、人機互動、安防監控等領域。行為識別包含兩個研究方向:個體行為識別與群體行為(事件)識別。近年來,深度攝像技術的發展使得人體運動的深度影象序列變得容
水印第三版 ~ 變態水印(這次用Magick.NET來實現,附需求分析和原始碼)
以前的水印,只是簡單走起,用的是原生態的方法。現在各種變態水印,於是就不再用原生態的了,太麻煩了,這次用的是Magick,這個類庫還是很有名的,圖形化處理基本上都是支援的,至少逆天是挺喜歡的 歷史文章: 1.逆天通用水印支援Winform,WPF,Web,WP,Win10。支援位置選擇(9個位置
hive實現網站使用者行為分析指標
欄位解釋accessDate //訪問時間,精確到日期,String格式accessTime //訪問時間,精確到毫秒,int格式accessHour //訪問小時,區間為0-23,int格式 requestMethod //請求方式(get post 統計的
java中自定義鎖實現synchronized功能
public class Test {private static long count = 0;private Lock lock = new Lock();private int m = 0;private int a = 0;private int b = 0;pub
S4系統模型分析和關鍵原始碼讀解
S4(Simple Scalable Stream System) 流資料處理系統是Yahoo!公司提出的,在2011年的時候成為Apache軟體基金下的一個孵化專案,可惜的是在2014年的時候該孵化專案“退休”了,具體原因未知!!從這裡可以瞭解它當前的狀態資訊:link. 閱讀了所發表的論
分析思維 第四篇:資料分析入門階段——描述性統計分析和相關分析
資料分析的入門思維,首先要認識資料,然後對資料進行簡單的分析,比如描述性統計分析和相關性分析等。 一,認識變數和資料 變數和資料是資料分析中常用的概念,用變數來描述事物的特徵,而資料是變數的具體值,把變數的值也叫做觀測值。 1,變數 變數是用來描述總體中成員的某一個特性,例如,性別、年齡、身高、收入等。 變數
presto UI 分析及相關原始碼分析
CLUSTER OVERVIEW RESERVED MEMORY QUERY DETAILS
JAVA互斥鎖(synchronized&Lock):行為分析及原始碼
JVM中有這樣一段註釋: // The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provides park()
死鎖實現和怎樣分析死鎖
Java Deadlock Example and How to analyze deadlock situation 所謂死鎖:是指兩個或兩個以上的程序在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產
簡單理解javascript中的原型對象,實現對之間共享屬性和行為
type屬性 定義 say 能夠 方法 () post spa popu javascript中提供了構造函數。可以方便的創建對象。典型的構造函數例如以下: function Person(name, age) { this.name = name;
leetcode: Longest Valid Parentheses分析和實現
fin 最大 區間 之間 n) parent 復雜度 gif package 題目大意:給出一個只包含字符‘(‘和‘)‘的字符串S,求最長有效括號序列的長度。 很有趣的題目,有助於我們對這種人類自身制定的規則的深入理解,可能我們大多數人都從沒有真正理解過怎樣
RabbitMQ實戰:可用性分析和實現
RabbitMQ本系列是「RabbitMQ實戰:高效部署分布式消息隊列」書籍的總結筆記。 上一篇介紹了各種場景下的最佳實踐,大部分場景可以使用「發後即忘」的模式,不需要響應,如果需要響應,可以使用RabbitMQ的RPC模型。 RabbitMQ以異步的方式解耦系統間的關系,調用者將業務請求發送到Rabbit
數據庫事務隔離級別和鎖實現機制
約定 表鎖 四種 back 數據庫操作 升級 數據對象 三級封鎖 pro 1. 數據庫事務處理中出現的數據不一致的情況 在多個事務並發做數據庫操作的時候,如果沒有有效的避免機制,就會出現種種問題。大體上有四種問題,歸結如下: 1.1 丟失更新 如果兩個事務都要更新數據庫一個
Mybatis實現一對一查詢 對ResultType和ResultMap分析
結果 列名 單獨 定義 延遲加載 map 映射 包括 增加 實現一對一查詢: ResultMap:使用ResultType實現較為簡單,如果pojo中沒有包括查詢出來的列名,需要增加 列名對應的屬性,即可完成映射。 如果沒有查詢結果的特殊要求建議使用
基於redis和zookeeper的分布式鎖實現方式
自動 key-value 判斷 nosql 順序 種類型 超時時間 key存在 sql數據庫 先來說說什麽是分布式鎖,簡單來說,分布式鎖就是在分布式並發場景中,能夠實現多節點的代碼同步的一種機制。從實現角度來看,主要有兩種方式:基於redis的方式和基於zookeeper的
類鎖和對象鎖,synchronized修飾static方法與非static方法的區別
ati nbsp 的區別 一個 靜態方法 範圍 之間 對象 ron 當synchronized修飾一個static方法時,多線程下,獲取的是類鎖(即Class本身,註意:不是實例), 作用範圍是整個靜態方法,作用的對象是這個類的所有對象。 當synchro
Java內部鎖(synchronized)中類鎖和物件鎖
版權宣告:本文為博主原創文章,轉載請註明出處。 https://blog.csdn.net/qq_25827845/article/details/77688880 synchronized是Java提供的內部鎖,裡邊有類鎖和物件鎖;在靜態方
Java 多執行緒分段下載原理分析和實現
多執行緒下載介紹 多執行緒下載技術是很常見的一種下載方案,這種方式充分利用了多執行緒的優勢,在同一時間段內通過多個執行緒發起下載請求,將需要下載的資料分割成多個部分,每一個執行緒只負責下載其中一個部分,然後將下載後的資料組裝成完整的資料檔案,這樣便大大加快了下載效率。常見的下載器,迅
java鎖機制的兩種實現synchronized 與ReentrantLock
java的多執行緒環境下併發是常見問題,這兩天看了鎖相關的問題,記錄下兩個簡單的用鎖實現等待/喚醒機制的demo。 1.synchronized方式實現等待/喚醒。 public class WaitAndNotify { private static boolean flag =