1. 程式人生 > >圖解java.util.concurrent原始碼(三) Reentrantlock && Semaphore

圖解java.util.concurrent原始碼(三) Reentrantlock && Semaphore

引言

Reentrantlock和Semaphore分別是AQS在獨佔模式和共享模式下經典的實現,在理解AQS的情況下看這兩個類的程式碼會感到非常簡單,如果還沒理解AQS的話,建議先讀我這個系列的第一篇文章

複習AQS

回憶一下AQS,AQS中維護了一個state同步狀態,它的子類只需要實現以下幾個方法,並在方法中修改判斷state的值即可:

獨佔模式的同步器(比如Reentrantlock)需要實現:

  • tryAcquire
  • tryRelease

共享模式的同步器(比如Semaphore)則需要實現:

  • tryAcquireShared
  • tryReleaseShared

如果想要進一步使用AQS的ConditionObject進行執行緒間同步的話,則子類還應該實現下面的方法:

  • isHeldExclusively

下面就中點分析這幾個方法。

Reentrantlock

開啟ReentrantLock最常用的三個方法看看(分別是lock,unlock和newCondition),果然全部委託給了叫做sync的內部類物件:

	public void lock() {
		sync.lock();
	}
	public void unlock() {
		sync.release(1);
	}
	public Condition newCondition() {
		return sync.newCondition();
	}

而Sync內部類其實就是AQS的實現:

	abstract static class Sync extends AbstractQueuedSynchronizer

而Reentrantlock中還有兩個Sync的子類內部類:

    static final class NonfairSync extends Sync 
	static final class FairSync extends Sync

在ReentrantLock中真正使用的是這兩個子類,分別對應非公平鎖與公平鎖。公平鎖能夠保證執行緒按照先進先出(FIFO)的方式獲得鎖,但是一般認為公平鎖的效能不如非公平鎖。

下面我們帶著兩個問題繼續閱讀:

  • ReentrantLock是如何實現可重入(即同一個執行緒可以持有該鎖的情況下多次lock)的?
  • 公平鎖的FIFO是怎麼實現的呢?

非公平鎖

非公平鎖NonfairSync的lock方法實現:

		final void lock() {
			if (compareAndSetState(0, 1))
				setExclusiveOwnerThread(Thread.currentThread());
			else
				acquire(1);
		}

發現它會先嚐試一下立即獲得鎖,如果失敗的話則退化為正常AQS獲鎖流程(即父類AQS中的acquire方法),這裡注意到acquire方法接收的引數是1。

tryAcquire方法的實現:

		protected final boolean tryAcquire(int acquires) {
			return nonfairTryAcquire(acquires);
		}

發現它直接呼叫的是父類的nonfairTryAcquire方法:

		final boolean nonfairTryAcquire(int acquires) {
			final Thread current = Thread.currentThread();
			int c = getState();
			//state == 0表示該鎖處於空閒狀態
			if (c == 0) {
			    //獲得鎖成功
				if (compareAndSetState(0, acquires)) {
					setExclusiveOwnerThread(current);
					return true;
				}
			} else if (current == getExclusiveOwnerThread()) {   //執行緒重入的情況
				int nextc = c + acquires;
				if (nextc < 0) // overflow
					throw new Error("Maximum lock count exceeded");
				setState(nextc);
				return true;
			}
			return false;
		}

如果發現鎖處於空閒狀態(state == 0),則嘗試獲得鎖,否則的話,先判斷一下重入的情況,如果是重入的情況(current == getExclusiveOwnerThread()),則將同步狀態state加1(int nextc = c + acquires;),這裡的acquires的值只可能是1,因為我們之前看到lock方法中始終呼叫的是acquire(1)

再看一下tryRelease方法,tryRelease方法在父類Sync中,也就是說公平鎖與非公鎖共用的是同一個tryRelease方法:

		protected final boolean tryRelease(int releases) {
			int c = getState() - releases;//減1
            //如果釋放的執行緒不是持有鎖的執行緒,則丟擲異常
			if (Thread.currentThread() != getExclusiveOwnerThread())
				throw new IllegalMonitorStateException();
			boolean free = false;
			if (c == 0) {   //鎖已經釋放完全的狀態
				free = true;
				setExclusiveOwnerThread(null);
			}
			setState(c);
			return free;
		}

大體上做的事情就是將同步狀態state減1,如果發現減到了零的話,則通過setExclusiveOwnerThread將AQS的exclusiveOwnerThread變數置空,如果已經減到零了,執行緒再次呼叫unlock方法的話,則會因為Thread.currentThread() != getExclusiveOwnerThread()的判斷條件丟擲IllegalMonitorStateException異常。

看到這裡我們可以回答上面提出的第一個問題了:

ReentrantLock是如何實現可重入的呢?

答:通過維護同步狀態state的含義為“執行緒重入的次數”,每次執行緒重入將其加1,釋放鎖將其減1,直到減成0,將其徹底釋放。

順手去看看為了支援執行緒間同步(newCondition)而實現的isHeldExclusively方法(位於Sync類中):

		protected final boolean isHeldExclusively() {
			return getExclusiveOwnerThread() == Thread.currentThread();
		}

發現非常簡單,就是判斷一下持有鎖的執行緒是否是當前執行緒,我在第一篇將AQS的文章中說過,ConditionObject的signal方法會首先呼叫一下isHeldExclusively方法確認呼叫方法的執行緒是否持有鎖。

公平鎖

FairSync的lock方法的實現:

		final void lock() {
             //非公平鎖與公平鎖的不同之處一
			acquire(1);
		}

發現非常簡單粗暴,直接呼叫AQS父類的acquire方法,AQS中維護的CLH佇列就是FIFO的,所以這裡直接呼叫acquire即可。而之前的非公平鎖的“非公平”又體現再哪裡呢?重看一下NonfairSync的lock方法,發現其實就體現在:執行緒會先嚐試一次“插隊”,直接設定state獲得鎖,然後才會呼叫acquire方法走FIFO的CLH佇列,在這個過程中有可能造成CLH佇列中等待的執行緒被後來的執行緒給“插隊”了,就是這個"插隊"的行為導致了“不公平”。

上述的修改依舊沒能完全制止執行緒插隊的機會,AQS的acquire方法中也會先嚐試先用tryAcquire方法插隊,然後才進入CLH佇列,所以FairSync對tryAcquire方法也進行了細微的修改(相比NonfairSync):

		protected final boolean tryAcquire(int acquires) {
			final Thread current = Thread.currentThread();
			int c = getState();
			if (c == 0) {// 初始化狀態
				//hasQueuedPredecessors()是公平鎖與非公平鎖的區別二
				//這個方法來自於AQS
				//hasQueuedPredecessors判斷當前執行緒是否是CLH佇列的隊頭
				//如果在CLH佇列中沒有前繼且CAS成功才能成功獲得鎖
				if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
					setExclusiveOwnerThread(current);
					return true;
				}
			} else if (current == getExclusiveOwnerThread()) {// 重入
				int nextc = c + acquires;
				if (nextc < 0)
					throw new Error("Maximum lock count exceeded");
				setState(nextc);
				return true;
			}
			return false;
		}
	}

可以看出這段程式碼和NonfairSync的tryAcquire基本相同,除了在獲得鎖的判斷條件上添加了一個hasQueuedPredecessors,這個方法來自於父類AQS,如果當前執行緒是CLH佇列的隊頭則返回false,否則返回true。

為什麼要做這一層防護呢?因為在AQS的acquire方法中,執行緒仍然會先嚐試呼叫tryAcquire方法插個隊,之後才進入acquireQueued方法:

    //AQS中的acquire方法
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

執行緒在進入acquireQueued方法之後就徹底是FIFO的了,所以要在前面的tryAcquire再進行一道防護,防止在這裡"插隊"。

上面的這段文字就回答了之前提出的第二個問題,“公平鎖的FIFO是怎麼實現的呢?

hasQueuedPredecessors方法在將AQS時候漏講了,這裡補充一下:

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;  //s代表等待佇列的第一個節點
        //h != t 是為了判斷CLH佇列為空的情況
        //(s = h.next) == null 說明此時有另一個執行緒正在嘗試成為頭節點,詳見AQS的acquireQueued方法
        //s.thread != Thread.currentThread()  此執行緒不是等待的頭節點
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

判斷條件還是有點難理解的,h != t比較顯然,是為了判斷CLH佇列為空的情況,接下來的條件是在佇列不為空的情況下進行判斷,我逐個分析:

  • (s = h.next) == null

    首先複習一下AQS中的CLH佇列,它的頭結點代表當前獲得鎖的執行緒,而頭節點的下一個節點才代表等待佇列的第一個執行緒。

    所以這裡先通過s = h.next取到等待佇列的第一個節點賦給s。

    這裡h.next有可能為null,這就要複習一下AQS的acquireQueued方法了,當等待佇列的第一個執行緒獲得鎖時,它會將頭節點的next置空,這個置空next的執行緒顯然是呼叫hasQueuedPredecessors的前繼之一,所以返回true

  • s.thread != Thread.currentThread()

    當明白s節點代表的就是等待佇列的第一個的時候,這個也就很簡單了,如果第一個不是當前執行緒,則肯定是存在前繼的,返回true即可。

Semaphore

Semaphore用來在併發下管理數量有限的資源,是典型的共享模式下的AQS的實現。

和ReentrantLock一樣,也分為公平模式和非公平模式。

Semaphore的關鍵方法如下:

  • acquire 獲得許可
  • release 釋放許可

Semaphore並不支援使用CondionObject進行執行緒間的同步。

看看acquire方法:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

直接呼叫了AQS的acquireSharedInterruptibly方法,表明以共享模式使用AQS

再看看release方法:

    public void release() {
        sync.releaseShared(1);
    }
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

也是直接調了AQS的releaseShared方法,共享模式釋放。

非公平鎖

NonfairSync的tryAcquire方法:

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

再去其父類看nonfairTryAcquireShared方法:

        final int nonfairTryAcquireShared(int acquires) {
            //CAS迴圈
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

很好懂,只要明確一點就能夠看懂Semaphore的程式碼:

  • Semaphore的AQS中的同步狀態state代表的是剩餘許可的數量

上面那段程式碼其實就是通過CAS迴圈不斷嘗試減少響應數量的許可。

tryRelease方法也非常簡單,就是通過CAS迴圈不斷嘗試增加相應數量的許可:

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

公平鎖

和ReentrantLock一樣,就是加了一個hasQueuedPredecessors的判斷而已:

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                //和非公鎖的區別
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }