011Java併發包012AQS
注意:本文基於JDK1.8進行記錄。
1 簡介
1.1 是什麼
AQS是英文單詞AbstractQueuedSynchronizer的縮寫,翻譯過來就是抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock、Semaphore、CountDownLatch等等。
AQS是用來構建鎖或者其它同步器元件的重量級基礎框架及整個JUC體系的基石,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作,並通過一個state整型變量表示持有鎖的狀態。
1.2 抽象
AQS的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。
1.3 原理
搶到資源的執行緒直接使用處理業務邏輯,搶不到資源的必然涉及一種排隊等候機制。既然說到了排隊等候機制,那麼就一定會有某種佇列形成,這樣的佇列是什麼資料結構呢。
如果共享資源被佔用,就需要一定的阻塞等待喚醒機制來保證鎖分配。這個機制主要用的是CLH佇列的變體實現的,將暫時獲取不到鎖的執行緒加入到佇列中,這個佇列就是AQS的抽象表現。它將請求共享資源的執行緒封裝成佇列的Node結點,通過CAS、自旋以及LockSupport的憑證機制,維護state變數的狀態,使併發達到同步的控制效果。
CLH:Craig、Landin、Hagersten(三個科學家名字)佇列,原版是一個單向連結串列,AQS中的佇列是CLH變體的虛擬雙向佇列FIFO,其頭節點在初始化後變為空節點。
1.4 資源使用方式
AQS定義兩種資源使用方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。
2 體系架構
1 public abstract class AbstractQueuedSynchronizer 2 extends AbstractOwnableSynchronizer 3 implements java.io.Serializable { 4 ... 5 // 內部封裝Node節點 6 static final classNode { 7 // 標記執行緒以共享的模式等待鎖 8 static final Node SHARED = new Node(); 9 // 標記執行緒以獨佔的模式等待鎖 10 static final Node EXCLUSIVE = null; 11 // waitStatus取值為1表示執行緒取消(超時、中斷),被取消的節點不會阻塞 12 static final int CANCELLED = 1; 13 // waitStatus取值為-1表示後繼節點已經準備完成,等待執行緒釋放資源 14 static final int SIGNAL = -1; 15 // waitStatus取值為-2表示執行緒在Condition佇列中阻塞,當其他執行緒呼叫了Condition中的喚醒方法後,將節點從Condition佇列轉移到CLH等待佇列(Condition中有使用) 16 static final int CONDITION = -2; 17 // waitStatus取值為-3表示執行緒及後續執行緒無條件傳播(共享模式可用,CountDownLatch中有使用) 18 static final int PROPAGATE = -3; 19 // 執行緒的等待狀態,初始值為0 20 volatile int waitStatus; 21 // 前驅節點 22 volatile Node prev 23 // 後繼節點 24 volatile Node next; 25 // 執行緒物件 26 volatile Thread thread; 27 ... 28 } 29 // 頭節點 30 private transient volatile Node head 31 // 尾節點 32 private transient volatile Node tail; 33 // 資源狀態,0表示可獲取,大於等於1表示已佔用 34 private volatile int state; 35 // 獲取資源狀態 36 protected final int getState() { 37 return state; 38 } 39 // 設定資源狀態 40 protected final void setState(int newState) { 41 state = newState; 42 } 43 // CAS設定資源狀態 44 protected final boolean compareAndSetState(int expect, int update) { 45 // See below for intrinsics setup to support this 46 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 47 } 48 ... 49 }
AQS使用了一個volatile修飾的整型變數state用來表示同步狀態,通過內建的CLH同步佇列來完成執行緒的排隊工作。
其中,對state值的修改是通過CAS完成的,0表示資源可用,大於等於1表示資源不可用。AQS提供了三種操作state的方法:getState()、setState()、compareAndSetState()。
當前執行緒根據state的值判斷能否獲取資源,如果獲取失敗,AQS會將當前執行緒thread以及等待狀態waitStatus等資訊封裝成Node節點,並將其加CLH入同步佇列,同時阻塞當前執行緒。當state的值變為可獲取資源後,會把Node節點中的執行緒喚醒,再次嘗試獲取資源。
3 Lock與AQS
Lock介面的實現類,基本都是通過聚合了一個佇列同步器的子類完成執行緒訪問控制的。
1 public class ReentrantLock implements Lock, java.io.Serializable { 2 ... 3 abstract static class Sync extends AbstractQueuedSynchronizer { 4 ... 5 final boolean nonfairTryAcquire(int acquires) { 6 final Thread current = Thread.currentThread(); 7 int c = getState(); 8 if (c == 0) { 9 if (compareAndSetState(0, acquires)) { 10 setExclusiveOwnerThread(current); 11 return true; 12 } 13 } 14 else if (current == getExclusiveOwnerThread()) { 15 int nextc = c + acquires; 16 if (nextc < 0) // overflow 17 throw new Error("Maximum lock count exceeded"); 18 setState(nextc); 19 return true; 20 } 21 return false; 22 } 23 ... 24 } 25 static final class NonfairSync extends Sync { 26 ... 27 final void lock() { 28 if (compareAndSetState(0, 1)) 29 setExclusiveOwnerThread(Thread.currentThread()); 30 else 31 acquire(1); 32 } 33 protected final boolean tryAcquire(int acquires) { 34 return nonfairTryAcquire(acquires); 35 } 36 } 37 static final class FairSync extends Sync { 38 ... 39 final void lock() { 40 acquire(1); 41 } 42 protected final boolean tryAcquire(int acquires) { 43 final Thread current = Thread.currentThread(); 44 int c = getState(); 45 if (c == 0) { 46 if (!hasQueuedPredecessors() && 47 compareAndSetState(0, acquires)) { 48 setExclusiveOwnerThread(current); 49 return true; 50 } 51 } 52 else if (current == getExclusiveOwnerThread()) { 53 int nextc = c + acquires; 54 if (nextc < 0) 55 throw new Error("Maximum lock count exceeded"); 56 setState(nextc); 57 return true; 58 } 59 return false; 60 } 61 } 62 public ReentrantLock() { 63 sync = new NonfairSync(); 64 } 65 public ReentrantLock(boolean fair) { 66 sync = fair ? new FairSync() : new NonfairSync(); 67 } 68 ... 69 }
ReentrantLock類的內部聚合了一個Sync類,Sync類繼承了AQS類,並且非公平鎖NonfairSync和公平鎖FairSync都繼承自Sync,預設建立的是非公平鎖NonfairSync。
4 分析ReentrantLock
4.1 概述
整個ReentrantLock的加鎖過程,可以分為三個階段:
1)嘗試加鎖。
2)加鎖失敗,執行緒入佇列。
3)執行緒入佇列後,進入阻賽狀態。
4.2 場景舉例
舉例三個客戶在銀行辦理業務,使用預設的非公平鎖:
1 public static void main(String[] args) { 2 Lock lock = new ReentrantLock(); 3 new Thread(()->{ 4 lock.lock(); 5 try { 6 System.out.println(Thread.currentThread().getName() + "-----辦理業務"); 7 try { 8 TimeUnit.SECONDS.sleep(60); 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } 12 System.out.println(Thread.currentThread().getName() + "-----離開"); 13 } finally { 14 lock.unlock(); 15 } 16 }, "A").start(); 17 new Thread(()->{ 18 lock.lock(); 19 try { 20 System.out.println(Thread.currentThread().getName() + "-----辦理業務"); 21 try { 22 TimeUnit.SECONDS.sleep(60); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 System.out.println(Thread.currentThread().getName() + "-----離開"); 27 } finally { 28 lock.unlock(); 29 } 30 }, "B").start(); 31 new Thread(()->{ 32 lock.lock(); 33 try { 34 System.out.println(Thread.currentThread().getName() + "-----辦理業務"); 35 try { 36 TimeUnit.SECONDS.sleep(60); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 System.out.println(Thread.currentThread().getName() + "-----離開"); 41 } finally { 42 lock.unlock(); 43 } 44 }, "C").start(); 45 }
5 程式分析
5.1 執行緒A開始並執行
5.1.1 獲取資源
執行緒A進入,呼叫lock()方法,檢視實現:
1 final void lock() { 2 // 使用CAS設定state為1 3 if (compareAndSetState(0, 1)) 4 // 表示獲取資源成功,將當前執行緒設為佔用執行緒 5 setExclusiveOwnerThread(Thread.currentThread()); 6 else 7 // 表示獲取資源失敗,繼續搶佔資源 8 acquire(1); 9 }
因為執行緒A是第一個獲取資源的執行緒,所以使用compareAndSetState()方法設定成功,繼續呼叫setExclusiveOwnerThread()方法將當前執行緒設為佔用執行緒,然後繼續執行業務。
5.2 執行緒B開始並阻塞
5.2.1 獲取資源
執行緒B進入,呼叫lock()方法。
因為執行緒B是第二個獲取資源的執行緒,執行緒A已經將state從0改為了1,所以使用compareAndSetState()方法設定失敗,繼續呼叫acquire()方法獲取資源,檢視實現:
1 public final void acquire(int arg) { 2 // 搶佔資源 3 if (!tryAcquire(arg) && 4 // 加入等待佇列 5 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 6 // 執行緒阻塞 7 selfInterrupt(); 8 }
如果搶佔資源成功,呼叫tryAcquire()方法返回true,判斷條件結束,繼續執行業務。
如果搶佔資源失敗,繼續判斷acquireQueued()方法返回。執行addWaiter()方法並傳入引數表示使用獨佔模式將執行緒加入到等待佇列。
5.2.2 搶佔資源
執行緒B進入,繼續呼叫acquire()方法獲取資源,執行tryAcquire()方法,檢視實現:
1 protected final boolean tryAcquire(int acquires) { 2 // 繼續呼叫非公平鎖的嘗試搶佔方法 3 return nonfairTryAcquire(acquires); 4 }
繼續呼叫非公平鎖的nonfairTryAcquire()方法,返回false表示佔用失敗:
1 final boolean nonfairTryAcquire(int acquires) { 2 // 記錄當前執行緒 3 final Thread current = Thread.currentThread(); 4 // 記錄當前資源狀態 5 int c = getState(); 6 // 0表示當前資源可用 7 if (c == 0) { 8 // 使用CAS設定state為請求數 9 if (compareAndSetState(0, acquires)) { 10 // 表示獲取資源成功,將當前執行緒設為佔用執行緒 11 setExclusiveOwnerThread(current); 12 return true; 13 } 14 } 15 // 大於等於1表示當前資源被佔用,判斷當前執行緒是否為佔用執行緒(可重入鎖的情況) 16 else if (current == getExclusiveOwnerThread()) { 17 // 當前執行緒為佔用執行緒,記錄資源狀態 18 int nextc = c + acquires 19 // 判斷是否溢位 20 if (nextc < 0) // overflow 21 throw new Error("Maximum lock count exceeded"); 22 // 設定state為新的資源狀態 23 setState(nextc); 24 return true; 25 } 26 return false; 27 }
5.2.3 進入等待
執行緒B進入,繼續呼叫addWaiter()方法將當前執行緒加入等待佇列,檢視實現:
1 private Node addWaiter(Node mode) { 2 // 將當前執行緒和傳入的獨佔模式封裝為節點 3 Node node = new Node(Thread.currentThread(), mode); 4 // Try the fast path of enq; backup to full enq on failure 5 Node pred = tail; 6 // 尾節點不為空,表示CLH佇列已經初始化,CAS操作將當前節點設為尾節點 7 if (pred != null) { 8 node.prev = pred; 9 if (compareAndSetTail(pred, node)) { 10 pred.next = node; 11 return node; 12 } 13 } 14 // 尾節點為空,表示CLH佇列還未初始化,初始化佇列 15 enq(node); 16 return node; 17 }
因為執行緒B是第一個進入等待的執行緒,尾節點為空,繼續檢視enq()方法:
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 // 尾節點為空,通過CAS設定頭節點和尾節點為空節點 5 if (t == null) { // Must initialize 6 if (compareAndSetHead(new Node())) 7 tail = head; 8 } else { 9 // 尾節點不為空,通過CAS將當前節點作為新的尾節點 10 node.prev = t; 11 if (compareAndSetTail(t, node)) { 12 t.next = node; 13 return t; 14 } 15 } 16 } 17 }
初始化CLH佇列後,頭節點為空節點,尾節點為當前節點。
5.2.4 阻塞執行緒
執行緒B得到當前節點後,作為引數傳入acquireQueued()方法繼續執行:
1 final boolean acquireQueued(final Node node, int arg) { 2 // 記錄當前節點是否取消,預設為true,表示取消 3 boolean failed = true; 4 try { 5 // 標記當前節點是否中斷,預設為false,表示當前節點沒有中斷 6 boolean interrupted = false 7 // 自旋 8 for (;;) { 9 // 獲取當前節點的上一節點 10 final Node p = node.predecessor(); 11 // 如果當前節點是頭節點,表示當前節點即將被喚醒,嘗試搶佔資源 12 if (p == head && tryAcquire(arg)) { 13 // 將當前節點設為頭節點,置空當前節點的上一節點,並取消當前節點同當前執行緒的繫結 14 setHead(node); 15 // 將原頭節點的下一節點置空,方便GC回收 16 p.next = null; // help GC 17 // 標記當前節點為false,表示沒有取消 18 failed = false; 19 // 返回false,表示當前節點沒有中斷 20 return interrupted; 21 } 22 // 不管當前節點是不是頭節點,執行到這裡就表示獲取資源失敗,處理前置節點並阻塞當前節點 23 if (shouldParkAfterFailedAcquire(p, node) && 24 parkAndCheckInterrupt()) 25 // 標記為true,表示當前節點中斷 26 interrupted = true; 27 } 28 } finally { 29 // 當前節點如果被取消,執行取消操作 30 if (failed) 31 cancelAcquire(node); 32 } 33 }
因為執行緒B是第一個進入等待的執行緒,上一節點為頭節點,嘗試獲取資源。獲取成功則將當前節點作為頭節點並移除當前執行緒,獲取失敗則進入判斷。
在判斷條件中呼叫shouldParkAfterFailedAcquire()方法,處理前置節點:
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 // 記錄上一節點的等待狀態 3 int ws = pred.waitStatus; 4 if (ws == Node.SIGNAL) 5 // 如果上一節點的等待狀態為-1,表示當前執行緒可以被阻塞,返回true,執行parkAndCheckInterrupt()方法 6 return true; 7 if (ws > 0) { 8 // 如果上一節點的等待狀態為1,表示上一節點被取消,迴圈移除被取消的上一節點 9 do { 10 node.prev = pred = pred.prev; 11 } while (pred.waitStatus > 0); 12 pred.next = node; 13 } else { 14 // 上述條件不滿足,表示上一節點的等待狀態為0或者-3,通過CAS將等待狀態設定為-1 15 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 16 } 17 // 返回false,跳過parkAndCheckInterrupt()方法,重新進入自旋 18 return false; 19 }
因為執行緒B是第一個進入等待的執行緒,上一節點為頭節點,頭節點為空節點,等待狀態為0,所以兩次進入此方法。
第一次進入shouldParkAfterFailedAcquire()方法將上一節點的等待狀態設定為-1後返回false,條件判斷為false重新進入自旋。
第二次進入shouldParkAfterFailedAcquire()方法檢測到上一節點的等待狀態為-1,返回true,繼續判斷parkAndCheckInterrupt()方法。
在判斷條件中呼叫parkAndCheckInterrupt()方法,阻塞當前節點:
1 private final boolean parkAndCheckInterrupt() { 2 // 使用LockSupport的park()方法阻塞當前節點 3 LockSupport.park(this); 4 // 返回執行緒的中斷狀態 5 return Thread.interrupted(); 6 }
執行緒B在此被阻塞。
5.3 執行緒C開始並阻塞
5.3.1 獲取資源
執行緒C進入,呼叫lock()方法。
因為執行緒C是第三個獲取資源的執行緒,執行緒A已經將state從0改為了1,所以使用compareAndSetState()方法設定失敗,繼續呼叫acquire()方法獲取資源。
如果搶佔資源成功,呼叫tryAcquire()方法返回true,判斷條件結束,繼續執行業務。
如果搶佔資源失敗,繼續判斷acquireQueued()方法返回。執行addWaiter()方法並傳入引數表示使用獨佔模式將執行緒加入到等待佇列。
5.3.2 搶佔資源
執行緒C進入,繼續呼叫acquire()方法獲取資源,執行tryAcquire()方法。
繼續呼叫非公平鎖的nonfairTryAcquire()方法,返回false表示佔用失敗。
5.3.3 進入等待
執行緒C進入,繼續呼叫addWaiter()方法將當前執行緒加入等待佇列。
因為執行緒C是第二個進入等待的執行緒,執行緒B已經完成了佇列初始化,尾節點不為空,將當前節點作為新的尾節點。
5.3.4 阻塞執行緒
執行緒C得到當前節點後,作為引數傳入acquireQueued()方法繼續執行。
因為執行緒C是第二個進入等待的執行緒,上一節點不為頭節點,直接進入判斷。
在判斷條件中呼叫shouldParkAfterFailedAcquire()方法,處理前置節點,將B節點的等待狀態設為-1,返回true,繼續判斷parkAndCheckInterrupt()方法。
在判斷條件中呼叫parkAndCheckInterrupt()方法,阻塞當前節點。
執行緒C在此被阻塞。
5.4 執行緒A結束
5.4.1 解鎖資源
執行緒A執行完畢,呼叫unlock()方法釋放資源並喚醒執行緒,檢視實現:
1 public void unlock() { 2 sync.release(1); 3 }
繼續檢視release()方法:
1 public final boolean release(int arg) { 2 // 呼叫tryRelease()方法嘗試釋放資源 3 if (tryRelease(arg)) { 4 // 獲取頭節點 5 Node h = head; 6 // 如果頭節點不為空,並且等待狀態不為0,表示需要喚醒其他執行緒 7 if (h != null && h.waitStatus != 0) 8 // 呼叫unparkSuccessor()方法並傳入頭節點,喚醒執行緒 9 unparkSuccessor(h); 10 return true; 11 } 12 // 釋放失敗返回false 13 return false; 14 }
5.4.2 釋放資源
繼續檢視tryRelease()方法:
1 protected final boolean tryRelease(int releases) { 2 // 記錄資源狀態 3 int c = getState() - releases; 4 // 如果當前執行緒不為佔用執行緒則丟擲異常 5 if (Thread.currentThread() != getExclusiveOwnerThread()) 6 throw new IllegalMonitorStateException(); 7 // 標記資源空閒,預設為false 8 boolean free = false 9 // 資源狀態為0則標記資源空閒為true,並將佔用執行緒置空 10 if (c == 0) { 11 free = true; 12 setExclusiveOwnerThread(null); 13 } 14 // 設定資源狀態 15 setState(c); 16 // 返回資源空閒 17 return free; 18 }
執行緒A釋放資源並返回true,繼續執行。
5.4.3 喚醒執行緒
因為執行緒B和執行緒C已經進入等待佇列,所以頭節點不為空,繼續檢視unparkSuccessor()方法:
1 private void unparkSuccessor(Node node) { 2 // 記錄頭節點的等待狀態 3 int ws = node.waitStatus; 4 // 如果頭節點的等待狀態小於0,則將頭節點的等待狀態設為0 5 if (ws < 0) 6 compareAndSetWaitStatus(node, ws, 0); 7 // 記錄頭節點的下一節點 8 Node s = node.next; 9 // 判斷下一節點是否為空或者下一節點的等待狀態是否大於0 10 if (s == null || s.waitStatus > 0) { 11 s = null; 12 // 遍歷下一節點,找到不為空並且等待狀態小於等於0的節點,將其設為下一節點 13 for (Node t = tail; t != null && t != node; t = t.prev) 14 if (t.waitStatus <= 0) 15 s = t; 16 } 17 // 如果下一節點不為空,則使用LockSupport的unpark()方法喚醒下一節點中的執行緒 18 if (s != null) 19 LockSupport.unpark(s.thread); 20 }
頭節點的下一節點為執行緒B所在的節點,執行緒B被喚醒。
5.5 執行緒B執行並結束
5.5.1 搶佔資源
執行緒B在parkAndCheckInterrupt()方法中被釋放後,返回中斷狀態為false,重新進入自旋:
1 final boolean acquireQueued(final Node node, int arg) { 2 // 記錄當前節點是否取消,預設為true,表示取消 3 boolean failed = true; 4 try { 5 // 標記當前節點是否中斷,預設為false,表示當前節點沒有中斷 6 boolean interrupted = false 7 // 自旋 8 for (;;) { 9 // 獲取當前節點的上一節點 10 final Node p = node.predecessor(); 11 // 如果當前節點是頭節點,表示當前節點即將被喚醒,嘗試搶佔資源 12 if (p == head && tryAcquire(arg)) { 13 // 將當前節點設為頭節點,置空當前節點的上一節點,並取消當前節點同當前執行緒的繫結 14 setHead(node); 15 // 將原頭節點的下一節點置空,方便GC回收 16 p.next = null; // help GC 17 // 標記當前節點為false,表示沒有取消 18 failed = false; 19 // 返回false,表示當前節點沒有中斷 20 return interrupted; 21 } 22 // 不管當前節點是不是頭節點,執行到這裡就表示獲取資源失敗,處理前置節點並阻塞當前節點 23 if (shouldParkAfterFailedAcquire(p, node) && 24 parkAndCheckInterrupt()) 25 // 標記為true,表示當前節點中斷 26 interrupted = true; 27 } 28 } finally { 29 // 當前節點如果被取消,執行取消操作 30 if (failed) 31 cancelAcquire(node); 32 } 33 }
因為執行緒B的上一節點為頭節點,進入tryAcquire()方法搶佔資源,搶佔成功返回true並將當前節點設為頭節點,同時解除同線程B的繫結。
5.5.2 解鎖資源
執行緒B執行完畢,呼叫unlock()方法釋放資源並喚醒執行緒。
頭節點的下一節點為執行緒C所在的節點,執行緒C被喚醒。
5.6 執行緒C執行並結束
5.6.1 搶佔資源
執行緒C在parkAndCheckInterrupt()方法中被釋放後,返回中斷狀態為false,重新進入自旋。
因為執行緒C的上一節點為頭節點,進入tryAcquire()方法搶佔資源,搶佔成功返回true並將當前節點設為頭節點,同時解除同線程C的繫結。
5.6.2 解鎖資源
執行緒C執行完畢,呼叫unlock()方法釋放資源並喚醒執行緒。
頭節點的下一節點為空,不會有任何執行緒被喚醒。
6 公平鎖與非公平鎖
6.1 非公平鎖
非公平鎖的執行緒在獲取資源時,會嘗試獲取資源,如果成功則立刻佔用資源,如果失敗則嘗試佔用資源。
在資源可用時不會判斷當前佇列是否有執行緒在等待,也就是說剛加入的執行緒可以同喚醒的執行緒競爭資源。
1 final void lock() { 2 if (compareAndSetState(0, 1)) 3 setExclusiveOwnerThread(Thread.currentThread()); 4 else 5 acquire(1); 6 } 7 ... 8 public final void acquire(int arg) { 9 if (!tryAcquire(arg) && 10 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 11 selfInterrupt(); 12 } 13 ... 14 protected final boolean tryAcquire(int acquires) { 15 return nonfairTryAcquire(acquires); 16 } 17 ... 18 final boolean nonfairTryAcquire(int acquires) { 19 final Thread current = Thread.currentThread(); 20 int c = getState(); 21 if (c == 0) { 22 if (compareAndSetState(0, acquires)) { 23 setExclusiveOwnerThread(current); 24 return true; 25 } 26 } 27 else if (current == getExclusiveOwnerThread()) { 28 int nextc = c + acquires; 29 if (nextc < 0) // overflow 30 throw new Error("Maximum lock count exceeded"); 31 setState(nextc); 32 return true; 33 } 34 return false; 35 }
6.2 公平鎖
公平鎖的執行緒在獲取資源時,不會嘗試獲取資源,而是嘗試佔用資源。
在資源可用時會判斷當前佇列是否有執行緒在等待,剛加入的執行緒不可用同喚醒的執行緒競爭資源。
1 final void lock() { 2 acquire(1); 3 } 4 ... 5 public final void acquire(int arg) { 6 if (!tryAcquire(arg) && 7 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 8 selfInterrupt(); 9 } 10 ... 11 protected final boolean tryAcquire(int acquires) { 12 final Thread current = Thread.currentThread(); 13 int c = getState(); 14 if (c == 0) { 15 if (!hasQueuedPredecessors() && 16 compareAndSetState(0, acquires)) { 17 setExclusiveOwnerThread(current); 18 return true; 19 } 20 } 21 else if (current == getExclusiveOwnerThread()) { 22 int nextc = c + acquires; 23 if (nextc < 0) 24 throw new Error("Maximum lock count exceeded"); 25 setState(nextc); 26 return true; 27 } 28 return false; 29 }
6.3 分析hasQueuedPredecessors()方法
比較NonfairSync和FairSync中tryAcquire()方法的實現,發現FairSync中的tryAcquire()方法中多了一項判斷:
1 !hasQueuedPredecessors()
hasQueuedPredecessors()方法用於公平鎖加鎖時判斷等待佇列中是否存在有效節點的方法。該方法用於判斷當前節點前是否有其他節點排隊:
返回false表示沒有,取反後為true表示當前節點不需要排隊,需要繼續執行佔用資源的操作。
返回true表示有,取反後為false表示當前節點需要排隊,需要執行加入等待佇列的操作。
檢視AQS中定義的hasQueuedPredecessors()方法:
1 public final boolean hasQueuedPredecessors() { 2 // The correctness of this depends on head being initialized 3 // before tail and on head.next being accurate if the current 4 // thread is first in queue. 5 Node t = tail; // Read fields in reverse initialization order 6 Node h = head; 7 Node s; 8 return h != t && 9 ((s = h.next) == null || s.thread != Thread.currentThread()); 10 }
判斷h是否不等於t,如果不成立,說明h等於t,說明頭節點和尾節點相同,說明當前佇列未初始化(頭節點和尾節點都是空節點)或者當前佇列只有一個節點(頭結點和尾節點都是空節點),說明不需要排隊,返回false,取反後為true,嘗試佔用資源。
判斷h是否不等於t,如果成立,說明h不等於t,說明存在兩個不同節點。繼續判斷頭節點的下一節點是否為空節點,如果成立,說明下一節點為空,可能上個執行緒在執行初始化enq()方法,剛剛通過CAS操作compareAndSetHead()將頭節點初始化,尚未給尾節點賦值,此時頭節點不為空,尾節點為空,並且頭節點的下一節點為空,返回true,取反後為false,需要排隊。
判斷h是否不等於t,如果成立,說明h不等於t,說明存在兩個不同節點。繼續判斷頭節點的下一節點是否為空節點,如果不成立,說明下一節點不為空。繼續判斷下一節點封裝的執行緒是否不等於當前執行緒,如果成立,說明下一執行緒不為當前執行緒,返回true,取反後為false,需要排隊。
判斷h是否不等於t,如果成立,說明h不等於t,說明存在兩個不同節點。繼續判斷頭節點的下一節點是否為空節點,如果不成立,說明下一節點不為空。繼續判斷下一節點封裝的執行緒是否不等於當前執行緒,如果不成立,說明下一執行緒為當前執行緒,返回false,取反後為true,嘗試佔用資源。
7 自定義同步器
7.1 實現方法
不同的自定義同步器爭用共享資源的方式也不同,自定義同步器在實現時只需要實現共享資源state的獲取與釋放即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊和喚醒出隊等),AQS已經在底層實現好了。
自定義同步器實現時主要實現以下幾種方法:
isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。
tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false。
一般來說,自定義同步器要麼是獨佔方法,要麼是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支援自定義同步器同時實現獨佔和共享兩種方式,如ReentrantReadWriteLock。
7.2 舉例說明
7.2.1 ReentrantLock
以ReentrantLock為例,state初始化為0,表示未鎖定狀態。
當執行緒A呼叫lock()方法獲取資源時,會呼叫tryAcquire()佔用資源,並將state的值加1。
此後,其他執行緒再tryAcquire()時就會失敗,直到執行緒A呼叫unlock()方法釋放資源,並將state的值減0,其它執行緒才有機會獲取該鎖。
當然,釋放鎖之前,A執行緒自己是可以重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多少次,這樣才能保證state是能回到零態的。
7.2.2 CountDownLatch
再以CountDownLatch以例,任務分為N個子執行緒去執行,state也初始化為N(N與執行緒個數一致)。這N個子執行緒是並行執行的,每個子執行緒執行完後都會呼叫一次countDown()方法,使用CAS操作將state值減1。等到所有子執行緒都執行完後,state的值變為0,這時會呼叫unpark()方法喚醒主執行緒,然後主執行緒就會從await()方法喚醒,繼續後餘動作。