J.U.C剖析與解讀1(Lock的實現)
J.U.C剖析與解讀1(Lock的實現)
前言
為了節省各位的時間,我簡單介紹一下這篇文章。這篇文章主要分為三塊:Lock的實現,AQS的由來(通過演變的方式),JUC三大工具類的使用與原理剖析。
Lock的實現:簡單介紹ReentrantLock,ReentrantReadWriteLock兩種JUC下經典Lock的實現,並通過手寫簡化版的ReentrantLock和ReentrantReadWriteLock,從而瞭解其實現原理。
AQS的由來:通過對兩個簡化版Lock的多次迭代,從而獲得AQS。並且最終的Lock實現了J.U.C下Lock介面,既可以使用我們演變出來的AQS,也可以對接JUC下的AQS。這樣一方面可以幫助大家理解AQS,另一方面大家可以從中瞭解,如何利用AQS實現自定義Lock。而這兒,對後續JUC下的三大Lock工具的理解有非常大的幫助。
JUC三大工具:經過前兩個部分的學習,這個部分不要太easy。可以很容易地理解CountDownLatch,Semaphore,CyclicBarrier的內部執行及實現原理。
不過,由於這三塊內容較多,所以我將它拆分為三篇子文章進行論述。
一,介紹
Lock
Lock介面位於J.U.C下locks包內,其定義了Lock應該具備的方法。
Lock 方法簽名:
- void lock():獲取鎖(不死不休,拿不到就一直等)
- boolean tryLock():獲取鎖(淺嘗輒止,拿不到就算了)
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException:獲取鎖(過時不候,在一定時間內拿不到鎖,就算了)
- void lockInterruptibly() throws InterruptedException:獲取鎖(任人擺佈,xxx)
- void unlock():釋放鎖
- Condition newCondition():獲得Condition物件
ReentrantLock
簡介
ReentrantLock是一個可重入鎖,一個悲觀鎖,預設是非公平鎖(但是可以通過Constructor設定為公平鎖)。
Lock應用
ReentrantLock通過構造方法獲得lock物件。利用lock.lock()方法對當前執行緒進行加鎖操作,利用lock.unlock()方法對當前執行緒進行釋放鎖操作。
Condition應用
通過
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
獲得Condition物件(Condition是J.U.C下locks包下的介面)。
通過Condition物件的.await(*),可以將當前執行緒的執行緒狀態切換到Waiting狀態(如果是有參,則是Time Waiting狀態)。而.signal(),.signalAll()等方法則正好相反,恢復執行緒狀態為Runnable狀態。
ReentrantReadWriteLock
簡介
ReentrantLock和Synchronized功能類似,更加靈活,當然,也更加手動了。
大家都知道,只有涉及資源的競爭時,採用同步的必要。寫操作自然屬於資源的競爭,但是讀操作並不屬於資源的競爭行為。簡單說,就是寫操作最多隻能一個執行緒(因為寫操作涉及資料改變,多個執行緒同時寫,會產生資源同步問題),而讀操作可以有多個(因為不涉及資料改變)。
所以在讀多寫少的場景下,ReentrantLock就比較浪費資源了。這就需要一種能夠區分讀寫操作的鎖,那就是ReentrantReadWriteLock。通過ReentrantReadWriteLock,可以獲得讀鎖與寫鎖。當寫鎖存在時,有且只能有一個執行緒持有鎖。當寫鎖不存在時,可以有多個執行緒持有讀鎖(寫鎖,必須等待讀鎖釋放完,才可以持有鎖)。
Lock及Condition應用
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
readLock.lock();
readLock.unlock();
readLock.newCondition();
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
writeLock.unlock();
writeLock.newCondition();
與之前ReentrantLock應用的區別,就是需要通過lock.readLock()與lock.writeLock()來獲取讀鎖,寫鎖,再進行加鎖,釋放鎖的操作,以及Condition的獲取操作。
二,手寫ReentrantLock
獲取需求
終於上大餐了。
首先第一步操作,我們需要確定我們要做什麼。
我們要做一個鎖,這裡姑且命名為JarryReentrantLock。
這個鎖,需要具備以下特性:可重入鎖,悲觀鎖。
另外,為了更加規範,以後更好地融入到AQS中,該鎖需要實現Lock介面。
而Lock的方法簽名,在文章一開始,就已經寫了,這裡不再贅述。
當然,我們這裡只是一個demo,所以就不實現Condition了。另外tryLock(long,TimeUnit)也不再實現,因為實現了整體後,這個實現其實並沒有想象中那麼困難。
JarryReentrantLock實現原理
既然需要已經確定,並且API也確定了。
那麼第二步操作,就是簡單思考一下,如何實現。
類成員方面:
首先,我們需要一個owner屬性,來儲存持有鎖的執行緒物件。
其次,由於是可重入鎖,所以我們需要一個count來儲存重入次數。
最後,我們需要一個waiters屬性,來儲存那些競爭鎖失敗後,還在等待(不死不休型)的執行緒物件。
類方法方面:
- tryLock:嘗試獲取鎖,成功返回true,失敗返回false。首先是獲取鎖的行為,可以通過CAS操作實現,或者更簡單一些,通過Atomic包實現(其底層也還是CAS)。另外,由於是可重入鎖,所以在嘗試獲取鎖時,需要判斷嘗試獲取鎖的執行緒是否為當前鎖的持有者執行緒。
- lock:嘗試獲取鎖,直到成功獲得鎖。看到這種不成功便成仁的精神,我第一個想法是迴圈呼叫tryLock。但是這實在太浪費資源了(畢竟長時間的忙迴圈是非常消耗CPU資源的)。所以就是手動通過LockSupport.park()將當前執行緒掛起,然後置入等待佇列waiters中,直到釋放鎖操作來呼叫。
- tryUnlock:嘗試解鎖,成功返回true,失敗返回false。首先就是在釋放鎖前,需要判斷嘗試解鎖的執行緒與鎖的持有者是否為同一個執行緒(總不能執行緒A把執行緒B持有的鎖給釋放了吧)。其次,需要判斷可重入次數count是否為0,從而決定是否將鎖的持有owner設定為null。最後,就是為了避免在count=0時,其他執行緒同時進行加鎖操作,造成的count>0,owner=null的情況,所以count必須是Atomic,並此處必須採用CAS操作(這裡有些難理解,可以看程式碼,有相關注釋)。
- unlock:解鎖操作。這裡嘗試進行解鎖,如果解鎖成功,需要從等待佇列waiters中喚醒一個執行緒(喚醒後的執行緒,由於在迴圈中,所以會繼續進行競爭鎖操作。但是切記該執行緒不一定競爭鎖成功,因為可能有新來的執行緒,搶先一步。那麼該執行緒會重新進入佇列。所以,此時的JarryReentrantLock只支援不公平鎖)。
JarryReentrantLock實現
那麼接下來,就根據之前的資訊,進行編碼吧。
package tech.jarry.learning.netease;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
/**
* @Description: 仿ReentrantLock,實現其基本功能及特性
* @Author: jarry
*/
public class JarryReentrantLock implements Lock {
// 加鎖計數器
private AtomicInteger count = new AtomicInteger(0);
// 鎖持有者
private AtomicReference<Thread> owner = new AtomicReference<>();
// 等待池
private LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
@Override
public boolean tryLock() {
// 判斷當前count是否為0
int countValue = count.get();
if (countValue != 0){
// countValue不為0,意味著鎖被執行緒持有
// 進而判斷鎖的持有者owner是否為當前執行緒
if (Thread.currentThread() == owner.get()){
// 鎖的持有者為當前執行緒,那麼就重入加鎖
// 既然鎖已經被當前執行緒佔有,那麼就不用擔心count被其他執行緒修改,即不需要使用CAS
count.set(countValue+1);
// 執行重入鎖,表示當前執行緒獲得了鎖
return true;
}else{
// 如果當前執行緒不是鎖的持有者,返回false(該方法是tryLock,即淺嘗輒止)
return false;
}
}else {
// countValue為0,意味著當前鎖不被任何執行緒持有
// 通過CAS操作將count修改為1
if (count.compareAndSet(countValue,countValue+1)){
// count修改成功,意味著該執行緒獲得了鎖(只有一個CAS成功修改count,那麼這個CAS的執行緒就是鎖的持有者)
// 至於這裡為什麼不用擔心可見性,其實一開始我也比較擔心其發生類似doubleCheck中重排序造成的問題(tryUnlock是會設定null的)
// 看了下原始碼,AtomicReference中的value是volatile的
owner.set(Thread.currentThread());
return true;
} else {
// CAS操作失敗,表示當前執行緒沒有成功修改count,即獲取鎖失敗
return false;
}
}
}
@Override
public void lock() {
// lock()【不死不休型】就等於執行tryLock()失敗後,仍然不斷嘗試獲取鎖
if (!tryLock()){
// 嘗試獲取鎖失敗後,就只能進入等待佇列waiers,等待機會,繼續tryLock()
waiters.offer(Thread.currentThread());
// 通過自旋,不斷嘗試獲取鎖
// 其實我一開始也不是很理解為什麼這樣寫,就可以確保每個執行lock()的執行緒就在一直競爭鎖。其實,想一想執行lock()的執行緒都有這個迴圈。
// 每次unlock,都會將等待佇列的頭部喚醒(unpark),那麼處在等待佇列頭部的執行緒就會繼續嘗試獲取鎖,等待佇列的其它執行緒仍然,繼續阻塞(park)
// 這也是為什麼需要在迴圈體中執行一個檢測當前執行緒是否為等待佇列頭元素等一系列操作。
// 另外,還有就是:處於等待狀態的執行緒可能收到錯誤警報和偽喚醒,如果不在迴圈中檢測等待條件,程式就會在沒有滿足結束條件的情況下退出。反正最後無論那個分支,都return,結束方法了。
// 即使沒有偽喚醒問題,while還是需要的,因為執行緒需要二次嘗試獲得鎖
while (true){
// 獲取等待佇列waiters的頭元素(peek表示獲取頭元素,但不刪除。poll表示獲取頭元素,並刪除其在佇列中的位置)
Thread head = waiters.peek();
// 如果當前執行緒就是等待佇列中的頭元素head,說明當前等待佇列就剛剛加入的元素。
if (head == Thread.currentThread()){
// 嘗試再次獲得鎖
if (!tryLock()){
// 再次嘗試獲取鎖失敗,即將該執行緒(即當前執行緒)掛起,
LockSupport.park();
} else {
// 獲取鎖成功,即將該執行緒(等待佇列的頭元素)從等待佇列waiters中移除
waiters.poll();
return;
}
} else {
// 如果等待佇列的頭元素head,不是當前執行緒,表示等待佇列在當前執行緒加入前,就還有別的執行緒在等待
LockSupport.park();
}
}
}
}
private boolean tryUnlock() {
// 首先確定當前執行緒是否為鎖持有者
if (Thread.currentThread() != owner.get()){
// 如果當前執行緒不是鎖的持有者,就丟擲一個異常
throw new IllegalMonitorStateException();
} else {
// 如果當前執行緒是鎖的持有者,就先count-1
// 另外,同一時間執行解鎖的只可能是鎖的持有者執行緒,故不用擔心原子性問題(原子性問題只有在多執行緒情況下討論,才有意義)
int countValue = count.get();
int countNextValue = countValue - 1;
count.compareAndSet(countValue,countNextValue);
if (countNextValue == 0){
// 如果當前count為0,意味著鎖的持有者已經完全解鎖成功,故應當失去鎖的持有(即設定owner為null)
// 其實我一開始挺糾結的,這裡為什麼需要使用CAS操作呢。反正只有當前執行緒才可以走到程式這裡。
// 首先,為什麼使用CAS。由於count已經設定為0,其它執行緒已經可以修改count,修改owner了。所以不用CAS就可能將owner=otherThread設定為owner=null了,最終的結果就是徹底卡死
//TODO_FINISHED 但是unlock()中的unpark未執行,根本就不會有其它執行緒啊。囧
// 這裡程式碼還是為了體現原始碼的一些特性。實際原始碼是將這些所的特性,抽象到了更高的層次,形成一個AQS。
// 雖然tryUnlock是由實現子類實現,但countNextValue是來自countValue(而放在JarryReadWriteLock中就是writeCount),在AQS原始碼中,則是通過state實現
// 其次,有沒有ABA問題。由於ABA需要將CAS的expect值修改為currentThread,而當前執行緒只能單執行緒執行,所以不會。
// 最後,這裡owner設定為null的操作到底需不需要。實際原始碼可能是需要的,但是這裡貌似真的不需要。
owner.compareAndSet(Thread.currentThread(),null);
// 解鎖成功
return true;
} else {
// count不為0,解鎖尚未完全完成
return false;
}
}
}
@Override
public void unlock() {
if (tryUnlock()){
// 如果當前執行緒成功tryUnlock,就表示當前鎖被空置出來了。那就需要從備胎中,啊呸,從waiters中“放“出來一個
Thread head = waiters.peek();
// 這裡需要做一個簡單的判斷,防止waiters為空時,丟擲異常
if (head != null){
LockSupport.unpark(head);
}
}
}
// 非核心功能就不實現了,起碼現在不實現了。
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
這裡就不進行一些解釋了。因為需要的解釋,在註釋中都寫的很明確了,包括我踩的一些坑。
如果依舊有一些看不懂的地方,或者錯誤的地方,歡迎@我,或者私信我。
三,手寫ReentrantReadWriteLock
獲取需求
與ReentrantLock一樣,首先第一步操作,我們需要確定我們要做什麼。
我們要做一個鎖,這裡姑且命名為JarryReadWriteLock。
這個鎖,需要具備以下特性:讀寫鎖,可重入鎖,悲觀鎖。
一方面了為了更好理解(第一版本,重在理解基礎,不是嘛),另一方面也是為了更好地複用前面ReentrantLock的程式碼(畢竟ReentrantLock其實就是讀寫鎖的寫鎖,不是嘛),這裡的JarryReadWriteLock的API不再與官方的ReentrantReadWriteLock相同,而是做了小小調整。直接呼叫相關讀鎖的加解鎖API,已經相關寫鎖的加解鎖API。具體看程式碼部分。
JarryReadWriteLock實現原理
既然需要已經確定,並且API也確定了。
那麼第二步操作,就是簡單思考一下,如何實現。
類成員方面:
首先,我們需要一個owner屬性,來儲存持有寫鎖的執行緒物件。
其次,由於寫鎖是可重入鎖,所以我們需要一個readCount來儲存重入次數。
然後,由於讀鎖是可以有多個執行緒持有的,所以我們需要一個writeCount來儲存讀鎖持有執行緒數。
最後,我們需要一個waiters屬性,來儲存那些競爭鎖失敗後,還在等待(不死不休型)的執行緒物件。
自定義資料結構:
到這這裡,就不禁會有一個疑問。如何判斷嘗試獲取鎖的執行緒想要獲得的鎖是什麼型別的鎖。在API呼叫階段,我們可以根據API判斷。但是放入等待佇列後,我們如何判斷呢?如果還是如之前那樣,等待佇列只是儲存競爭鎖的執行緒物件,是完全不夠的。
所以我們需要新建一個WaitNode的Class,用來儲存等待佇列中執行緒物件及相關必要資訊。所以,WaitNode會有如下屬性:
- Thread thread:標識該等待者的執行緒。
- int type:標識該執行緒物件希望競爭的鎖的型別。0表示寫鎖(獨佔鎖),1表示讀鎖(共享鎖)。
- int arg:擴充套件引數。其實在手寫的簡易版,看不出來價值。但是實際AQS中的Node就是類似設計。不過AQS中,並不是採用queue儲存Node,而是通過一個連結串列的方式儲存Node。
類方法方面:
- 獨佔鎖:
- tryLock:與JarryReentrantLock類似,不過增加了兩點。一方面需要考量共享鎖是否被佔用。另一方面需要引入acquire引數(目前是固定值),呼應WaitNode的arg。
- lock:與JarryReentrantLock類似,不過需要手動設定arg。
- tryUnlock:與JarryReentrantLock類似,同樣需要引入release引數(目前是固定值),呼應WaitNode的arg。
- unlock:與JarryReentrantLock類似,不過需要手動設定arg。
- 共享鎖:
- tryLockShared:嘗試獲取共享鎖,成功返回true,失敗返回false。其實和獨佔鎖的tryLock類似,只不過需要額外考慮獨佔鎖是否已經存在。另外為了實現鎖降級,如果獨佔鎖存在,需要判斷獨佔鎖的持有者與當前嘗試獲得共享鎖的執行緒是否一致。
- lockShared:獲取共享鎖,直到成功。由於已經有了WaitNode.type,用於判斷鎖型別,所以共享鎖與獨佔鎖使用的是同一佇列。同樣的,這裡需要手動設定arg。其它方面與獨佔鎖的lock操作基本一致。
- tryUnlockShared:嘗試釋放鎖,成功返回true,失敗返回false。類似於tryUnlock,只不過增加了release引數(固定值),呼應WaitNode的arg。
- unlockShared:釋放鎖。類似unlock,不過需要手動設定arg。
JarryReentrantLock實現
package tech.jarry.learning.netease;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* @Description:
* @Author: jarry
*/
public class JarryReadWriteLock {
// 用於讀鎖(共享鎖)的鎖計數器 這裡真的有必要volatile嘛(Atomic中的value時volatile的),再看看後續程式碼
// 這裡確實不需要volatile,至於原始碼,更過分,原始碼是通過一個變數state的位運算實現readCount與writeCount
volatile AtomicInteger readCount = new AtomicInteger(0);
// 用於寫鎖(獨佔鎖)的鎖計數器 這裡之所以不用volatile是因為獨佔鎖,只有一個執行緒在改變writeCount(即使有快取,也還是這個執行緒,所以不會因為快取問題,導致問題)
AtomicInteger writeCount = new AtomicInteger(0);
// 用於儲存鎖的持有者(這裡專指寫鎖(獨佔鎖)的鎖持有者)
AtomicReference<Thread> owner = new AtomicReference<>();
// 用於儲存期望獲得鎖的執行緒(為了區分執行緒希望獲得的鎖的型別,這裡新建一個新的資料型別(通過內部類實現))
public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<>();
// 內部類實現等待佇列中的自定義資料型別
class WaitNode{
// 表示該等待者的執行緒
Thread thread = null;
// 表示希望爭取的鎖的型別。0表示寫鎖(獨佔鎖),1表示讀鎖(共享鎖)
int type = 0;
// 引數,acquire,狀態相關,再看看
int arg = 0;
public WaitNode(Thread thread, int type, int arg) {
this.type = type;
this.thread = thread;
this.arg = arg;
}
}
/**
* 嘗試獲取獨佔鎖(針對獨佔鎖)
* @param acquires 用於加鎖次數。一般傳入waitNode.arg(本程式碼中就是1。為什麼不用一個常量1,就不知道了?)(可以更好的對接AQS)
* @return
*/
public boolean tryLock(int acquires){
//TODO_FINISHED 這裡readCount的判斷,與修改writeCount的操作可以被割裂,並不是原子性的。不就有可能出現readCount與writeCount的值同時大於零的情況。
// 該示例程式碼,確實存在該問題,但實際原始碼,writeCount與readCount是通過同一變數state實現的,所以可以很好地通過CAS確保原子性
// readCount表示讀鎖(共享鎖)的上鎖次數
if (readCount.get() == 0){
// readCount的值為0,表示讀鎖(共享鎖)空置,所以當前執行緒是有可能獲得寫鎖(獨佔鎖)。
// 接下來判斷寫鎖(獨佔鎖)是否被佔用
int writeCountValue = writeCount.get();
if (writeCountValue == 0){
// 寫鎖(獨佔鎖)的鎖次數為0,表示寫鎖(獨佔鎖)並沒未被任何執行緒持有
if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){
// 修改writeCount,來獲得鎖。該機制與ReentrantLock相同
// 設定獨享鎖的持有者owner
owner.set(Thread.currentThread());
// 至此,表示當前執行緒搶鎖成功
return true;
}
} else {
// 寫鎖(獨佔鎖)的鎖次數不為0,表示寫鎖(獨佔鎖)已經被某執行緒持有
if (Thread.currentThread() == owner.get()){
// 如果持有鎖的執行緒為當前執行緒,那就進行鎖的重入操作
writeCount.set(writeCountValue+acquires);
// 重入鎖,表示當前執行緒是持有鎖的
return true;
}
// 讀鎖未被佔用,但寫鎖被佔用,且佔據寫鎖的執行緒不是當前執行緒
}
}
// 讀鎖被佔據
// 其它情況(1.讀鎖被佔據,2讀鎖未被佔用,但寫鎖被佔用,且佔據寫鎖的執行緒不是當前執行緒),都返回false
return false;
}
/**
* 獲取獨佔鎖(針對獨佔鎖)
*/
public void lock(){
// 設定waitNote中arg引數
int arg = 1;
// 嘗試獲取獨佔鎖。成功便退出方法,失敗,則進入“不死不休”邏輯
if (!tryLock(arg)){
// 需要將當前儲存至等待佇列,在這之前,需要封裝當前執行緒為waitNote
WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
// 將封裝好的waitNode放入等待佇列waiters中(offer方法會在佇列滿時,直接返回false。put則是阻塞。add則是丟擲異常)
waiters.offer(waitNode);
// 如ReentrantLock一般,開始迴圈嘗試拿鎖
while (true){
// 獲取佇列頭部元素
WaitNode headNote = waiters.peek();
// 如果等待佇列頭部元素headNote不為null(有可能是null嘛?),並且就是當前執行緒,那就嘗試獲取鎖
if (headNote !=null && headNote.thread == Thread.currentThread()){
// 如果再次嘗試獲取鎖失敗,那就只能掛起了
if (!tryLock(headNote.arg)){
LockSupport.park();
} else {
// 再次嘗試獲取鎖成功,那就將佇列頭部元素,踢出等待佇列waiters
waiters.poll();
return;
}
}else {
// 如果headNote不是當前執行緒的封裝,就直接掛起(這裡就沒處理headNote==null的情況)
LockSupport.park();
}
}
}
}
/**
* 嘗試解鎖(針對獨佔鎖)
* @param releases 用於設定解鎖次數。一般傳入waitNode.arg
* @return
*/
public boolean tryUnlock(int releases){
// 首先判斷鎖的持有者是否為當前執行緒
if (owner.get() != Thread.currentThread()){
// 鎖的持有者不是當前執行緒(即使鎖的持有者為null,鎖的持有者是null,還解鎖,仍然是丟擲異常)
throw new IllegalMonitorStateException();
}
// 鎖的持有者就是當前執行緒
// 首先按照releases進行解鎖(經過一番思考後,這裡不會出現類似DoubleCheck中的問題(Atomic中的value是volatile的),所以這個值同時只會有一個執行緒對其操作)
int writeCountValue = writeCount.get();
// 為writeCount設定新值
writeCount.set(writeCountValue-releases);
// 根據writeCount的新值,判斷鎖的持有者是否發生變化
if (writeCount.get() == 0){
// writeCount的值為0,表示當前執行緒已經完全解鎖,所以修改鎖的持有者為null
owner.set(null);
// 而這表示完全解鎖成功
return true;
} else {
// writeCount的值不為0,表示當前執行緒尚未完全解鎖,故鎖的持有者未發生變化。即嘗試解鎖失敗
return false;
}
}
/**
* 解鎖(針對獨佔鎖)
*/
public void unlock(){
// 設定tryUnlock的引數releases
int arg = 1;
// 先嚐試解鎖
if (tryUnlock(arg)){
// 獲得等待佇列的頭部元素
WaitNode head = waiters.peek();
// 檢測一下頭部元素head是否null(也許等待佇列根本就沒有元素)
if (head == null){
// 如果頭部元素head為null,說明佇列為null,直接return
return;
}
// 解鎖成功,就要把等待佇列中的頭部元素喚醒(unpark)
// 這裡有一點注意,即使佇列的頭元素head被喚醒了,也不一定就是這個頭元素head獲得鎖(詳見tryLock,新來的執行緒可能獲得鎖)
// 如果這個頭元素無法獲得鎖,就會park(while迴圈嘛)。並且一次park,可以多次unpark(已實踐)
LockSupport.unpark(head.thread);
}
}
/**
* 嘗試獲取共享鎖(針對共享鎖)
* @param acquires
* @return
*/
public boolean tryLockShared(int acquires){
// 判斷寫鎖(獨佔鎖)是否被別的執行緒持有(這個條件意味著:同一個執行緒可以同時持有讀鎖與寫鎖)
// 該方法是為了進行 鎖降級******
if (writeCount.get() == 0 || owner.get() == Thread.currentThread()){
// 如果寫鎖(獨佔鎖)沒有別的被執行緒持有,就可以繼續嘗試獲取讀鎖(共享鎖)
// 通過迴圈實現自旋,從而實現加鎖(避免加鎖失敗)
while(true){
// 由於讀鎖(共享鎖)是共享的,不存在獨佔行為,故直接在writeCount增加當前執行緒加鎖行為的次數acquires
int writeCountValue = writeCount.get();
// 通過CAS進行共享鎖的次數的增加
if (writeCount.compareAndSet(writeCountValue, writeCountValue+acquires)){
break;
}
}
}
// 寫鎖已經被別的執行緒持有,共享鎖獲取失敗
return false;
}
/**
* 獲取共享鎖(針對共享鎖)
*/
public void lockShared(){
// 設定waitNote中arg引數
int arg = 1;
// 判斷是否獲取共享鎖成功
if (!tryLockShared(arg)){
// 如果獲取共享鎖失敗,就進入等待佇列
// 與獲取同步鎖操作一樣的,需要先對當前執行緒進行WaitNote的封裝
WaitNode waitNode = new WaitNode(Thread.currentThread(),1,arg);
// 將waitNote置入waiters(offer方法會在佇列滿時,直接返回false。put則是阻塞。add則是丟擲異常)
waiters.offer(waitNode);
// 使用迴圈。一方面避免偽喚醒,另一方面便於二次嘗試獲取鎖
while (true){
// 獲取等待佇列waiters的頭元素head
WaitNode head = waiters.peek();
// 校驗head是否為null,並判斷等待佇列的頭元素head是否為當前執行緒的封裝(也許head時當前執行緒的封裝,但並不意味著head就是剛剛放入waiters的元素)
if (head != null && head.thread == Thread.currentThread()){
// 如果校驗通過,並且等待佇列的頭元素head為當前執行緒的封裝,就再次嘗試獲取鎖
if (tryLockShared(head.arg)){
// 獲取共享鎖成功,就從當前佇列中移除head元素(poll()方法移除佇列頭部元素)
waiters.poll();
// 在此處就是與獨佔鎖不同的地方了,獨佔鎖意味著只可能有一個執行緒獲得鎖,而共享鎖是可以有多個執行緒獲得的
// 獲得等待佇列的新頭元素newHead
WaitNode newHead = waiters.peek();
// 校驗該元素是否為null,並判斷它的鎖型別是否為共享鎖
if (newHead != null && newHead.type == 1){
// 如果等待佇列的新頭元素是爭取共享鎖的,那麼就喚醒它(這是一個類似迭代的過程,剛喚醒的執行緒會會做出同樣的舉動)
//TODO_FINISHED 這裡有一點,我有些疑惑,那麼如果等待佇列是這樣的{共享鎖,共享鎖,獨佔鎖,共享鎖,共享鎖},共享鎖們被一個獨佔鎖隔開了。是不是就不能喚醒後面的共享鎖了。再看看後面的程式碼
// 這個實際原始碼,並不是這樣的。老師表示現有程式碼是這樣的,不用理解那麼深入,後續有機會看看原始碼
LockSupport.unpark(newHead.thread);
}
} else {
// 如果再次獲取共享鎖失敗,就掛起
LockSupport.park();
}
} else {
// 如果校驗未通過,或等待佇列的頭元素head不是當前執行緒的封裝,就掛起當前執行緒
LockSupport.park();
}
}
}
}
/**
* 嘗試解鎖(針對共享鎖)
* @param releases
* @return
*/
public boolean tryUnlockShared(int releases){
// 通過CAS操作,減少共享鎖的鎖次數,即readCount的值(由於是共享鎖,所以是可能多個執行緒同時減少該值的,故採用CAS)
while (true){
// 獲取讀鎖(共享鎖)的值
int readCountValue = readCount.get();
int readCountNext = readCountValue - releases;
// 只有成功修改值,才可以跳出
if (readCount.compareAndSet(readCountValue,readCountNext)){
// 用於表明共享鎖完全解鎖成功
return readCountNext == 0;
}
}
// 由於讀鎖沒有owner,所以不用進行有關owner的操作
}
/**
* 解鎖(針對共享鎖)
*/
public boolean unlockShared(){
// 設定tryUnlockShared的引數releases
int arg = 1;
// 判斷是否嘗試解鎖成功
if (tryUnlockShared(arg)){
// 如果嘗試解鎖成功,就需要喚醒等待佇列的頭元素head的執行緒
WaitNode head = waiters.peek();
// 校驗head是否為null,畢竟可能等待佇列為null
if (head != null){
// 喚醒等待佇列的頭元素head的執行緒
LockSupport.unpark(head.thread);
}
//TODO_FINISHED 嘗試共享鎖解鎖成功後,就應當返回true(雖然有些不大理解作用)
// 用於對應原始碼
return true;
}
//TODO_FINISHED 嘗試共享鎖解鎖失敗後,就應當返回false(雖然有些不大理解作用)
// 用於對應原始碼
return false;
}
}
這裡同樣不進行相關解釋了。因為需要的解釋,在註釋中都寫的很明確了,包括我踩的一些坑。
如果依舊有一些看不懂的地方,或者錯誤的地方,歡迎@我,或者私信我。
四,總結
技術
- CAS:通過CAS實現鎖持有數量等的原子性操作,從而完成鎖的競爭操作。
- Atomic:為了簡化操作(避免自己獲取Unsafe,offset等),通過Atomic實現CAS 操作。
- volatile:為了避免多執行緒下的可見性問題,採用了volatile的no cache特性。
- transient:可以避免對應變數序列化,原始碼中有采用。不過考慮後,並沒有使用。
- while:一方面通過while避免偽喚醒問題,另一方面,通過while推動流程(這個需要看程式碼)。
- LinkedBlockingQueue:實現執行緒等待佇列。實際的AQS是通過Node構成連結串列結構的。
- LockSupport:通過LockSupport實現執行緒的掛起,喚醒等操作。
- IllegalMonitorStateException:就是一個異常型別,仿Synchronized的,起碼看起來更明確,還不用自己實現新的Exception型別。
方案
其實,這兩個demo有兩個重要的方面。一方面是可以親自感受,一個鎖是怎麼實現的,它的方案是怎樣的。另一方面就是去思量,其中有關原子性,以及可見性的思量與設計。
你們可以嘗試改動一些東西,然後去考慮,這樣改動後,是否存線上程安全問題。這樣的考慮對自己線上程安全方面的提升是巨大的。反正我當時那一週,就不斷的改來改去。甚至有些改動,根本除錯不出來問題,然後諮詢了別人,才知道其中的一些坑。當然也有一些改動是可以的。
後言
如果有問題,可以@我,或者私信我。
如果覺得這篇文章不錯的話,請點選推薦。這對我,以及那些需要的人,很重要。
謝謝