1. 程式人生 > >JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼

JVM:鎖實現(synchronized&JSR166)行為分析和相關原始碼

JVM中有這樣一段註釋:

// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent.  PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc.  The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.
其中是說ParkEvent用於Java語言級別的關鍵字synchronized。

Parkers用於Java類庫中的併發資料集合,該集合是由JSR166發展來的。

這裡說這兩個東西功能類似,將來會統一使用ParkEvent。

那麼它們究竟有什麼區別呢?

我們先看看這兩個類的大概介面樣子:

(ParkEvent)

class ParkEvent : public os::PlatformEvent {
  private:
    ParkEvent * FreeNext ;

    // Current association
    Thread * AssociatedWith ;
    intptr_t RawThreadIdentity ;        // LWPID etc
    volatile int Incarnation ;

class PlatformEvent : public CHeapObj<mtInternal> {
    // Use caution with reset() and fired() -- they may require MEMBARs
    void reset() { _Event = 0 ; }
    int  fired() { return _Event; }
    void park () ;
    void unpark () ;
    int  TryPark () ;
    int  park (jlong millis) ; // relative timed-wait only
(Parkers)
class Parker : public os::PlatformParker {
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

可以看到它們提供一致的相同介面,park和unpark。從而支撐Java中併發控制的功能。

它們究竟有什麼不同呢?我們首先來執行2段類似的程式碼。

阻塞執行緒獲取鎖的順序完全相反

首先是使用synchronized提供的鎖機制,我們隨便用一個Object lock = new Object()作為鎖關聯的物件,程式碼如下,它的功能是讓10個執行緒進入阻塞狀態,然後釋放鎖,觀察隨後執行緒獲取鎖的順序:

package com.psly.testLocks;

public class TestLockSynchronized {

	private static Object lock = new Object();
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		int N = 10;
		Thread[] threads = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				public void run() {
					synchronized(lock){
						System.out.println(Thread.currentThread().getName() + " get synch lock!");
						try {
							Thread.sleep(200);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}
				
			});
		}
		synchronized(lock){
			for(int i = 0; i < N; ++i){
				threads[i].start();
				Thread.sleep(200);
			}
		}
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
	}
}

我們用一個0.2seconds的時間,從而讓先建立的執行緒能夠先進入阻塞狀態,輸出為:

Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock! 

這有點奇怪,先嚐試獲取鎖的執行緒竟然後獲得鎖!

先不管這個, 我們把這個例子改為JSR166的Lock重做一遍:

package com.psly.testLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {

	private static Lock lock = new ReentrantLock();
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		int N = 10;
		Thread[] threads = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				public void run() {
					lock.lock();
						System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
						try {
							Thread.sleep(200);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					lock.unlock();
				}
				
			});
		}
		lock.lock();
			for(int i = 0; i < N; ++i){
				threads[i].start();
				Thread.sleep(200);
			}
		lock.unlock();
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
	}
}

輸出為:

Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!
 這個輸出比較符合了我們的預期,畢竟先嚐試獲取鎖的的確先獲取了鎖。

為什麼這兩種實現有這樣的差異呢,我們來看下他們分別的阻塞佇列實現,首先是JAVA的:

    public void lock() {
        sync.lock();
    }

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
我們這裡重點檢視addWaiter(Node node);可以看出來,執行緒構造的阻塞節點是通過tail欄位加入進佇列的,並且作為next節點。這是個先進先出雙向佇列。

所以當鎖被釋放時,阻塞執行緒獲取鎖的順序與進阻塞佇列是一致的。

我們接著看下synchronized的實現,這裡涉及到JVM中系統程式設計的原始碼,這裡也只貼出跟進入阻塞佇列相關的程式碼:

class ObjectMonitor;

class ObjectSynchronizer : AllStatic {
  static void fast_enter  (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
  static void slow_enter  (Handle obj, BasicLock* lock, TRAPS);
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  ................
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
void ATTR ObjectMonitor::enter(TRAPS) {
  // The following code is ordered to check the most common cases first
  // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
  Thread * const Self = THREAD ;
  void * cur ;
  ·········
    for (;;) {
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition()
      // or java_suspend_self()

      EnterI (THREAD) ;

      if (!ExitSuspendEquivalent(jt)) break ;

      //
      // We have acquired the contended monitor, but while we were
      // waiting another thread suspended us. We don't want to enter
      // the monitor while suspended because that would surprise the
      // thread that suspended us.
      //
          _recursions = 0 ;
     ······
}

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    // Try the lock - TATAS
    if (TryLock (Self) > 0) {
			......
    }

    if (TrySpin (Self) > 0) {
       ......
    }

    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    // Push "Self" onto the front of the _cxq.
    // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
    // Note that spinning tends to reduce the rate at which threads
    // enqueue and dequeue on EntryList|cxq.
    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }
這裡的重點是,ObjectWaiter node(Self) ;
ObjectWaiter node(Self) ;
........
 for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }
_cxq,我們採用比較並交換的原子指令,修改了_cxq,修改之前將_cxq的舊值填入node的next欄位,這樣一來我們就在_cxq上構造了個stack,也就是先進後出的佇列。於是下次當我們索取_cxq時候自然就取得了最後填入的值。這解釋了我們上面的執行示例,阻塞執行緒獲取鎖的順序與進佇列完全相反。

我們接著看下再複雜點的例子,依次啟動10個執行緒,依次獲取鎖,獲得鎖的同時列印自身資訊,然後主動呼叫wait語義的方法陷入阻塞狀態。等到這10個執行緒都阻塞之後主執行緒獲取鎖,接著再啟動10個無等待執行緒,這是個執行緒唯一做的事情就是依次獲取鎖,他們會按照我們上面所說的方式進入阻塞佇列。接著主執行緒依次傳送4次notify語義的訊號(注意時間間隔),然後釋放鎖。我們感興趣的是這幾個收到通知的執行緒,他們相對已經在阻塞佇列中的執行緒,誰會先獲取鎖?他們的排列又是怎麼樣的呢?

我們先執行JSR166的版本,程式碼如下:

package com.psly.testLocks;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockImplWithWaits {

	private static Lock lock = new ReentrantLock();
	private static Condition condition = lock.newCondition();
	public static void main(String[] args) throws InterruptedException {
		int N = 10;
		Thread[] threads = new Thread[N];
		Thread[] threadsForWaits = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					lock.lock();		
					System.out.println(Thread.currentThread().getName() + " nowait get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					lock.unlock();
				}
				
			});
		}
		for(int i = 0; i < N; ++i){
			threadsForWaits[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					// TODO Auto-generated method stub
					lock.lock();		//synchronized(lock){
					System.out.println(Thread.currentThread().getName() + " wait first get lock");
					try {
						condition.await();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
					System.out.println(Thread.currentThread().getName() + " wait second get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					lock.unlock();
				}
				
			});
		}
		
		for(int i = 0; i < N; ++i){
			threadsForWaits[i].start();
			Thread.sleep(200);
		}
		lock.lock();	//synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		for(int i = 0; i < 4 ; ++i){
				condition.signal();
		}
		Thread.sleep(200);
		lock.unlock();
			
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
		
		for(int i = 0; i < N; ++i)
			threadsForWaits[i].join();
		
	}

}
Thread-10到Thread-19為主動呼叫wait阻塞的執行緒,Thread-0到Thread-9為只獲取鎖的執行緒。

輸出為:

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get lock

可以看到JSR166的實現依然滿足先進先出,即使Thread-10到Thread-13是先獲取鎖之後陷入wait的。我們接著看下這是如何做到的,

注意JSR166的實現是在JAVA層面完成的。

主要是三個呼叫:wait,notify,unlock。

await:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
這裡的addConditionWaiter嘗試新增等待佇列的節點。acquireQueued用於將來被喚醒之後的再次嘗試獲取鎖。

我們來看addConditionWaiter,

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
是將新節點作為lastWaiter的next節點,並且本身成為lastWaiter節點。那麼這裡說明這構造的是一個先進先出的佇列。(這裡是在已經獲取鎖的情況下,所以不需同步)

我們接著看 signal

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }  

以上已經拿到了等待佇列第一個節點,接著enq讓他轉移(transfer)到鎖的阻塞佇列

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
這樣一來我們就完成了將等待執行緒從處於wait的狀態,轉移到了未獲得鎖處於阻塞的狀態。

最後看下當主執行緒釋放鎖時的操作:

unlock

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
可以看到,呼叫unlock的執行緒喚醒了阻塞佇列head中的第一個執行緒。 (OVER)

因為阻塞佇列跟等待佇列都是先進先出,這樣子能夠得到一個比較好的行為。

從而導致了我們之前的輸出,看上去比較符合預期。

最後我們來看看採用synchronized,這個示例下的輸出是什麼,程式碼如下:

package com.psly.testLocks;

public class TestLockImplWithWaits {

	private static Object lock = new Object();
	public static void main(String[] args) throws InterruptedException {
		int N = 10;
		Thread[] threads = new Thread[N];
		Thread[] threadsForWaits = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					synchronized(lock){		
					System.out.println(Thread.currentThread().getName() + " nowait get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					}
				}
				
			});
		}
		for(int i = 0; i < N; ++i){
			threadsForWaits[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					// TODO Auto-generated method stub
					synchronized(lock){
					System.out.println(Thread.currentThread().getName() + " wait first get lock");
					try {
						lock.wait();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
					System.out.println(Thread.currentThread().getName() + " wait second get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					}
				}
				
			});
		}
		
		for(int i = 0; i < N; ++i){
			threadsForWaits[i].start();
			Thread.sleep(200);
		}
		synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		for(int i = 0; i < 4 ; ++i){
			lock.notify();
		}
		Thread.sleep(200);
		}
			
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
		
		for(int i = 0; i < N; ++i)
			threadsForWaits[i].join();
		
	}

}
輸出入下:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock

這個結果很奇怪,最奇怪在於居然是連續輸出Thread-10,Thread-13,Thread-12,Thread-11:

Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
我們調整一下發送notify的數量,給出所有等待執行緒數量的呼叫N。
		for(int i = 0; i < N ; ++i){
			lock.notify();
		}
輸出為:
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
依然是Thread-10莫名其妙出現在最前,後面緊接著Thread-19到Thread-11倒序。

我們再嘗試換下呼叫方式,採用notifyAll();

	synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		lock.notifyAll();
		Thread.sleep(200);
		}

輸出為:

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
這下子又變了,Thread-10變為最後,完全逆序來獲取鎖了。

我們嘗試進入JVM去看下這一切是怎麼回事。與之前的過程類似,首先我們來看看等待之後執行緒節點如何組織的:

經研究,應該是下面這片程式碼:

WAIT:

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;


   TEVENT (Wait) ;

   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);

   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.

   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

如果_WaitSet為空,則設定它,並且前驅和後繼都是它。

如果只有_WaitSet一個,則將節點增加到它的後面。

不為空的情況下統一新增到next節點。

所以這裡是個先進先出的佇列。

notify

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,
  // timeout or other spurious wake-up, dequeue the
  // waiter from waiting list
  ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}
static int Knob_MoveNotifyee       = 2 ;       // notify() - disposition of notifyee
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     } else
     if (Policy == 3) {      // append to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
這裡的DequeueWaiter呼叫DequeueSpecificWaiter,效果是隊列出一個元素,_WaitSet.next成為_WaitSet。這裡有_EntryList、_cxq兩個資料結構。

接著我們走Policy==2分支,注意這裡並不是全部放入cxq(儘管註釋如此),判斷是_EntryList==NULL的時候,直接將我們的節點放入它。否則,將我們的節點新增到_cxq這個stack前面。想象一個,假如第一個節點進來,發現_EntryList為空,_EntryList設定為它自己。從第二個節點開始,所有節點都是進stack,這樣的話是不是取出時,第二個往後的節點都顛倒了呢。假如我們取節點的方式是先驅_EntryList,然後再取stack中的元素。則就會發生示例中Thread-10提前的亂序情況。 

但是注意,之前的notifyAll並沒有產生這種效果。所以我們來看下notifyAll的程式碼:

     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         iterator->TState = ObjectWaiter::TS_CXQ ;
         for (;;) {
             ObjectWaiter * Front = _cxq ;
             iterator->_next = Front ;
             if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                 break ;
             }
         }
     } else
果然如此!notifyAll的邏輯跟notify大部分一樣,除了它將所有節點都加入cxq。所以我們才會觀察到notifyAll呼叫之後的節點獲取鎖順序是逆序。

unlock

我們接著看看unlock的時候,是不是如我們猜測的那樣先取_EntryList的元素,再來看cxq。

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
  fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
  // if displaced header is null, the previous enter is recursive enter, no-op
  .........

  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
       ...................
       for (;;) {
      assert (THREAD == _owner, "invariant") ;


      if (Knob_ExitPolicy == 0) {

        .........
      } else {
			..........
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;

  

      w = _EntryList  ;
      if (w != NULL) {

          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
 

      if (QMode == 1) {
               ..............
      } else {
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
static int Knob_QMode              = 0 ;       // EntryList-cxq policy - queue discipline
static int Knob_ExitPolicy         = 0 ;
這裡是先取_EntryList,假如有就呼叫ExitEpilog並返回,否則採用原子操作取_cxq,然後將這個值再次給_EntryList,並呼叫ExitEpilog。

總之這裡最終都是將資料給_EntryList,只不過假如_EntryList原本就有值,那麼我們會先使用它,之後再使用_cxq。

我們看下ExitEpilog完成了什麼事:

void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   assert (_owner == Self, "invariant") ;

   // Exit protocol:
   // 1. ST _succ = wakee
   // 2. membar #loadstore|#storestore;
   // 2. ST _owner = NULL
   // 3. unpark(wakee)

   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
   ParkEvent * Trigger = Wakee->_event ;

   Wakee  = NULL ;

   // Drop the lock
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;                               // ST _owner vs LD in unpark()

   if (SafepointSynchronize::do_call_back()) {
      TEVENT (unpark before SAFEPOINT) ;
   }

   Trigger->unpark() ;

   // Maintain stats and report events to JVMTI
   if (ObjectMonitor::_sync_Parks != NULL) {
      ObjectMonitor::_sync_Parks->inc() ;
   }
}
果然這裡最後呼叫了unpark,從而喚醒了相應的那個執行緒。這裡的_EntryList的值會如何變化?我們最後看下,當等待執行緒從wait中醒過來會做什麼:
// Note: a subset of changes to ObjectMonitor::wait()
// will need to be replicated in complete_exit above
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
..........
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;

.............
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
............
   if (node.TState == ObjectWaiter::TS_WAIT) {
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         if (node.TState == ObjectWaiter::TS_WAIT) {
            DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
            assert(node._notified == 0, "invariant");
            node.TState = ObjectWaiter::TS_RUN ;
         }
..............
     assert (_owner != Self, "invariant") ;
     ObjectWaiter::TStates v = node.TState ;
     if (v == ObjectWaiter::TS_RUN) {
         enter (Self) ;
     } else {
進入了enter(Self),
void ATTR ObjectMonitor::enter(TRAPS) {
    for (;;) {
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition()
      // or java_suspend_self()

      EnterI (THREAD) ;

進入EnterI(THREAD),
void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    if (TryLock (Self) > 0) {
        assert (_succ != Self              , "invariant") ;
        assert (_owner == Self             , "invariant") ;
        assert (_Responsible != Self       , "invariant") ;
        return ;
    }

    DeferredInitialize () ;


    if (TrySpin (Self) > 0) {
        assert (_owner == Self        , "invariant") ;
        assert (_succ != Self         , "invariant") ;
        assert (_Responsible != Self  , "invariant") ;
        return ;
    }

    // The Spin failed -- Enqueue and park the thread ...
    assert (_succ  != Self            , "invariant") ;
    assert (_owner != Self            , "invariant") ;
    assert (_Responsible != Self      , "invariant") ;


    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }

    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
        // Try to assume the role of responsible thread for the monitor.
        // CONSIDER:  ST vs CAS vs { if (Responsible==null) Responsible=Self }
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {

        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        if ((SyncFlags & 2) && _Responsible == NULL) {
           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        }

        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }

        if (TryLock(Self) > 0) break ;

        TEVENT (Inflated enter - Futile wakeup) ;
        if (ObjectMonitor::_sync_FutileWakeups != NULL) {
           ObjectMonitor::_sync_FutileWakeups->inc() ;
        }
        ++ nWakeups ;

        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
           Self->_ParkEvent->reset() ;
           OrderAccess::fence() ;
        }
        if (_succ == Self) _succ = NULL ;

        // Invariant: after clearing _succ a thread *must* retry _owner before parking.
        OrderAccess::fence() ;
    }

    assert (_owner == Self      , "invariant") ;
    assert (object() != NULL    , "invariant") ;
    // I'd like to write:
    //   guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    // but as we're at a safepoint that's not safe.

    UnlinkAfterAcquire (Self, &node) ;
    if (_succ == Self) _succ = NULL ;

    assert (_succ != Self, "invariant") ;
    if (_Responsible == Self) {
        _Responsible = NULL ;
        OrderAccess::fence(); // Dekker pivot-point
	.........
這裡的重點是UnlinkAfterAcquire,
void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
    assert (_owner == Self, "invariant") ;
    assert (SelfNode->_thread == Self, "invariant") ;

    if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
        // Normal case: remove Self from the DLL EntryList .
        // This is a constant-time operation.
        ObjectWaiter * nxt = SelfNode->_next ;
        ObjectWaiter * prv = SelfNode->_prev ;
        if (nxt != NULL) nxt->_prev = prv ;
        if (prv != NULL) prv->_next = nxt ;
        if (SelfNode == _EntryList ) _EntryList = nxt ;
        assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        TEVENT (Unlink from EntryList) ;
    } else {
它會將_EntryList的值做更新,從而讓鎖的獲取繼續下去,保證不會出錯。

到這裡為止,我們終於大致走完了一遍synchronized鎖與lock鎖分別在JVM和JUC中的實現。

那麼有個問題,linux中pthread鎖的實現,行為模式又是怎麼樣的呢?

我們嘗試將使用pthread來執行測試這兩個例子:

鎖獲取程式碼:

編譯生成執行檔案testLock:gcc -pthread testLock.c -o testLock

執行:./testLock

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

int main(){
//  int N = 10;
    pthread_t threads[N];
    int i = 0;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)i);
        usleep(100000);
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    return 0;

}

輸出:

0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock

鎖獲取+阻塞等待程式碼:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

void* runTaskWithWait(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d wait first get lock\n", (int)pm);
    pthread_cond_wait(&cond, &mutex);
    printf("%d wait second get lock\n", (int)pm);
    usleep(300000);
    pthread_mutex_unlock(&mutex);
}
int main(){
//  int N = 10;
    pthread_t threads[N];
    pthread_t threadsForWaits[N];
    int i = 0;
    for(; i < N; ++i){
        pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)i);
        usleep(100000);
    }

    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)i);
        usleep(100000);
    }
    //pthread_cond_broadcast(&cond);
    for(i = 0; i < N; ++i)
        pthread_cond_signal(&cond);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    for(i = 0; i < N; ++i){
        pthread_join(threadsForWaits[i], NULL);
    }
    return 0;

} 
輸出:
0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock

只能說等待執行緒轉移到阻塞執行緒之後的排列,看起來是沒啥規律 ([email protected][email protected]=)

相關推薦

JVM:實現(synchronized&JSR166)行為分析相關原始碼

JVM中有這樣一段註釋: // The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provid

深度學習助力實現智慧行為分析事件識別

作者:趙放、杜勇、王洪鬆、吳子豐 行為識別是指通過分析視訊、深度感測器等資料,利用特定的演算法,對行人的行為進行識別、分析的技術。這項技術被廣泛應用在視訊分類、人機互動、安防監控等領域。行為識別包含兩個研究方向:個體行為識別與群體行為(事件)識別。近年來,深度攝像技術的發展使得人體運動的深度影象序列變得容

水印第三版 ~ 變態水印(這次用Magick.NET來實現,附需求分析原始碼

以前的水印,只是簡單走起,用的是原生態的方法。現在各種變態水印,於是就不再用原生態的了,太麻煩了,這次用的是Magick,這個類庫還是很有名的,圖形化處理基本上都是支援的,至少逆天是挺喜歡的 歷史文章: 1.逆天通用水印支援Winform,WPF,Web,WP,Win10。支援位置選擇(9個位置

hive實現網站使用者行為分析指標

欄位解釋accessDate    //訪問時間,精確到日期,String格式accessTime  //訪問時間,精確到毫秒,int格式accessHour   //訪問小時,區間為0-23,int格式 requestMethod   //請求方式(get post 統計的

java中自定義實現synchronized功能

public class Test {private static long count = 0;private Lock lock = new Lock();private int m = 0;private int a = 0;private int b = 0;pub

S4系統模型分析關鍵原始碼讀解

S4(Simple Scalable Stream System) 流資料處理系統是Yahoo!公司提出的,在2011年的時候成為Apache軟體基金下的一個孵化專案,可惜的是在2014年的時候該孵化專案“退休”了,具體原因未知!!從這裡可以瞭解它當前的狀態資訊:link. 閱讀了所發表的論

分析思維 第四篇:資料分析入門階段——描述性統計分析相關分析

資料分析的入門思維,首先要認識資料,然後對資料進行簡單的分析,比如描述性統計分析和相關性分析等。 一,認識變數和資料 變數和資料是資料分析中常用的概念,用變數來描述事物的特徵,而資料是變數的具體值,把變數的值也叫做觀測值。 1,變數 變數是用來描述總體中成員的某一個特性,例如,性別、年齡、身高、收入等。 變數

presto UI 分析相關原始碼分析

CLUSTER OVERVIEW RESERVED MEMORY QUERY DETAILS

JAVA互斥(synchronized&Lock):行為分析原始碼

JVM中有這樣一段註釋: // The base-class, PlatformEvent, is platform-specific while the ParkEvent is // platform-independent. PlatformEvent provides park()

實現怎樣分析

Java Deadlock Example and How to analyze deadlock situation 所謂死鎖:是指兩個或兩個以上的程序在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產

簡單理解javascript中的原型對象,實現對之間共享屬性行為

type屬性 定義 say 能夠 方法 () post spa popu javascript中提供了構造函數。可以方便的創建對象。典型的構造函數例如以下: function Person(name, age) {   this.name = name;

leetcode: Longest Valid Parentheses分析實現

fin 最大 區間 之間 n) parent 復雜度 gif package   題目大意:給出一個只包含字符‘(‘和‘)‘的字符串S,求最長有效括號序列的長度。      很有趣的題目,有助於我們對這種人類自身制定的規則的深入理解,可能我們大多數人都從沒有真正理解過怎樣

RabbitMQ實戰:可用性分析實現

RabbitMQ本系列是「RabbitMQ實戰:高效部署分布式消息隊列」書籍的總結筆記。 上一篇介紹了各種場景下的最佳實踐,大部分場景可以使用「發後即忘」的模式,不需要響應,如果需要響應,可以使用RabbitMQ的RPC模型。 RabbitMQ以異步的方式解耦系統間的關系,調用者將業務請求發送到Rabbit

數據庫事務隔離級別實現機制

約定 表鎖 四種 back 數據庫操作 升級 數據對象 三級封鎖 pro 1. 數據庫事務處理中出現的數據不一致的情況 在多個事務並發做數據庫操作的時候,如果沒有有效的避免機制,就會出現種種問題。大體上有四種問題,歸結如下: 1.1 丟失更新 如果兩個事務都要更新數據庫一個

Mybatis實現一對一查詢 對ResultTypeResultMap分析

結果 列名 單獨 定義 延遲加載 map 映射 包括 增加 實現一對一查詢: ResultMap:使用ResultType實現較為簡單,如果pojo中沒有包括查詢出來的列名,需要增加 列名對應的屬性,即可完成映射。 如果沒有查詢結果的特殊要求建議使用

基於rediszookeeper的分布式實現方式

自動 key-value 判斷 nosql 順序 種類型 超時時間 key存在 sql數據庫 先來說說什麽是分布式鎖,簡單來說,分布式鎖就是在分布式並發場景中,能夠實現多節點的代碼同步的一種機制。從實現角度來看,主要有兩種方式:基於redis的方式和基於zookeeper的

對象synchronized修飾static方法與非static方法的區別

ati nbsp 的區別 一個 靜態方法 範圍 之間 對象 ron 當synchronized修飾一個static方法時,多線程下,獲取的是類鎖(即Class本身,註意:不是實例), 作用範圍是整個靜態方法,作用的對象是這個類的所有對象。 當synchro

Java內部synchronized)中類物件

版權宣告:本文為博主原創文章,轉載請註明出處。 https://blog.csdn.net/qq_25827845/article/details/77688880        synchronized是Java提供的內部鎖,裡邊有類鎖和物件鎖;在靜態方

Java 多執行緒分段下載原理分析實現

多執行緒下載介紹   多執行緒下載技術是很常見的一種下載方案,這種方式充分利用了多執行緒的優勢,在同一時間段內通過多個執行緒發起下載請求,將需要下載的資料分割成多個部分,每一個執行緒只負責下載其中一個部分,然後將下載後的資料組裝成完整的資料檔案,這樣便大大加快了下載效率。常見的下載器,迅

java機制的兩種實現synchronized 與ReentrantLock

java的多執行緒環境下併發是常見問題,這兩天看了鎖相關的問題,記錄下兩個簡單的用鎖實現等待/喚醒機制的demo。 1.synchronized方式實現等待/喚醒。 public class WaitAndNotify { private static boolean flag =