java之AQS和顯式鎖
本次內容主要介紹AQS、AQS的設計及使用、ReentrantLock、ReentrantReadWriteLock以及手寫一個可重入獨佔鎖
1、什麼是AQS?
AQS,佇列同步器AbstractQueuedSynchronizer的簡寫,JDK1.5引入的,是用來構建鎖或者其他同步元件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作。AQS的作者Doug Lea大師期望它能夠成為實現大部分同步需求的基礎。
2、AQS的設計及其作用
AbstractQueuedSynchronizer是一個抽象類,先看一下其類圖。
AQS中裡有一個volatile修飾int型的state來代表同步狀態,使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來改變狀態,因為它們能夠保證狀態的改變是安全的。
AQS使用的是模板方法模式,主要使用方式是繼承,且通常將子類推薦定義為靜態內部類,子類通過繼承AQS並實現它的抽象方法來管理同步狀態。AQS自身沒有實現任何同步介面,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步元件使用,同步器既可以支援獨佔式地獲取同步狀態,也可以支援共享式地獲取同步狀態,這樣就可以方便實現不同型別的同步元件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。AQS是實現鎖(也可以是任意同步元件)的關鍵,在鎖的實現中聚合同步器。可以這樣理解二者之間的關係:
- 鎖是面向使用者的,它定義了使用者與鎖互動的介面(比如可以允許兩個執行緒並行訪問),隱藏了實現細節
- 同步器面向的是鎖的實現者,它簡化了鎖的實現方式,遮蔽了同步狀態管理、執行緒的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現者所需關注的領域。實現者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步元件的實現中,並呼叫同步器提供的模板方法,而這些模板方法將會呼叫使用者重寫的方法。
實現自定義同步元件時,將會呼叫AQS提供的模板方法,AQS的模板方法如下:
AQS提供的模板方法基本上分為3類:獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態和查詢同步佇列中的等待執行緒情況。AQS中可重寫的方法如下:
AQS中有一個內部類Node,用於構造一個佇列來儲存排隊等待獲取鎖的執行緒。看一下Node的原始碼及其簡單說明:
static final class Node { /**標記執行緒是因為獲取共享資源失敗被阻塞新增到佇列中的*/ static final Node SHARED = new Node(); /**表示執行緒因為獲取獨佔資源失敗被阻塞新增到佇列中的*/ static final Node EXCLUSIVE = null; /**表示執行緒因為中斷或者等待超時,需要從等待佇列中取消等待*/ static final int CANCELLED = 1; /**表示當前執行緒佔有鎖,佇列中沒有存放執行緒引用頭結點的後繼結點A處於等待狀態, * 如果已佔有鎖的執行緒釋放鎖或被CANCEL之後就會通知結點A去獲取鎖。*/ static final int SIGNAL = -1; /**當持有鎖的執行緒呼叫了Condition(下面會講到具體運用)的signal()方法之後,處於同一condition下的等待執行緒會去競爭鎖*/ static final int CONDITION = -2; /**表示把waitStatus的值,指示下一個acquireShared應該無條件傳播*/ static final int PROPAGATE = -3; /**表示當前執行緒的等待狀態*/ volatile int waitStatus; volatile Node prev; volatile Node next; /**表示進入AQS佇列中的執行緒引用*/ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
AQS基礎內容先了解這麼多,後面會用AQS實現一個自己的可重入獨佔式鎖。
3、顯式鎖Lock
與使用關鍵字synchronized相比,顯式鎖Lock提供了更廣泛的加鎖操作。 Lock獲取鎖的方法更加靈活,並且支援多個關聯的Condition物件,先看一下Lock的常用API:
與關鍵字synchronized相比,Lock有以下幾個優勢:
(1)可以嘗試非阻塞地獲取鎖,如果這一時刻鎖沒有被其他執行緒獲取到,則成功獲取並持有鎖。
(2)獲取鎖過程中可以被中斷。
(3)超時獲取鎖,可以指定一個時間,在指定的時間範圍內獲取鎖,如果截止時間到了仍然無法獲取鎖,則返回,可以避免執行緒長時間阻塞。
Lock也有缺點,比如說必須手動的釋放鎖,所以在使用Lock時有一個正規化,以ReentrantLock為例:
class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }
還有一個要注意的地方是,不要將獲取鎖的過程寫在try塊中,因為如果在獲取鎖(自定義鎖的實現)時發生了異常,異常丟擲的同時,也會導致鎖無故釋放。
4、ReentrantLock
4.1 公平鎖和非公平鎖
ReentrantLock是可重入的互斥鎖,與使用synchronized修飾的方法和程式碼塊具有相同的基本行為和語義,但具有擴充套件的功能。最明顯的一個擴充套件功能是ReentrantLock可以定義為公平鎖或非公平鎖,synchronized內部實現使用的是非公平鎖機制。從時間上來說,先對鎖進行獲取的請求一定先被滿足,那麼這個鎖是公平的,反之是不公平的。 ReentrantLock提供了一個建構函式,能夠控制鎖是否是公平的。事實上,公平的鎖機制往往沒有非公平的效率高,原因如下:在激烈競爭的情況下,恢復一個被掛起的執行緒與該執行緒真正開始執行之間存在著嚴重的延遲。假設執行緒A持有一個鎖,並且執行緒B請求這個鎖。由於這個鎖已被執行緒A持有,因此B將被掛起。當A釋放鎖時,B將被喚醒,因此會再次嘗試獲取鎖。與此同時,如果C也請求這個鎖,那麼C很可能會在B被完全喚醒之前獲得、使用以及釋放這個鎖。這樣的情況是一種“雙贏”的局面,B獲得鎖的時刻並沒有推遲,C更早地獲得了鎖,並且吞吐量也獲得了提高,用一張圖來說明。
我們可以去看下公平鎖和非公平鎖加鎖的原始碼,區別其實非常小,先看非公平鎖:
1 final boolean nonfairTryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (compareAndSetState(0, acquires)) { 6 setExclusiveOwnerThread(current); 7 return true; 8 } 9 } 10 else if (current == getExclusiveOwnerThread()) { 11 int nextc = c + acquires; 12 if (nextc < 0) // overflow 13 throw new Error("Maximum lock count exceeded"); 14 setState(nextc); 15 return true; 16 } 17 return false; 18 }
再看公平鎖:
1 protected final boolean tryAcquire(int acquires) { 2 final Thread current = Thread.currentThread(); 3 int c = getState(); 4 if (c == 0) { 5 if (!hasQueuedPredecessors() && 6 compareAndSetState(0, acquires)) { 7 setExclusiveOwnerThread(current); 8 return true; 9 } 10 } 11 else if (current == getExclusiveOwnerThread()) { 12 int nextc = c + acquires; 13 if (nextc < 0) 14 throw new Error("Maximum lock count exceeded"); 15 setState(nextc); 16 return true; 17 } 18 return false; 19 } 20 }
通過對比原始碼可以發現,公平鎖在第5行的判斷條件裡多了一個!hasQueuedPredecessors(),這個的意思是查詢是否有執行緒在排隊等待獲取鎖,如果有執行緒在排隊,則不去搶鎖。而非公平鎖才不管你有沒有執行緒在排隊等待,直接去搶一次再說,不管搶不搶的到。
4.2 ReentrantLock使用
隔壁老王在某寶買了一個FBB版的娃娃,假設娃娃從廣東發出,目的是上海,距離大約1500公里。娃娃發出後,在離目的地小於100公里的時候給老王發簡訊說,你的娃娃快到了。在上海的快遞員接到娃娃後,會給老王打電話讓他來取娃娃。這是一個典型的等待/通知機制,在之前的篇幅中我們使用Object類中的wait()和notifyAll()等待通知機制實現了一個自己的資料庫連線池,現在使用ReentrantLock來模擬剛剛老王買娃娃的場景。
業務實現程式碼:
1 import java.util.concurrent.locks.Condition; 2 import java.util.concurrent.locks.Lock; 3 import java.util.concurrent.locks.ReentrantLock; 4 5 public class BuyFBBWawa { 6 public final static String DESTINATION = "Shanghai"; 7 /**娃娃剩餘運輸里程數*/ 8 private int km; 9 /**娃娃當前位置*/ 10 private String site; 11 private Lock lock = new ReentrantLock(); 12 /**距離Condition*/ 13 private Condition kmCondition = lock.newCondition(); 14 /**位置Condition*/ 15 private Condition siteCondition = lock.newCondition(); 16 17 public BuyFBBWawa() { 18 } 19 20 public BuyFBBWawa(int km, String site) { 21 this.km = km; 22 this.site = site; 23 } 24 25 /** 26 * 距離目的地小於100公里,通知處於wait狀態並需要給老王傳送簡訊的執行緒工作 27 */ 28 public void changeKm() { 29 lock.lock(); 30 try { 31 this.km = 99; 32 kmCondition.signal();//通知其他在kmCondition上等待的執行緒 33 } finally { 34 lock.unlock(); 35 } 36 } 37 38 /** 39 * 到達菜鳥驛站,通知處於wait狀態並需要給老王打電話的執行緒工作 40 */ 41 public void changeSite() { 42 lock.lock(); 43 try { 44 this.site = "Shanghai"; 45 siteCondition.signal();//通知其他在siteCondition上等待的執行緒 46 } finally { 47 lock.unlock(); 48 } 49 } 50 51 /** 52 * 當娃娃的剩餘里程數小於100時給老王發簡訊 53 */ 54 public void waitKm() { 55 lock.lock(); 56 try { 57 while (this.km >= 100) { 58 try { 59 kmCondition.await();//當前執行緒在kmCondition上進行等待 60 System.out.println("check km thread[" + Thread.currentThread().getName() 61 + "] is be notify"); 62 } catch (InterruptedException e) { 63 e.printStackTrace(); 64 } 65 } 66 } finally { 67 lock.unlock(); 68 } 69 70 System.out.println("娃娃離老王已經不足100公里,我給他發個簡訊"); 71 } 72 73 /**當娃娃到達目的地時給老王打電話*/ 74 public void waitSite() { 75 lock.lock(); 76 try { 77 while (!this.site.equals(DESTINATION)) { 78 try { 79 siteCondition.await();//當前執行緒在siteCondition上進行等待 80 System.out.println("check Site thread[" + Thread.currentThread().getName() 81 + "] is be notify"); 82 } catch (InterruptedException e) { 83 e.printStackTrace(); 84 } 85 } 86 } finally { 87 lock.unlock(); 88 } 89 System.out.println("娃娃已經到達目的地,我給他打個電話讓他來取"); 90 } 91 }
測試程式碼:
public class TestBuyWawa { private static BuyFBBWawa fbbWawa = new BuyFBBWawa(1500, "Guangdong"); /**檢查里程數變化的執行緒,不滿足條件,執行緒一直等待*/ private static class CheckKm extends Thread { @Override public void run() { fbbWawa.waitKm(); } } /**檢查地點變化的執行緒,不滿足條件,執行緒一直等待*/ private static class CheckSite extends Thread { @Override public void run() { fbbWawa.waitSite(); } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new CheckSite().start(); } for (int i = 0; i < 3; i++) { new CheckKm().start(); } Thread.sleep(1000); fbbWawa.changeKm();//娃娃距離目的地小於100公里 Thread.sleep(2000); fbbWawa.changeSite();//娃娃到達目的地 } }
這段程式碼使用ReentrantLock和Condition模擬了老王買的娃娃的運輸過程,從程式輸出可以看到,通過不同的Condition實現了點對點的通知,這是與使用synchronized+wait()/notifyAll()最大的區別,如果對wait()/notifyAll()使用方法不熟悉的同學,歡迎閱讀之前的《java執行緒間的協作》。使用synchronized+wait()/notifyAll()的時候,不能指定喚醒某類執行緒,只能喚醒等待在物件上的所有執行緒,故儘量使用notifyAll()而不是notify(),在使用Lock+Condition的時候,由於可以指定喚醒某類執行緒,所以儘量使用signal()而不是signalAll()。
5、ReentrantReadWriteLock
5.1 ReentrantReadWriteLock介紹
之前提到的synchroniezd和ReentrantLock都是排它鎖,這些鎖在同一時刻只允許一個執行緒訪問,而讀寫鎖ReentrantReadWriteLock在同一時刻可以允許多個讀執行緒訪問,但是在寫執行緒訪問時,所有的讀執行緒和其他寫執行緒均被阻塞。讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得併發性相比一般的排他鎖有了很大提升。除了保證寫操作對讀操作的可見性以及併發性的提升之外,讀寫鎖能夠簡化讀寫互動場景的程式設計方式。假設在程式中定義一個共享的用作快取資料結構,它大部分時間提供讀服務(例如查詢和搜尋),而寫操作佔有的時間很少,但是寫操作完成之後的更新需要對後續的讀服務可見。如果不使用讀寫鎖,完成上述工作就要使用Java的等待通知機制,就是當寫操作開始時,所有晚於寫操作的讀操作均會進入等待狀態,只有寫操作完成並進行通知之後,所有等待的讀操作才能繼續執行(寫操作之間依靠synchronized關鍵進行同步),這樣做的目的是使讀操作能讀取到正確的資料,不會出現髒讀。改用讀寫鎖實現上述功能,只需要在讀操作時獲取讀鎖,寫操作時獲取寫鎖即可。當寫鎖被獲取到時,後續(非當前寫操作執行緒)的讀寫操作都會被阻塞,寫鎖釋放之後,所有操作繼續執行,程式設計方式相對於使用等待通知機制的實現方式而言,變得簡單明瞭。一般情況下,讀寫鎖的效能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的併發性和吞吐量。
5.2 使用ReentrantReadWriteLock
我們來模擬一個讀多寫少的場景,分別使用synchroniezd和ReentrantReadWriteLock,看看效率的差異。假設某種商品,讀寫比列為1:10,我們寫一段程式碼來模擬。
商品類:
public class GoodsInfo { /**總銷售額*/ private double totalMoney; /**庫存數*/ private int storeNumber; public GoodsInfo( int totalMoney, int storeNumber) { this.totalMoney = totalMoney; this.storeNumber = storeNumber; } public double getTotalMoney() { return totalMoney; } public int getStoreNumber() { return storeNumber; } public void changeNumber(int sellNumber) { this.totalMoney += sellNumber * 9.9; this.storeNumber -= sellNumber; } }
商品介面:
public interface GoodsService { GoodsInfo getNumber(); void setNumber(int number); }
使用讀寫鎖來實現商品介面:
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class UseRwLock implements GoodsService { private GoodsInfo goodsInfo; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock getLock = lock.readLock();//讀鎖 private final Lock setLock = lock.writeLock();//寫鎖 public UseRwLock(GoodsInfo goodsInfo) { this.goodsInfo = goodsInfo; } @Override public GoodsInfo getNumber() { getLock.lock(); try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } finally { getLock.unlock(); } return this.goodsInfo; } @Override public void setNumber(int number) { setLock.lock(); try { Thread.sleep(5); goodsInfo.changeNumber(number); } catch (InterruptedException e) { e.printStackTrace(); } finally { setLock.unlock(); } } }
使用synchronized實現商品介面:
public class UseSynchronized implements GoodsService { private GoodsInfo goodsInfo; public UseSynchronized(GoodsInfo goodsInfo) { this.goodsInfo = goodsInfo; } @Override public synchronized GoodsInfo getNumber() { try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } return this.goodsInfo; } @Override public synchronized void setNumber(int number) { try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } goodsInfo.changeNumber(number); } }
測試類,先使用synchronized實現:
1 import java.util.Random; 2 3 public class GoodsInfoTest { 4 static final int readWriteRatio = 10;//讀寫執行緒的比例 5 static final int writeThreadCount = 1;//寫執行緒數量 6 7 /** 8 * 讀執行緒 9 */ 10 private static class GetTask implements Runnable { 11 private GoodsService goodsService; 12 13 public GetTask(GoodsService goodsService) { 14 this.goodsService = goodsService; 15 } 16 17 @Override 18 public void run() { 19 long start = System.currentTimeMillis(); 20 for (int i = 0; i < 100; i++) {//每個讀執行緒操作100次 21 goodsService.getNumber(); 22 } 23 System.out.println(Thread.currentThread().getName() + "讀取商品資料耗時:" 24 + (System.currentTimeMillis() - start) + "ms"); 25 } 26 } 27 28 /** 29 * 寫執行緒 30 */ 31 private static class SetTask implements Runnable { 32 private GoodsService goodsService; 33 34 public SetTask(GoodsService goodsService) { 35 this.goodsService = goodsService; 36 } 37 38 @Override 39 public void run() { 40 long start = System.currentTimeMillis(); 41 Random r = new Random(); 42 for (int i = 0; i < 10; i++) {//每個寫執行緒操作10次 43 goodsService.setNumber(r.nextInt(10)); 44 } 45 System.out.println(Thread.currentThread().getName() 46 + "寫商品資料耗時:" + (System.currentTimeMillis() - start) + "ms"); 47 } 48 } 49 50 public static void main(String[] args) throws InterruptedException { 51 GoodsInfo goodsInfo = new GoodsInfo(100000, 10000); 52 GoodsService goodsService = new UseSynchronized(goodsInfo); 53 54 for (int i = 0; i < writeThreadCount; i++) { //啟動1個寫執行緒 55 new Thread(new SetTask(goodsService)).start(); 56 for (int j = 0; j < readWriteRatio; j++) { //啟動10個讀執行緒 57 new Thread(new GetTask(goodsService)).start(); 58 } 59 Thread.sleep(10); 60 } 61 } 62 }
程式輸出:
再把剛剛測試類修改一下,只需要把第52行修改成讀寫鎖實現,即GoodsService goodsService = new UseRwLock(goodsInfo);程式輸出:
對比可以看出,對於讀多寫少的場景,使用讀寫鎖比使用獨佔鎖效率高很多。
6、手寫一個自己的可重入獨佔鎖
鎖的重進入是指任意執行緒在獲取到鎖之後能夠再次獲取該鎖而不會被鎖所阻塞,synchronized關鍵字隱式的支援重進入,比如一個synchronized修飾的遞迴方法,在方法執行時,執行執行緒在獲取了鎖之後仍能連續多次地獲得該鎖,該特性的實現需要解決以下兩個問題:
(1)執行緒再次獲取鎖。鎖需要去識別獲取鎖的執行緒是否為當前佔據鎖的執行緒,如果是,則再次成功獲取。
(2)鎖的最終釋放。執行緒重複n次獲取了鎖,隨後在第n次釋放該鎖後,其他執行緒能夠獲取到該鎖。鎖的最終釋放要求鎖對於獲取進行計數自增,計數表示當前鎖被重複獲取的次數,而鎖被釋放時,計數自減,當計數等於0時表示鎖已經成功釋放。
從上面ReentrantLock的公平鎖和非公平鎖加鎖的原始碼也可以看出,getState()返回的是一個累計獲取鎖的次數。我們基於以上2點,利用AQS手寫一個簡易版本的可重入獨佔鎖。
實現類:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class MyReentrantLock implements Lock { /*** * 內部類繼承AQS */ static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) {//鎖被第一次獲取 setExclusiveOwnerThread(Thread.currentThread());//設定當前執行緒為鎖獨佔執行緒 return true; } else if (Thread.currentThread() == getExclusiveOwnerThread()) {//鎖被多次獲取 setState(getState() + 1);//對獲取鎖的次數累加 return true; } return false; } @Override protected boolean tryRelease(int arg) { if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } if (getState() == 0) { throw new IllegalMonitorStateException(); } setState(getState() - 1); if (getState() == 0) { setExclusiveOwnerThread(null); } return true; } @Override protected boolean isHeldExclusively() { return getState() > 0; } /** * 返回一個Condition,每個condition都包含了一個condition佇列 * 這是能夠喚醒指定執行緒的關鍵 */ Condition newCondition() { return new ConditionObject(); } } /**僅需要將操作代理到Sync上,呼叫AQS模板方法*/ private final Sync sync = new Sync(); /*** * 呼叫AQS的模板方法acquire(int arg) */ public void lock() { System.out.println(Thread.currentThread().getName() + " 準備獲取鎖"); sync.acquire(1); System.out.println(Thread.currentThread().getName() + " 已經獲取到鎖"); } public boolean tryLock() { return sync.tryAcquire(1); } /*** * 呼叫AQS的模板方法release(int arg) */ public void unlock() { System.out.println(Thread.currentThread().getName() + " 準備釋放鎖"); sync.release(1); System.out.println(Thread.currentThread().getName() + " 已經釋放鎖"); } public Condition newCondition() { return sync.newCondition(); } public boolean isLocked() { return sync.isHeldExclusively(); } public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }
測試類:
import java.util.concurrent.locks.Lock; public class Test { static final Lock lock = new MyReentrantLock(); /** * 遞迴獲取鎖 * * @param deep 遞迴深度 */ public static void reenter(int deep) { lock.lock(); try { System.out.println(Thread.currentThread().getName() + ":遞迴深度:" + deep); int currentDeep = deep - 1; if (currentDeep == 0) { return; } else { reenter(currentDeep); } } finally { lock.unlock(); } } static class WorkerThread extends Thread { public void run() { reenter(3); } } public static void main(String[] args) { // 啟動2個子執行緒去爭搶鎖 for (int i = 0; i < 2; i++) { Thread thread = new WorkerThread(); thread.start(); } } }
從程式輸出可以看到,利用AQS,我們自定義的MyReentrantLock實現了可重入獨佔鎖的功能。
7、結語
本次就分享這麼多內容,希望大家看了有收穫。下一篇內容中會介紹Java執行緒池相關知識點,閱讀過程中如發現描述有誤,請指出,謝謝。
&n