1. 程式人生 > 實用技巧 >佇列同步器(AQS)詳解

佇列同步器(AQS)詳解

轉自:https://blog.csdn.net/sunxianghuang/article/details/52287968

佇列同步器(AQS)

佇列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步元件的基礎框架,它使用了一個int成員變數表示同步狀態,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作,併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。

佇列同步器的基本結構

同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理。同步佇列中的節點(Node)用來儲存"獲取同步狀態失敗的執行緒"引用、等待狀態以及前驅和後繼節點。

同步器包含了兩個節點型別的引用,一個指向頭節點,而另一個指向尾節點。

  1. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
  2. implements java.io.Serializable {
  3. ......
  4. private transient volatile Node head;//頭節點
  5. private transient volatile Node tail;//尾節點
  6. private volatile int state;//*同步狀態*
  7. ......
  8. static final class Node {
  9. volatile int waitStatus;//等待狀態
  10. volatile Node prev;//前驅
  11. volatile Node next;//後繼
  12. volatile Thread thread;//執行緒引用
  13. ......
  14. }
  15. ......
  16. }

:Node型別的prev、next屬性以及AbstractQueuedSynchronizer型別的head 、tail屬性都設定為volatile,保證可見性。

自定義同步元件的設計思路

同步器的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,在抽象方法的實現過程中免不了要對同步狀態進行更改,這時就需要使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進行操作,因為它們能夠保證狀態的改變是安全的。

子類推薦被定義為自定義同步元件的靜態內部類,同步器自身沒有實現任何同步介面,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步元件使用,同步器既可以支援獨佔式地獲取同步狀態,也可以支援共享式地獲取同步狀態,這樣就可以方便實現不同型別的同步元件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

同步器是實現(也可以是任意同步元件)的關鍵,在鎖的實現中聚合(組合)同步器,利用同步器實現鎖的語義。可以這樣理解二者之間的關係:鎖是面向使用者的,它定義了使用者與鎖互動的介面(比如可以允許兩個執行緒並行訪問),隱藏了實現細節;同步器面向的是鎖的實現者,它簡化了鎖的實現方式,遮蔽了同步狀態管理、執行緒的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現者所需關注的領域。

同步器的設計是基於模板方法模式的,也就是說,使用者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步元件的實現中,並呼叫同步器提供的模板方法,而這些模板方法將會呼叫使用者重寫的方法
重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來訪問或修改同步狀態。
getState():獲取當前同步狀態。
setState(int newState):設定當前同步狀態。
compareAndSetState(int expect,int update):使用CAS設定當前狀態,該方法能夠保證狀態設定的原子性。

獨佔式同步元件的設計

可重寫的方法

  1. /*Attempts to acquire in exclusive mode. This method should query if the state of the object
  2. permits it to be acquired in the exclusive mode, and if so to acquire it.*/
  3. //獨佔式獲取同步狀態,實現該方法需要查詢當前狀態並判斷同步狀態是否符合預期,然後再進行CAS設定同步狀態
  4. protected boolean tryAcquire(int arg)
  5. /*Attempts to set the state to reflect a release in exclusive mode.*/
  6. //獨佔式釋放同步狀態,等待獲取同步狀態的執行緒將有機會獲取同步狀態
  7. protected boolean tryRelease(int arg)
  8. /*Returns true if synchronization is held exclusively with respect to the current (calling) thread. */
  9. //當前同步器是否在獨佔模式下被執行緒佔用,一般該方法表示是否被當前執行緒所獨佔
  10. protected boolean isHeldExclusively()

同步器提供的模板方法

  1. /*Acquires in exclusive mode, ignoring interrupts.*/
  2. //獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,立即返回。否則,將會進入同步佇列等待,
  3. //該方法將會重複呼叫重寫的tryAcquire(int arg)方法
  4. public final void acquire(int arg)
  5. /*Acquires in exclusive mode, aborting if interrupted.*/
  6. //與acquire(int arg)基本相同,但是該方法響應中斷。
  7. public final void acquireInterruptibly(int arg)
  8. /* Releases in exclusive mode. Implemented by unblocking one or more threads if {@link #tryRelease} returns true.
  9. This method can be used to implement method {@link Lock#unlock}.*/
  10. //獨佔式釋放同步狀態,該方法會在釋放同步狀態後,將同步佇列中第一個節點包含的執行緒喚醒
  11. public final boolean release(int arg)

acquire(int arg)模板方法

通過呼叫同步器的acquire(int arg)方法可以獲取同步狀態。該方法對中斷不敏感,也就是說,由於執行緒獲取同步狀態失敗後進入同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步佇列中移除。

  1. public final void acquire(int arg) {//**該方法是模板方法**
  2. if (!tryAcquire(arg) &&//先通過tryAcquire獲取同步狀態
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取同步狀態失敗則生成節點並加入同步佇列
  4. selfInterrupt();
  5. }

獨佔式同步狀態獲取流程

主要邏輯:首先呼叫自定義同步器實現tryAcquire(int arg)方法,該方法保證執行緒安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個執行緒成功獲取同步狀態)並通過addWaiter(Node node)方法將該節點加入到同步佇列的尾部,最後呼叫acquireQueued(Node node,int arg)方法,使得該節點以“死迴圈”的方式獲取同步狀態。



將節點加入同步佇列

當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒等待狀態等資訊構造成為一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒。

試想一下,當一個執行緒成功地獲取了同步狀態(或者鎖),其他執行緒將無法獲取到同步狀態,轉而被構造成為節點並加入到同步佇列中,而這個加入佇列的過程必須要保證執行緒安全

因此,同步器提供了一個基於CAS的設定尾節點的方法:compareAndSetTail(Nodeexpect,Nodeupdate),它需要傳遞當前執行緒“認為”的尾節點和當前節點,只有設定成功後,當前節點才正式與之前的尾節點建立關聯。

  1. //將節點加入到同步佇列的尾部
  2. private Node addWaiter(Node mode) {
  3. Node node = new Node(Thread.currentThread(), mode);//生成節點(Node)
  4. // Try the fast path of enq; backup to full enq on failure
  5. //快速嘗試在尾部新增
  6. Node pred = tail;
  7. if (pred != null) {
  8. node.prev = pred;//先將當前節點node的前驅指向當前tail
  9. if (compareAndSetTail(pred, node)) {//CAS嘗試將tail設定為node
  10. //如果CAS嘗試成功,就說明"設定當前節點node的前驅"與"CAS設定tail"之間沒有別的執行緒設定tail成功
  11. //只需要將"之前的tail"的後繼節點指向node即可
  12. pred.next = node;
  13. return node;
  14. }
  15. }
  16. enq(node);//否則,通過死迴圈來保證節點的正確新增
  17. return node;
  18. }
  1. private Node enq(final Node node) {
  2. for (;;) {//通過死迴圈來保證節點的正確新增
  3. Node t = tail;
  4. if (t == null) { // Must initialize 同步佇列為空的情況
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {//直到CAS成功為止
  10. t.next = node;
  11. return t;//結束迴圈
  12. }
  13. }
  14. }
  15. }

在enq(final Node node)方法中,同步器通過“死迴圈”來保證節點的正確新增,在“死迴圈”中只有通過CAS將節點設定成為尾節點之後,當前執行緒才能從該方法返回,否則,當前執行緒不斷地嘗試設定。可以看出,enq(final Node node)方法將併發新增節點的請求通過CAS變得“序列化”了。

序列化的優點

如果通過加鎖同步的方式新增節點,執行緒必須獲取鎖後才能新增尾節點,那麼必然會導致其他執行緒等待加鎖而阻塞,獲取鎖的執行緒釋放鎖後阻塞的執行緒又會被喚醒,而執行緒的阻塞和喚醒需要依賴於系統核心完成,因此程式的執行需要從使用者態切換到核心態,而這樣的切換是非常耗時的操作。如果我們通過”迴圈CAS“來新增節點的話,所有執行緒都不會被阻塞,而是不斷失敗重試,執行緒不需要進行鎖同步,不僅消除了執行緒阻塞喚醒的開銷而且消除了加鎖解鎖的時間開銷。但是迴圈CAS也有其缺點,迴圈CAS通過不斷嘗試來新增節點,如果說CAS操作失敗那麼將會佔用處理器資源。

節點的自旋

節點進入同步佇列之後,就進入了一個自旋的過程,每個節點(或者說是執行緒)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中。

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {//無限迴圈
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {//前驅節點是首節點且獲取到了同步狀態
  8. setHead(node); //設定首節點
  9. p.next = null; // help GC 斷開引用
  10. failed = false;
  11. return interrupted;//從自旋中退出
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&//獲取同步狀態失敗後判斷是否需要阻塞或中斷
  14. parkAndCheckInterrupt())//阻塞當前執行緒
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }
  1. /**Checks and updates status for a node that failed to acquire.
  2. * Returns true if thread should block. This is the main signal control in all acquire loops.*/
  3. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  4. int ws = pred.waitStatus;//獲取前驅節點的等待狀態
  5. if (ws == Node.SIGNAL)
  6. //SIGNAL狀態:前驅節點釋放同步狀態或者被取消,將會通知後繼節點。因此,可以放心的阻塞當前執行緒,返回true。
  7. /* This node has already set status asking a release to signal it, so it can safely park.*/
  8. return true;
  9. if (ws > 0) {//前驅節點被取消了,跳過前驅節點並重試
  10. /* Predecessor was cancelled. Skip over predecessors and indicate retry. */
  11. do {
  12. node.prev = pred = pred.prev;
  13. } while (pred.waitStatus > 0);
  14. pred.next = node;
  15. } else {//獨佔模式下,一般情況下這裡指前驅節點等待狀態為SIGNAL
  16. /* waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to
  17. * retry to make sure it cannot acquire before parking. */
  18. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//設定當前節點等待狀態為SIGNAL
  19. }
  20. return false;
  21. }
  1. /** Convenience method to park and then check if interrupted 。return {@code true} if interrupted */
  2. private final boolean parkAndCheckInterrupt() {
  3. LockSupport.park(this);//阻塞當前執行緒
  4. return Thread.interrupted();
  5. }



可以看到節點和節點之間在迴圈檢查的過程中基本不相互通訊,而是簡單地判斷自己的前驅是否為頭節點,這樣就使得節點的釋
放規則符合FIFO。並且也便於對過早通知的處理(過早通知是指:前驅節點不是頭節點的執行緒由於中斷而被喚醒)。

當同步狀態獲取成功之後,當前執行緒從acquire(int arg)方法返回,如果對於鎖這種併發元件而言,代表著當前執行緒獲取了鎖。

設定首節點

同步佇列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的執行緒在釋放同步狀態時,將會喚醒後續節點,而後續節點將會在獲取同步狀態成功時將自己設定為首節點。

設定首節點是由獲取同步狀態成功的執行緒來完成的,由於只有一個執行緒能夠成功的獲取到同步狀態,因此設定頭節點的方法並不需要使用CAS來保證,它只需要將首節點設定成為原首節點後繼節點,並斷開首節點的next引用即可。

釋放同步狀態

當前執行緒獲取同步狀態並執行了相應邏輯之後,就需要釋放同步狀態,使得後續節點能夠繼續獲取同步狀態。通過呼叫同步器的release(int arg)方法可以釋放同步狀態,該方法在釋放了同步狀態之後,會"喚醒"其後繼節點(進而使後繼節點重新嘗試獲取同步狀態)。

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {//釋放同步狀態
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)//獨佔模式下這裡表示SIGNAL
  5. unparkSuccessor(h);//喚醒後繼節點
  6. return true;
  7. }
  8. return false;
  9. }
  1. /** Wakes up node's successor, if one exists.*/
  2. private void unparkSuccessor(Node node) {
  3. int ws = node.waitStatus;//獲取當前節點等待狀態
  4. if (ws < 0)
  5. compareAndSetWaitStatus(node, ws, 0);//更新等待狀態
  6. /* Thread to unpark is held in successor, which is normally just the next node.
  7. But if cancelled or apparently null,
  8. * traverse backwards from tail to find the actual non-cancelled successor.*/
  9. Node s = node.next;
  10. if (s == null || s.waitStatus > 0) {//找到第一個沒有被取消的後繼節點(等待狀態為SIGNAL)
  11. s = null;
  12. for (Node t = tail; t != null && t != node; t = t.prev)
  13. if (t.waitStatus <= 0)
  14. s = t;
  15. }
  16. if (s != null)
  17. LockSupport.unpark(s.thread);//喚醒後繼執行緒
  18. }

總結:在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中並在佇列中進行自旋;移出佇列

(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器呼叫tryRelease(int arg)方法釋放同步狀態,然後喚醒頭節點的後繼節點。

獨佔鎖(Mutex)

  1. import java.util.Collection;
  2. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  3. public class Mutex {
  4. // 靜態內部類,自定義同步器
  5. private static class Sync extends AbstractQueuedSynchronizer {
  6. // 是否處於佔用狀態
  7. protected boolean isHeldExclusively() {
  8. return getState() == 1;
  9. }
  10. // 當狀態為0的時候獲取鎖
  11. public boolean tryAcquire(int acquires) {
  12. if (compareAndSetState(0, 1)) {
  13. setExclusiveOwnerThread(Thread.currentThread());
  14. return true;
  15. }
  16. return false;
  17. }
  18. // 釋放鎖,將狀態設定為0
  19. protected boolean tryRelease(int releases) {
  20. if (getState() == 0)
  21. throw new IllegalMonitorStateException();
  22. setExclusiveOwnerThread(null);
  23. setState(0);
  24. return true;
  25. }
  26. }
  27. // 僅需要將操作代理到Sync上即可
  28. private final Sync sync = new Sync();
  29. //獲取等待的執行緒
  30. public Collection<Thread> getQueuedThreads(){
  31. return sync.getQueuedThreads();
  32. }
  33. //獨佔鎖的操作介面
  34. public void lock() {//獲取鎖
  35. sync.acquire(1);
  36. }
  37. public void unlock() {//釋放鎖
  38. sync.release(1);
  39. }
  40. }

  1. import java.util.Collection;
  2. import java.util.Random;
  3. public class MutexTestSecond {
  4. private static Random r=new Random(47);
  5. private static int threadCount=10;
  6. private static Mutex mut=new Mutex();
  7. private static class Weight implements Runnable{//給蘋果稱重的任務
  8. String name;
  9. public Weight(String name){
  10. this.name=name;
  11. }
  12. @Override
  13. public void run() {
  14. mut.lock();
  15. System.out.println(name+"放蘋果!");
  16. System.out.println(name+"重量:"+(r.nextInt(10)+3));
  17. System.out.println(name+"取蘋果!");
  18. printQueuedThreads(mut.getQueuedThreads());
  19. mut.unlock();
  20. }
  21. }
  22. private static void printQueuedThreads(Collection<Thread> threads){
  23. System.out.print("等待佇列中的執行緒:");
  24. for(Thread t:threads){
  25. System.out.print(t.getName()+" ");
  26. }
  27. System.out.println();
  28. }
  29. public static void main(String[] args) {
  30. Thread[] threads=new Thread[threadCount];
  31. for(int i=0;i<threadCount;i++){
  32. threads[i]=new Thread(new Weight("Weight-"+i),"Thread-"+i);
  33. }
  34. for(int i=0;i<threadCount;i++){
  35. threads[i].start();
  36. }
  37. }
  38. }

輸出:

Weight-0放蘋果!
Weight-0重量:11
Weight-0取蘋果!
等待佇列中的執行緒:Thread-3 Thread-2 Thread-1
Weight-6放蘋果!
Weight-6重量:8
Weight-6取蘋果!
等待佇列中的執行緒:Thread-8 Thread-7 Thread-5 Thread-4 Thread-3 Thread-2 Thread-1
Weight-1放蘋果!
Weight-1重量:6
Weight-1取蘋果!
等待佇列中的執行緒:Thread-9 Thread-8 Thread-7 Thread-5 Thread-4 Thread-3 Thread-2
Weight-2放蘋果!
Weight-2重量:4
Weight-2取蘋果!
等待佇列中的執行緒:Thread-9 Thread-8 Thread-7 Thread-5 Thread-4 Thread-3
Weight-3放蘋果!
Weight-3重量:4
Weight-3取蘋果!
等待佇列中的執行緒:Thread-9 Thread-8 Thread-7 Thread-5 Thread-4
Weight-4放蘋果!
Weight-4重量:12
Weight-4取蘋果!
等待佇列中的執行緒:Thread-9 Thread-8 Thread-7 Thread-5
Weight-5放蘋果!
Weight-5重量:11
Weight-5取蘋果!
等待佇列中的執行緒:Thread-9 Thread-8 Thread-7
Weight-7放蘋果!
Weight-7重量:3
Weight-7取蘋果!
等待佇列中的執行緒:Thread-9 Thread-8
Weight-8放蘋果!
Weight-8重量:5
Weight-8取蘋果!
等待佇列中的執行緒:Thread-9
Weight-9放蘋果!
Weight-9重量:10
Weight-9取蘋果!
等待佇列中的執行緒:

從輸出中可以看出,我們的獨佔鎖Mutex,保證了秤的獨佔使用。

重入鎖

重入鎖ReentrantLock,顧名思義,就是支援重進入的鎖,它表示該鎖能夠支援一個執行緒對資源的重複加鎖。除此之外,該鎖的還支援獲取鎖時的公平和非公平性選擇。

對於獨佔鎖(Mutex),考慮如下場景:當一個執行緒呼叫Mutex的lock()方法獲取鎖之後,如果再次呼叫lock()方法,則該執行緒將會被自己所阻塞,原因是Mutex在實現tryAcquire(int acquires)方法時沒有考慮佔有鎖的執行緒再次獲取鎖的場景,而在呼叫tryAcquire(int acquires)方法時返回了false,導致該執行緒被阻塞。簡單地說,Mutex是一個不支援重進入的鎖。

synchronized關鍵字隱式的支援重進入,比如一個synchronized修飾的遞迴方法,在方法執行時,執行執行緒在獲取了鎖之後仍能連續多次地獲得該鎖,而不像Mutex由於獲取了鎖,而在下一次獲取鎖時出現阻塞自己的情況。
ReentrantLock雖然沒能像synchronized關鍵字一樣支援隱式的重進入,但是在呼叫lock()方法時,已經獲取到鎖的執行緒,能夠再次呼叫lock()方法獲取鎖而不被阻塞。

可重入的實現

重進入是指任意執行緒在獲取到鎖之後能夠再次獲取該鎖而不會被鎖所阻塞,該特性的實現需要解決以下兩個問題。
1)執行緒再次獲取鎖。鎖需要去識別獲取鎖的執行緒是否為當前佔據鎖的執行緒,如果是,則再次成功獲取。
2)鎖的最終釋放。執行緒重複n次獲取了鎖,隨後在第n次釋放該鎖後,其他執行緒能夠獲取到該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重複獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經成功釋放。

ReentrantLock是通過組合自定義同步器來實現鎖的獲取與釋放。我們以非公平鎖為例:

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2. private final Sync sync;
  3. ......
  4. abstract static class Sync extends AbstractQueuedSynchronizer {
  5. private static final long serialVersionUID = -5179523762034025860L;
  6. abstract void lock();//抽象方法
  7. final boolean nonfairTryAcquire(int acquires) {//非公平的獲取鎖
  8. final Thread current = Thread.currentThread();
  9. int c = getState();
  10. if (c == 0) {//首次獲取同步狀態
  11. if (compareAndSetState(0, acquires)) {//只要設定成功就獲取到鎖
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. else if (current == getExclusiveOwnerThread()) {//再次獲取同步狀態(可重入的關鍵)
  17. //如果是獲取鎖的執行緒再次請求,則將同步狀態值進行增加並返回true,表示獲取同步狀態成功。
  18. int nextc = c + acquires;
  19. if (nextc < 0) // overflow
  20. throw new Error("Maximum lock count exceeded");
  21. setState(nextc);
  22. return true;
  23. }
  24. return false;
  25. }
  26. protected final boolean tryRelease(int releases) {
  27. int c = getState() - releases;
  28. if (Thread.currentThread() != getExclusiveOwnerThread())
  29. throw new IllegalMonitorStateException();
  30. boolean free = false;
  31. if (c == 0) {//當同步狀態為0時,將佔有執行緒設定為null
  32. free = true;
  33. setExclusiveOwnerThread(null);
  34. }
  35. setState(c);//更新同步狀態
  36. return free;
  37. }
  38. ......
  39. }
  40. static final class NonfairSync extends Sync {
  41. private static final long serialVersionUID = 7316153563782823691L;
  42. /**Performs lock. Try immediate barge, backing up to normal acquire on failure. */
  43. final void lock() {
  44. if (compareAndSetState(0, 1))//首次獲取鎖成功
  45. setExclusiveOwnerThread(Thread.currentThread());
  46. else
  47. acquire(1);//申請加鎖
  48. }
  49. protected final boolean tryAcquire(int acquires) {
  50. return nonfairTryAcquire(acquires);//非公平獲取鎖
  51. }
  52. }
  53. ......
  54. public void lock() {
  55. sync.lock();
  56. }
  57. public void unlock() {
  58. sync.release(1);
  59. }
  60. ......
  61. }

  1. import java.util.Random;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. public class ReentrantLockTest {
  4. private static Random r=new Random(47);
  5. private static int threadCount=10;
  6. private static ReentrantLock mut=new ReentrantLock();
  7. private static class Weight implements Runnable{//給蘋果稱重的任務
  8. String name;
  9. public Weight(String name){
  10. this.name=name;
  11. }
  12. @Override
  13. public void run() {
  14. mut.lock();
  15. System.out.println(name+"放蘋果!");
  16. System.out.println(name+"重量:"+(r.nextInt(10)+3));
  17. System.out.println(name+"取蘋果!");
  18. if(r.nextInt()%2==0){run();}//遞迴呼叫
  19. mut.unlock();
  20. }
  21. }
  22. public static void main(String[] args) throws InterruptedException {
  23. Thread[] threads=new Thread[threadCount];
  24. for(int i=0;i<threadCount;i++){
  25. threads[i]=new Thread(new Weight("Weight-"+i),"Thread-"+i);
  26. }
  27. for(int i=0;i<threadCount;i++){
  28. threads[i].start();
  29. Thread.sleep(10);
  30. }
  31. }
  32. }

輸出:

Weight-0放蘋果!
Weight-0重量:11
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:6
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:4
Weight-0取蘋果!
Weight-1放蘋果!
Weight-1重量:11
Weight-1取蘋果!
Weight-2放蘋果!
Weight-2重量:5
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:11
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:4
Weight-2取蘋果!
Weight-3放蘋果!
Weight-3重量:12
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:11
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:3
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:9
Weight-3取蘋果!
Weight-5放蘋果!
Weight-5重量:4
Weight-5取蘋果!
Weight-7放蘋果!
Weight-7重量:7
Weight-7取蘋果!
Weight-8放蘋果!
Weight-8重量:9
Weight-8取蘋果!
Weight-6放蘋果!
Weight-6重量:3
Weight-6取蘋果!
Weight-4放蘋果!
Weight-4重量:7
Weight-4取蘋果!
Weight-4放蘋果!
Weight-4重量:3
Weight-4取蘋果!
Weight-9放蘋果!
Weight-9重量:5
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:6
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:7
Weight-9取蘋果!

從輸出中,可以看出可重入特性。如果,我們將可重入鎖換成獨佔鎖Mutex程式將會阻塞,不具有可重入性。

此外,我們還發現,執行緒的執行是亂序的(從執行緒名稱的角度看),即與start()方法呼叫順序不一致。這是為什麼呢?

原來重入鎖ReentrantLock預設採用非公平實現?那好,我們將可重入鎖設定為公平鎖:

    private static ReentrantLock mut=new ReentrantLock(true);//設定為公平鎖

輸出:

Weight-0放蘋果!
Weight-0重量:11
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:6
Weight-0取蘋果!
Weight-0放蘋果!
Weight-0重量:4
Weight-0取蘋果!
Weight-1放蘋果!
Weight-1重量:11
Weight-1取蘋果!
Weight-2放蘋果!
Weight-2重量:5
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:11
Weight-2取蘋果!
Weight-2放蘋果!
Weight-2重量:4
Weight-2取蘋果!
Weight-3放蘋果!
Weight-3重量:12
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:11
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:3
Weight-3取蘋果!
Weight-3放蘋果!
Weight-3重量:9
Weight-3取蘋果!
Weight-7放蘋果!
Weight-7重量:4
Weight-7取蘋果!
Weight-6放蘋果!
Weight-6重量:7
Weight-6取蘋果!
Weight-4放蘋果!
Weight-4重量:9
Weight-4取蘋果!
Weight-5放蘋果!
Weight-5重量:3
Weight-5取蘋果!
Weight-8放蘋果!
Weight-8重量:7
Weight-8取蘋果!
Weight-8放蘋果!
Weight-8重量:3
Weight-8取蘋果!
Weight-9放蘋果!
Weight-9重量:5
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:6
Weight-9取蘋果!
Weight-9放蘋果!
Weight-9重量:7
Weight-9取蘋果!

從輸出中我們看到,執行緒的執行順序與對應start()方法被呼叫的順序依然不一樣,說好的公平鎖呢?

原因分析:start()語句呼叫的順序與執行緒進入Runnable狀態的順序不一定一致,也就是說先呼叫start()語句所對應的執行緒不一定先進入Runnable狀態,即使先進入Runnable狀態也不一定先分得處理器開始執行。

公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那麼鎖的獲取順序就應該符合請求的絕對時間順序,也就是FIFO。

個人理解,如有偏頗,還望指正!!

非公平性的實現

  1. static final class NonfairSync extends Sync {
  2. private static final long serialVersionUID = 7316153563782823691L;
  3. /**Performs lock. Try immediate barge, backing up to normal acquire on failure.*/
  4. final void lock() {
  5. if (compareAndSetState(0, 1))//只要CAS更新同步狀態成功就獲取到鎖。
  6. setExclusiveOwnerThread(Thread.currentThread());
  7. else
  8. acquire(1);
  9. }
  10. protected final boolean tryAcquire(int acquires) {
  11. return nonfairTryAcquire(acquires);
  12. }
  13. }

非公平性例項,如果Thread-1擁有鎖,Thread-2和Thread-3在同步佇列中,當Thread-1釋放鎖後會喚醒Thread-2,但是如果此時Thread-1重新申請鎖,可能依然是Thread-1獲取到鎖。甚至這時候Thread-4也申請鎖,Thread-4也可能比Thread-2先獲取鎖。

非公平鎖可能使得執行緒“飢餓”。當一個執行緒請求鎖時,只要獲取了同步狀態即成功獲取鎖。在這個前提下,剛釋放鎖的執行緒再次獲取同步狀態的機率會非常大,使得其他執行緒只能在同步佇列中等待。

公平性的實現

  1. static final class FairSync extends Sync {
  2. private static final long serialVersionUID = -3000897897090466540L;
  3. final void lock() {
  4. acquire(1);
  5. }
  6. /** Fair version of tryAcquire. Don't grant access unless recursive call or no waiters or is first.*/
  7. protected final boolean tryAcquire(int acquires) {
  8. final Thread current = Thread.currentThread();
  9. int c = getState();
  10. //這裡沒有一進來就直接進行CAS操作
  11. if (c == 0) {
  12. if (!hasQueuedPredecessors() &&<span><span class="comment">//增加是否有前驅執行緒的判斷</span><span>,從而保證公平性</span></span>
  13. compareAndSetState(0, acquires)) {
  14. setExclusiveOwnerThread(current);
  15. return true;
  16. }
  17. }
  18. else if (current == getExclusiveOwnerThread()) {
  19. int nextc = c + acquires;
  20. if (nextc < 0)
  21. throw new Error("Maximum lock count exceeded");
  22. setState(nextc);
  23. return true;
  24. }
  25. return false;
  26. }
  27. }

公平鎖與非公平鎖的比較

公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那麼鎖的獲取順序就應該符合請求的絕對時間順序,也就是FIFO。

公平性鎖每次都是從同步佇列中的第一個節點獲取到鎖,而非公平性鎖出現了一個執行緒連續獲取鎖的情況。
非公平性鎖可能使執行緒“飢餓”,當一個執行緒請求鎖時,只要獲取了同步狀態即成功獲取鎖。在這個前提下,剛釋放鎖的執行緒再次獲取同步狀態的機率會非常大,使得其他執行緒只能在同步佇列中等待。
非公平鎖可能使執行緒“飢餓”,為什麼它又被設定成預設的實現呢?非公平性鎖模式下執行緒上下文切換的次數少,因此其效能開銷更小。公平性鎖保證了鎖的獲取按照FIFO原則,而代價是進行大量的執行緒切換。非公平性鎖雖然可能造成執行緒“飢餓”,但極少的執行緒切換,保證了其更大的吞吐量。

共享式同步元件設計

可重寫的方法

  1. /**Attempts to acquire in shared mode. This method should query if the state of the object
  2. permits it to be acquired in the shared mode, and if so to acquire it.*/
  3. //共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之獲取失敗
  4. protected int tryAcquireShared(int arg)
  5. /**Attempts to set the state to reflect a release in shared mode.*/
  6. //共享式釋放同步狀態
  7. protected boolean tryReleaseShared(int arg)

同步器提供的模板方法

  1. /**Acquires in shared mode, ignoring interrupts.*/
  2. //共享式獲取同步狀態,如果當前執行緒未獲取到同步狀態,將會進入同步佇列等待
  3. //與獨佔式獲取的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態
  4. public final void acquireShared(int arg)
  5. <span style="font-size:14px;"><span style="font-size:14px;"></span></span><pre name="code" class="java"> /**Acquires in exclusive mode, aborting if interrupted.*/
  6. //該方法可以響應中斷
  7. public final void acquireInterruptibly(int arg)


/**Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true.*/ //共享式釋放同步狀態 public final boolean releaseShared(int arg)


共享式獲取與獨佔式獲取最主要的區別在於同一時刻能否有多個執行緒同時獲取到同步狀態。以檔案的讀寫為例,如果一個程式在對檔案進行讀操作,那麼這一時刻對於該檔案的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨佔式訪問,而讀操作可以是共享式訪問。

獲取同步狀態

呼叫同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態。

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)
  3. doAcquireShared(arg);
  4. }

在acquireShared(int arg)方法中,同步器呼叫tryAcquireShared(int arg)方法嘗試獲取同步狀態,tryAcquireShared(int arg)方法返回值為int型別,當返回值大於等於0時,表示能夠獲取到同步狀態。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態並退出自旋的條件就是tryAcquireShared(int arg)方法返回值大於等於0。

在doAcquireShared(int arg)方法的自旋過程中,如果當前節點的前驅為頭節點時,嘗試獲取同步狀態,如果返回值大於等於0,表示該次獲取同步狀態成功並從自旋過程中退出。

釋放同步狀態

與獨佔式一樣,共享式獲取也需要釋放同步狀態,通過呼叫releaseShared(int arg)方法可以釋放同步狀態。

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();
  4. return true;
  5. }
  6. return false;
  7. }

該方法在釋放同步狀態之後,將會喚醒後續處於等待狀態的節點。對於能夠支援多個執行緒同時訪問的併發元件(比Semaphore),它和獨佔式主要區別在於tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)執行緒安全釋放,一般是通過迴圈和CAS來保證的,因為釋放同步狀態的操作會同時來自多個執行緒。

CountDownLatch

  1. public class CountDownLatch {
  2. /**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/
  3. private static final class Sync extends AbstractQueuedSynchronizer {
  4. private static final long serialVersionUID = 4982264981922014374L;
  5. Sync(int count) {
  6. setState(count);//初始化同步狀態
  7. }
  8. int getCount() {
  9. return getState();
  10. }
  11. protected int tryAcquireShared(int acquires) { //同步狀態為0才返回成功
  12. return (getState() == 0) ? 1 : -1;
  13. }
  14. protected boolean tryReleaseShared(int releases) {//減少同步狀態
  15. // Decrement count; signal when transition to zero
  16. for (;;) { //這裡通過迴圈CAS來釋放同步狀態,從而保證執行緒安全性
  17. int c = getState();
  18. if (c == 0)
  19. return false;
  20. int nextc = c-1;
  21. if (compareAndSetState(c, nextc))
  22. return nextc == 0;
  23. }
  24. }
  25. }
  26. private final Sync sync;//組合一個同步器(AQS)
  27. public CountDownLatch(int count) {
  28. if (count < 0) throw new IllegalArgumentException("count < 0");
  29. this.sync = new Sync(count);//初始化同步狀態
  30. }
  31. /*Causes the current thread to wait until the latch has counted down to
  32. * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/
  33. public void await() throws InterruptedException {
  34. sync.acquireSharedInterruptibly(1);//當同步狀態為0時,acquireShared(1)才返回
  35. }
  36. public boolean await(long timeout, TimeUnit unit)
  37. throws InterruptedException {
  38. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  39. }
  40. public void countDown() {
  41. sync.releaseShared(1);//釋放同步狀態
  42. }
  43. public long getCount() {
  44. return sync.getCount();
  45. }
  46. public String toString() {
  47. return super.toString() + "[Count = " + sync.getCount() + "]";
  48. }
  49. }

獨佔式超時獲取同步狀態

Lock介面

鎖是用來控制多個執行緒訪問共享資源的方式,一般來說,一個鎖能夠防止多個執行緒同時訪問共享資源(但是有些鎖可以允許多個執行緒併發的訪問共享資源,比如讀寫鎖)。

在Lock接口出現之前,Java程式是靠synchronized關鍵字實現鎖功能的,而Java SE 5之後,併發包中新增了Lock介面(以及相關實現類)用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯式地獲取和釋放鎖。雖然它缺少了(通過synchronized塊或者方法所提供的)隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的可操作性可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字所不具備的同步特性。
使用synchronized關鍵字將會隱式地獲取鎖,但是它將鎖的獲取和釋放固化了,也就是先獲取再釋放。當然,這種方式簡化了同步的管理,可是擴充套件性沒有顯示的鎖獲取和釋放來的好。例如,針對一個場景,手把手進行鎖獲取和釋放,先獲得鎖A,然後再獲取鎖B,當鎖B獲得後,釋放鎖A同時獲取鎖C,當鎖C獲得後,再釋放B同時獲取鎖D,以此類推。這種場景下,synchronized關鍵字就不那麼容易實現了,而使用Lock卻容易許多。

Lock的使用方式

  1. Lock lock = new ReentrantLock();
  2. lock.lock();
  3. try {
  4. 。。。。。。
  5. } finally {
  6. lock.unlock();
  7. }

在finally塊中釋放鎖,目的是保證在獲取到鎖之後,最終能夠被釋放。
不要將獲取鎖的過程寫在try塊中,因為如果在獲取鎖(自定義鎖的實現)時發生了異常,異常丟擲的同時,也會導致鎖無故釋放。

Lock介面提供的新特性

Lock介面

  1. public interface Lock {
  2. //獲取鎖,呼叫該方法將會獲取鎖,當鎖獲取後,從該方法返回
  3. void lock();
  4. //可中斷地獲取鎖,和lock()方法的不同之處在於該方法會響應中斷,即在鎖的獲取過程中可以中斷當前執行緒
  5. void lockInterruptibly() throws InterruptedException;
  6. //嘗試非阻塞的獲取鎖,呼叫該方法後會立刻返回,如果能夠獲取則返回true,否則返回false
  7. boolean tryLock();
  8. //超時地獲取鎖 1、當前執行緒在超時時間內成功獲取鎖。2、當前執行緒在超時時間內被中斷。3、超時時間結束返回false。
  9. boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  10. //釋放鎖
  11. void unlock();
  12. //獲取等待通知元件
  13. Condition newCondition();
  14. }

Lock介面的實現

Lock介面的實現基本都是通過組合了一個佇列同步器(AbstractQueuedSynchronizer)的子類來完成執行緒訪問控制的。

例如,ReentrantLock(重入鎖)。

說在最後:關於可響應中斷超時等待特性,文中基本略過,詳情可參看《Java併發程式設計的藝術》和JDK原始碼。

參考:

JDK 1.7原始碼

《Java併發程式設計的藝術》

《Java併發程式設計實踐》