併發程式設計學習筆記-3
前言:文中出現的示例程式碼地址為:gitee程式碼地址
8. 共享模型之工具
8.1 執行緒池
池化技術相比大家已經屢見不鮮了,執行緒池、資料庫連線池、Http 連線池等等都是對這個思想的應用。池化技術的思想主要是為了減少每次獲取資源的消耗,提高對資源的利用率。
執行緒池提供了一種限制和管理資源(包括執行一個任務)。 每個執行緒池還維護一些基本統計資訊,例如已完成任務的數量。
這裡借用《Java 併發程式設計的藝術》提到的來說一下使用執行緒池的好處:
- 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
- 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。
8.1.1 自定義執行緒池
例項程式碼設計步驟:Test18.java
- 步驟1:自定義拒絕策略介面
- 步驟2:自定義任務佇列
- 步驟3:自定義執行緒池
- 步驟4:測試
8.1.2 ThreadPoolExecutor
Executor 框架結構(主要由三大部分組成
- 任務(
Runnable
/Callable
)
執行任務需要實現的 Runnable 介面 或 Callable介面。Runnable 介面或 Callable 介面 實現類都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。
- 任務的執行(
Executor
)
如上圖所示,包括任務執行機制的核心介面 Executor ,以及繼承自Executor
介面的 ExecutorService 介面。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 這兩個關鍵類實現了 ExecutorService 介面。
這裡提了很多底層的類關係,但是,實際上我們需要更多關注的是 ThreadPoolExecutor 這個類,這個類在我們實際使用執行緒池的過程中,使用頻率還是非常高的。
- 非同步計算的結果(
Future
)
Future 介面以及Future
介面的實現類 FutureTask 類都可以代表非同步計算的結果。
當我們把 Runnable介面 或 Callable 介面 的實現類提交給 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 執行。(呼叫submit()
方法時會返回一個 FutureTask 物件)
4.Executor 框架的使用示意圖
- 主執行緒首先要建立實現 Runnable 或者 Callable 介面的任務物件。
- 把建立完成的實現 Runnable/Callable介面的 物件直接交給 ExecutorService 執行:
ExecutorService.execute(Runnable command)
)或者也可以把Runnable
物件或Callable
物件提交給ExecutorService
執行(ExecutorService.submit(Runnable task)
或ExecutorService.submit(Callable <T> task)
)。 - 如果執行 ExecutorService.submit(…),ExecutorService 將返回一個實現Future介面的物件(我們剛剛也提到過了執行
execute()
方法和submit()
方法的區別,submit()
會返回一個FutureTask
物件)。 - 最後,主執行緒可以執行 FutureTask.get()方法來等待任務執行完成。主執行緒也可以執行 FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務的執行
1) 執行緒池狀態
ThreadPoolExecutor 使用 int 的高 3 位來表示執行緒池狀態,低 29 位表示執行緒數量
從數字上比較(第一位是符號位),TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING 這些資訊儲存在一個原子變數 ctl 中,目的是將執行緒池狀態與執行緒個數合二為一,這樣就可以用一次 cas 原子操作 進行賦值
// c 為舊值, ctlOf 返回結果為新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 為高 3 位代表執行緒池狀態, wc 為低 29 位代表執行緒個數,ctl 是合併它們
private static int ctlOf(int rs, int wc) { return rs | wc; }
2) 構造方法
下面看一下引數最多的 一個執行緒方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
}
- corePoolSize 核心執行緒數目 (最多保留的執行緒數)
- maximumPoolSize 最大執行緒數目(核心執行緒數加上救急執行緒數)
- keepAliveTime 救急執行緒的生存時間(核心執行緒沒有生存時間這個東西,核心執行緒會一直執行)
- unit 時間單位 - 針對救急執行緒
- workQueue 阻塞佇列
- threadFactory 執行緒工廠 - 可以為執行緒建立時起個好名字
- handler 拒絕策略
- 執行緒池中剛開始沒有執行緒,當一個任務提交給執行緒池後,執行緒池會建立一個新執行緒來執行任務。
- 當執行緒數達到 corePoolSize 並沒有執行緒空閒,這時再加入任務,新加的任務會被加入workQueue 佇列排 隊,直到有空閒的執行緒。
- 如果佇列選擇了有界佇列,那麼任務超過了佇列大小時,會建立 maximumPoolSize - corePoolSize 數目的線 程來救急。
- 如果執行緒到達 maximumPoolSize 仍然有新任務這時會執行拒絕策略。拒絕策略 jdk 提供了 下面的前4 種實現,其它著名框架也提供了實現
- ThreadPoolExecutor.AbortPolicy讓呼叫者丟擲 RejectedExecutionException 異常,這是預設策略
- ThreadPoolExecutor.CallerRunsPolicy 讓呼叫者執行任務
- ThreadPoolExecutor.DiscardPolicy 放棄本次任務
- ThreadPoolExecutor.DiscardOldestPolicy 放棄佇列中最早的任務,本任務取而代之
- Dubbo 的實現,在丟擲 RejectedExecutionException 異常之前會記錄日誌,並 dump 執行緒棧資訊,方 便定位問題
- Netty 的實現,是建立一個新執行緒來執行任務
- ActiveMQ 的實現,帶超時等待(60s)嘗試放入佇列,類似我們之前自定義的拒絕策略
- PinPoint 的實現,它使用了一個拒絕策略鏈,會逐一嘗試策略鏈中每種拒絕策略
- 當高峰過去後,超過corePoolSize 的救急執行緒如果一段時間沒有任務做,需要結束節省資源,這個時間由 keepAliveTime 和 unit 來控制。
根據這個構造方法,JDK Executors 類中提供了眾多工廠方法來建立各種用途的執行緒池
3) newFixedThreadPool
這個是Executors類提供的工廠方法來建立執行緒池!Executors 是Executor 框架的工具類! Test19.java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
通過原始碼可以看到 new ThreadPoolExecutor(xxx)方法其實是是呼叫了之前說的完整引數的構造方法,使用了預設的執行緒工廠和拒絕策略!
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
特點
- 核心執行緒數 == 最大執行緒數(沒有救急執行緒被建立),因此也無需超時時間
- 阻塞佇列是無界的,可以放任意數量的任務
- 適用於任務量已知,相對耗時的任務
4) newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特點
- 核心執行緒數是 0, 最大執行緒數是 Integer.MAX_VALUE,救急執行緒的空閒生存時間是 60s,意味著
- 全部都是救急執行緒(60s 後可以回收)
- 救急執行緒可以無限建立
- 佇列採用了 SynchronousQueue 實現特點是,它沒有容量,沒有執行緒來取是放不進去的(一手交錢、一手交 貨)SynchronousQueue測試程式碼 Test20.java
- 整個執行緒池表現為執行緒數會根據任務量不斷增長,沒有上限,當任務執行完畢,空閒 1分鐘後釋放線 程。 適合任務數比較密集,但每個任務執行時間較短的情況
5) newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
使用場景:
- 希望多個任務排隊執行。執行緒數固定為 1,任務數多於 1 時,會放入無界佇列排隊。任務執行完畢,這唯一的執行緒也不會被釋放。
- 區別:
- 和自己建立單執行緒執行任務的區別:自己建立一個單執行緒序列執行任務,如果任務執行失敗而終止那麼沒有任何補救措施,而執行緒池還會新建一個執行緒,保證池的正常工作
- Executors.newSingleThreadExecutor() 執行緒個數始終為1,不能修改
- FinalizableDelegatedExecutorService 應用的是裝飾器模式,只對外暴露了 ExecutorService 介面,因 此不能呼叫 ThreadPoolExecutor 中特有的方法
- 和Executors.newFixedThreadPool(1) 初始時為1時的區別:Executors.newFixedThreadPool(1) 初始時為1,以後還可以修改,對外暴露的是 ThreadPoolExecutor 物件,可以強轉後呼叫 setCorePoolSize 等方法進行修改
Executors 返回執行緒池物件的弊端如下:
- FixedThreadPool 和 SingleThreadExecutor : 允許請求的佇列長度為 Integer.MAX_VALUE,可能堆積大量的請求,從而導致 OOM。
- CachedThreadPool 和 ScheduledThreadPool : 允許建立的執行緒數量為 Integer.MAX_VALUE ,可能會建立大量執行緒,從而導致 OOM。
說白了就是:使用有界佇列,控制執行緒建立數量。
除了避免 OOM 的原因之外,不推薦使用Executors
提供的兩種快捷的執行緒池的原因還有:
- 實際使用中需要根據自己機器的效能、業務場景來手動配置執行緒池的引數比如核心執行緒數、使用的任務佇列、飽和策略等等。
- 我們應該顯示地給我們的執行緒池命名,這樣有助於我們定位問題。
6) 提交任務
Test21.java
// 執行任務
void execute(Runnable command);
// 提交任務 task,用返回值 Future 獲得任務執行結果,Future的原理就是利用我們之前講到的保護性暫停模式來接受返回結果的,主執行緒可以執行 FutureTask.get()方法來等待任務執行完成
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任務
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任務,帶超時時間
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消,帶超時時間
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
7) 關閉執行緒池
Test22.java
shutdown
/*
執行緒池狀態變為 SHUTDOWN
- 不會接收新任務
- 但已提交任務會執行完,包括等待佇列裡面的
- 此方法不會阻塞呼叫執行緒的執行
*/
void shutdown();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改執行緒池狀態
advanceRunState(SHUTDOWN);
// 僅會打斷空閒執行緒
interruptIdleWorkers();
onShutdown(); // 擴充套件點 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試終結(沒有執行的執行緒可以立刻終結)
tryTerminate();
}
shutdownNow
/*
執行緒池狀態變為 STOP
- 不會接收新任務
- 會將佇列中的任務返回
- 並用 interrupt 的方式中斷正在執行的任務
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改執行緒池狀態
advanceRunState(STOP);
// 打斷所有執行緒
interruptWorkers();
// 獲取佇列中剩餘任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終結
tryTerminate();
return tasks;
}
其它方法
// 不在 RUNNING 狀態的執行緒池,此方法就返回 true
boolean isShutdown();
// 執行緒池狀態是否是 TERMINATED
boolean isTerminated();
// 呼叫 shutdown 後,由於呼叫使執行緒結束執行緒的方法是非同步的並不會等待所有任務執行結束就返回,因此如果它想線上程池 TERMINATED 後做些其它事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
非同步模式之工作執行緒
1.定義
讓有限的工作執行緒(Worker Thread)來輪流非同步處理無限多的任務。也可以將其歸類為分工模式,它的典型實現就是執行緒池,也體現了經典設計模式中的享元模式。
例如,海底撈的服務員(執行緒),輪流處理每位客人的點餐(任務),如果為每位客人都配一名專屬的服務員,那 麼成本就太高了(對比另一種多執行緒設計模式:Thread-Per-Message) 注意,不同任務型別應該使用不同的執行緒池,這樣能夠避免飢餓,並能提升效率 例如,如果一個餐館的工人既要招呼客人(任務型別A),又要到後廚做菜(任務型別B)顯然效率不咋地,分成 服務員(執行緒池A)與廚師(執行緒池B)更為合理,當然你能想到更細緻的分工
2.飢餓 固定大小執行緒池會有飢餓現象 Test23.java
- 兩個工人是同一個執行緒池中的兩個執行緒
- 他們要做的事情是:為客人點餐和到後廚做菜,這是兩個階段的工作
- 客人點餐:必須先點完餐,等菜做好,上菜,在此期間處理點餐的工人必須等待
- 後廚做菜:沒啥說的,做就是了
- 比如工人A 處理了點餐任務,接下來它要等著 工人B 把菜做好,然後上菜,他倆也配合的蠻好 但現在同時來了兩個客人,這個時候工人A 和工人B 都去處理點餐了,這時沒人做飯了,飢餓
解決方法可以增加執行緒池的大小,不過不是根本解決方案,還是前面提到的,不同的任務型別,採用不同的執行緒池,例如:Test24.java
-
建立多大的執行緒池合適?
過小會導致程式不能充分地利用系統資源、容易導致飢餓,過大會導致更多的執行緒上下文切換,佔用更多記憶體
- CPU 密集型運算 通常採用 cpu 核數 + 1 能夠實現最優的 CPU 利用率,+1 是保證當執行緒由於頁缺失故障(作業系統)或其它原因導致暫停時,額外的這個執行緒就能頂上去,保證 CPU 時鐘週期不被浪費
- I/O 密集型運算 CPU 不總是處於繁忙狀態,例如,當你執行業務計算時,這時候會使用 CPU 資源,但當你執行 I/O 操作時、遠端RPC 呼叫時,包括進行資料庫操作時,這時候 CPU 就閒下來了,你可以利用多執行緒提高它的利用率。
- 經驗公式如下:執行緒數 = 核數 * 期望 CPU 利用率 * 總時間(CPU計算時間+等待時間) / CPU 計算時間 例如 4 核 CPU 計算時間是 50% ,其它等待時間是 50%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 50% = 8 例如 4 核 CPU 計算時間是 10% ,其它等待時間是 90%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 10% = 40
8) 任務排程執行緒池
在『任務排程執行緒池』功能加入之前,可以使用 java.util.Timer 來實現定時功能,Timer 的優點在於簡單易用,但 由於所有任務都是由同一個執行緒來排程,因此所有任務都是序列執行的,同一時間只能有一個任務在執行,前一個 任務的延遲或異常都將會影響到之後的任務。Test25.java
使用 ScheduledExecutorService 改寫:Test26.java
- 整個執行緒池表現為:執行緒數固定,任務數多於執行緒數時,會放入無界佇列排隊。任務執行完畢,這些線 程也不會被釋放。用來執行延遲或反覆執行的任務。
- ScheduledExecutorService 中scheduleAtFixedRate方法的使用 Test27.java
- ScheduledExecutorService 中scheduleWithFixedDelay方法的使用 Test27.java
9) 正確處理執行任務異常
可以發現,如果執行緒池中的執行緒執行任務時,如果任務丟擲了異常,預設是中斷執行該任務而不是丟擲異常或者列印異常資訊。
方法1:主動捉異常
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
方法2:使用 Future,錯誤資訊都被封裝進submit方法的返回方法中!
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());
10) Tomcat 執行緒池
Tomcat 在哪裡用到了執行緒池呢
- LimitLatch 用來限流,可以控制最大連線個數,類似 J.U.C 中的 Semaphore 後面再講
- Acceptor 只負責【接收新的 socket 連線】
- Poller 只負責監聽 socket channel 是否有【可讀的 I/O 事件】
- 一旦可讀,封裝一個任務物件(socketProcessor),提交給 Executor 執行緒池處理
- Executor 執行緒池中的工作執行緒最終負責【處理請求】
Tomcat 執行緒池擴充套件了 ThreadPoolExecutor,行為稍有不同
- 如果匯流排程數達到 maximumPoolSize,這時不會立刻拋 RejectedExecutionException 異常,而是再次嘗試將任務放入佇列,如果還失敗,才丟擲 RejectedExecutionException 異常
原始碼 tomcat-7.0.42
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 使任務從新進入阻塞佇列
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() )
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
is rejected
}
Connector 配置
Executor 執行緒配置
守護執行緒的意思就是執行緒會隨著主執行緒的結束而結束
下面的這張圖好像有點錯誤,提交任務<核心執行緒的時候應該直接交給核心執行緒執行。
8.1.3 Fork/Join
- 概念
- Fork/Join 是 JDK 1.7 加入的新的執行緒池實現,它體現的是一種分治思想,適用於能夠進行任務拆分的 cpu 密集型運算
- 所謂的任務拆分,是將一個大任務拆分為演算法上相同的小任務,直至不能拆分可以直接求解。跟遞迴相關的一些計算,如歸併排序、斐波那契數列、都可以用分治思想進行求解
- Fork/Join 在分治的基礎上加入了多執行緒,可以把每個任務的分解和合並交給不同的執行緒來完成,進一步提升了運算效率
- Fork/Join 預設會建立與 cpu 核心數大小相同的執行緒池
- 使用 提交給 Fork/Join 執行緒池的任務需要繼承 RecursiveTask(有返回值)或 RecursiveAction(沒有返回值),例如下 面定義了一個對 1~n 之間的整數求和的任務 Test28.java
改進Test29.java Test29的演算法邏輯圖
8.2 J.U.C
8.2.1 AQS 原理
- 概述:全稱是 AbstractQueuedSynchronizer,是阻塞式鎖和相關的同步器工具的框架
- 特點:
- 用 state 屬性來表示資源的狀態(分獨佔模式和共享模式),子類需要定義如何維護這個狀態,控制如何獲取鎖和釋放鎖
- getState - 獲取 state 狀態
- setState - 設定 state 狀態
- compareAndSetState - cas 機制設定 state 狀態
- 獨佔模式是隻有一個執行緒能夠訪問資源,而共享模式可以允許多個執行緒訪問資源
- 提供了基於 FIFO 的等待佇列,類似於 Monitor 的 EntryList
- 條件變數來實現等待、喚醒機制,支援多個條件變數,類似於 Monitor 的 WaitSet
- 用 state 屬性來表示資源的狀態(分獨佔模式和共享模式),子類需要定義如何維護這個狀態,控制如何獲取鎖和釋放鎖
子類主要實現這樣一些方法(預設丟擲 UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
//獲取鎖的姿勢
// 如果獲取鎖失敗
if (!tryAcquire(arg)) {
// 入隊, 可以選擇阻塞當前執行緒 park unpark
}
//釋放鎖的姿勢
// 如果釋放鎖成功
if (tryRelease(arg)) {
// 讓阻塞執行緒恢復執行
}
下面實現一個不可重入的阻塞式鎖:使用AbstractQueuedSynchronizer自定義一個同步器來實現自定義鎖! Test30.java
8.2.2 ReentrantLock 原理
可以看到ReentrantLock提供了兩個同步器,實現公平鎖和非公平鎖,預設是非公平鎖!
1. 非公平鎖實現原理
圖解流程
加鎖解鎖流程,先從構造器開始看,預設為非公平鎖實現
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSync 繼承自 AQS
沒有競爭時
第一個競爭出現時,檢視原始碼的NonfairSync的lock方法
Thread-1 執行了
- lock方法中CAS 嘗試將 state 由 0 改為 1,結果失敗
- lock方法中進一步呼叫acquire方法,進入 tryAcquire 邏輯,這裡我們認為這時 state 已經是1,結果仍然失敗
- 接下來進入 acquire方法的addWaiter 邏輯,構造 Node 佇列
- 圖中黃色三角表示該 Node 的 waitStatus 狀態,其中 0 為預設正常狀態
- Node 的建立是懶惰的
- 其中第一個 Node 稱為 Dummy(啞元)或哨兵,用來佔位,並不關聯執行緒
當前執行緒進入 acquire方法的 acquireQueued 邏輯
-
acquireQueued 會在一個死迴圈中不斷嘗試獲得鎖,失敗後進入 park 阻塞
-
如果自己是緊鄰著 head(排第二位),那麼再次 tryAcquire 嘗試獲取鎖,我們這裡設定這時 state 仍為 1,失敗
-
進入 shouldParkAfterFailedAcquire 邏輯,將前驅 node,即 head 的 waitStatus 改為 -1,這次返回 false
-
shouldParkAfterFailedAcquire 執行完畢回到 acquireQueued ,再次 tryAcquire 嘗試獲取鎖,當然這時 state 仍為 1,失敗
-
當再次進入 shouldParkAfterFailedAcquire 時,這時因為其前驅 node 的 waitStatus 已經是 -1,這次返回 true
-
進入 parkAndCheckInterrupt, Thread-1 park(灰色表示已經阻塞)
再次有多個執行緒經歷上述過程競爭失敗,變成這個樣子
Thread-0 呼叫unlock方法裡的release方法釋放鎖,進入tryRelease(使用ctrl+alt+b檢視tryRelease方法的具體ReentrantLock
實現) 流程,如果成功,設定 exclusiveOwnerThread 為 null,state = 0
unlock方法裡的release方法方法中,如果當前佇列不為 null,並且 head 的 waitStatus = -1,進入 unparkSuccessor 流程: unparkSuccessor中會找到佇列中離 head 最近的一個 Node(沒取消的),unpark 恢復其執行,本例中即為 Thread-1 回到 Thread-1 的 acquireQueued 流程
如果加鎖成功(沒有競爭),會設定 (acquireQueued 方法中)
- exclusiveOwnerThread 為 Thread-1,state = 1
- head 指向剛剛 Thread-1 所在的 Node,該 Node 清空 Thread
- 原本的 head 因為從連結串列斷開,而可被垃圾回收
如果這時候有其它執行緒來競爭(非公平的體現),例如這時有 Thread-4 來了
如果不巧又被 Thread-4 佔了先
- Thread-4 被設定為 exclusiveOwnerThread,state = 1
- Thread-1 再次進入 acquireQueued 流程,獲取鎖失敗,重新進入 park 阻塞
加鎖原始碼
// Sync 繼承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加鎖實現
final void lock() {
// 首先用 cas 嘗試(僅嘗試一次)將 state 從 0 改為 1, 如果成功表示獲得了獨佔鎖
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果嘗試失敗,進入 ㈠
acquire(1);
}
// ㈠ AQS 繼承過來的方法, 方便閱讀, 放在此處
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// 當 tryAcquire 返回為 false 時, 先呼叫 addWaiter ㈣, 接著 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// ㈡ 進入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Sync 繼承過來的方法, 方便閱讀, 放在此處
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果還沒有獲得鎖
if (c == 0) {
// 嘗試用 cas 獲得, 這裡體現了非公平性: 不去檢查 AQS 佇列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已經獲得了鎖, 執行緒還是當前執行緒, 表示發生了鎖重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 獲取失敗, 回到呼叫處
return false;
}
// ㈣ AQS 繼承過來的方法, 方便閱讀, 放在此處
private Node addWaiter(Node mode) {
// 將當前執行緒關聯到一個 Node 物件上, 模式為獨佔模式,新建的Node的waitstatus預設為0,因為waitstatus是成員變數,預設被初始化為0
Node node = new Node(Thread.currentThread(), mode);
// 如果 tail 不為 null, cas 嘗試將 Node 物件加入 AQS 佇列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 雙向連結串列
pred.next = node;
return node;
}
}
//如果tail為null,嘗試將 Node 加入 AQS, 進入 ㈥
enq(node);
return node;
}
// ㈥ AQS 繼承過來的方法, 方便閱讀, 放在此處
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 還沒有, 設定 head 為哨兵節點(不對應執行緒,狀態為 0)
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// cas 嘗試將 Node 物件加入 AQS 佇列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// ㈤ AQS 繼承過來的方法, 方便閱讀, 放在此處
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 上一個節點是 head, 表示輪到自己(當前執行緒對應的 node)了, 嘗試獲取
if (p == head && tryAcquire(arg)) {
// 獲取成功, 設定自己(當前執行緒對應的 node)為 head
setHead(node);
// 上一個節點 help GC
p.next = null;
failed = false;
// 返回中斷標記 false
return interrupted;
}
if (
// 判斷是否應當 park, 進入 ㈦
shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此時 Node 的狀態被置為 Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ AQS 繼承過來的方法, 方便閱讀, 放在此處
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取上一個節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 上一個節點都在阻塞, 那麼自己也阻塞好了
return true;
}
// > 0 表示取消狀態
if (ws > 0) {
// 上一個節點取消, 那麼重構刪除前面所有取消的節點, 返回到外層迴圈重試
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 這次還沒有阻塞
// 但下次如果重試不成功, 則需要阻塞,這時需要設定上一個節點狀態為 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞當前執行緒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
解鎖原始碼
// Sync 繼承自 AQS
static final class NonfairSync extends Sync {
// 解鎖實現
public void unlock() {
sync.release(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final boolean release(int arg) {
// 嘗試釋放鎖, 進入 ㈠
if (tryRelease(arg)) {
// 佇列頭節點 unpark
Node h = head;
if (
// 佇列不為 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的執行緒, 進入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
// ㈠ Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支援鎖重入, 只有 state 減為 0, 才釋放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ AQS 繼承過來的方法, 方便閱讀, 放在此處
private void unparkSuccessor(Node node) {
// 如果狀態為 Node.SIGNAL 嘗試重置狀態為 0, 如果執行緒獲取到了鎖那麼後來頭結點會被拋棄掉
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的節點, 但本節點從 AQS 佇列中脫離, 是由喚醒節點完成的
Node s = node.next;
// 不考慮已取消的節點, 從 AQS 佇列從後至前找到佇列最前面需要 unpark 的節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
2. 可重入原理
static final class NonfairSync extends Sync {
// ...
// Sync 繼承過來的方法, 方便閱讀, 放在此處
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已經獲得了鎖, 執行緒還是當前執行緒, 表示發生了鎖重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支援鎖重入, 只有 state 減為 0, 才釋放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
3. 可打斷原理
不可打斷模式:在此模式下,即使它被打斷,仍會駐留在 AQS 佇列中,一直要等到獲得鎖後方能得知自己被打斷了
// Sync 繼承自 AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// 如果打斷標記已經是 true, 則 park 會失效
LockSupport.park(this);
// interrupted 會清除打斷標記
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 還是需要獲得鎖後, 才能返回打斷狀態
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因為 interrupt 被喚醒, 返回打斷狀態為 true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打斷狀態為 true
selfInterrupt();
}
}
static void selfInterrupt() {
// 重新產生一次中斷,這時候執行緒是如果正常執行的狀態,那麼不是出於sleep等狀態,interrupt方法就不會報錯
Thread.currentThread().interrupt();
}
}
}
可打斷模式
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果沒有獲得到鎖, 進入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ 可打斷的獲取鎖流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 過程中如果被 interrupt 會進入此
// 這時候丟擲異常, 而不會再次進入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
4. 公平鎖實現原理
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 與非公平鎖主要區別在於 tryAcquire 方法的實現
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先檢查 AQS 佇列中是否有前驅節點, 沒有才去競爭
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ㈠ AQS 繼承過來的方法, 方便閱讀, 放在此處
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 時表示佇列中有 Node
return h != t &&
(
// (s = h.next) == null 表示佇列中還有沒有老二
(s = h.next) == null || // 或者佇列中老二執行緒不是此執行緒
s.thread != Thread.currentThread()
);
}
}
5. 條件變數實現原理
圖解流程
每個條件變數其實就對應著一個等待佇列,其實現類是 ConditionObject
await 流程 開始 Thread-0 持有鎖,呼叫 await,進入 ConditionObject 的 addConditionWaiter 流程 建立新的 Node 狀態為 -2(Node.CONDITION),關聯 Thread-0,加入等待佇列尾部
接下來進入 AQS 的 fullyRelease 流程,釋放同步器上的鎖
unpark AQS 佇列中的下一個節點,競爭鎖,假設沒有其他競爭執行緒,那麼 Thread-1 競爭成功
park 阻塞 Thread-0
signal 流程
假設 Thread-1 要來喚醒 Thread-0
進入 ConditionObject 的 doSignal 流程,取得等待佇列中第一個 Node,即 Thread-0 所在 Node
執行 transferForSignal 流程,將該 Node 加入 AQS 佇列尾部,將 Thread-0 的 waitStatus 改為 0,Thread-3 的waitStatus 改為 -1
Thread-1 釋放鎖,進入 unlock 流程,略
原始碼分析
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 第一個等待節點
private transient Node firstWaiter;
// 最後一個等待節點
private transient Node lastWaiter;
public ConditionObject() { }
// ㈠ 新增一個 Node 至等待佇列
private Node addConditionWaiter() {
Node t = lastWaiter;
// 所有已取消的 Node 從佇列連結串列刪除, 見 ㈡
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 建立一個關聯當前執行緒的新 Node, 新增至佇列尾部
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 喚醒 - 將沒取消的第一個節點轉移至 AQS 佇列
private void doSignal(Node first) {
do {
// 已經是尾節點了
if ( (firstWaiter = first.nextWaiter) == null) {
lastWaiter = null;
}
first.nextWaiter = null;
} while (
// 將等待佇列中的 Node 轉移至 AQS 佇列, 不成功且還有節點則繼續迴圈 ㈢
!transferForSignal(first) &&
// 佇列還有節點
(first = firstWaiter) != null
);
}
// 外部類方法, 方便閱讀, 放在此處
// ㈢ 如果節點狀態是取消, 返回 false 表示轉移失敗, 否則轉移成功
final boolean transferForSignal(Node node) {
// 設定當前node狀態為0(因為處在佇列末尾),如果狀態已經不是 Node.CONDITION, 說明被取消了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 加入 AQS 佇列尾部
Node p = enq(node);
int ws = p.waitStatus;
if (
// 插入節點的上一個節點被取消
ws > 0 ||
// 插入節點的上一個節點不能設定狀態為 Node.SIGNAL
!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
) {
// unpark 取消阻塞, 讓執行緒重新同步狀態
LockSupport.unpark(node.thread);
}
return true;
}
// 全部喚醒 - 等待佇列的所有節點轉移至 AQS 佇列
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// ㈡
private void unlinkCancelledWaiters() {
// ...
}
// 喚醒 - 必須持有鎖才能喚醒, 因此 doSignal 內無需考慮加鎖
public final void signal() {
// 如果沒有持有鎖,會丟擲異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 全部喚醒 - 必須持有鎖才能喚醒, 因此 doSignalAll 內無需考慮加鎖
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 不可打斷等待 - 直到被喚醒
public final void awaitUninterruptibly() {
// 新增一個 Node 至等待佇列, 見 ㈠
Node node = addConditionWaiter();
// 釋放節點持有的鎖, 見 ㈣
int savedState = fullyRelease(node);
boolean interrupted = false;
// 如果該節點還沒有轉移至 AQS 佇列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打斷, 僅設定打斷狀態
if (Thread.interrupted())
interrupted = true;
}
// 喚醒後, 嘗試競爭鎖, 如果失敗進入 AQS 佇列
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 外部類方法, 方便閱讀, 放在此處
// ㈣ 因為某執行緒可能重入,需要將 state 全部釋放,獲取state,然後把它全部減掉,以全部釋放
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
// 喚醒等待佇列佇列中的下一個節點
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 打斷模式 - 在退出等待時重新設定打斷狀態
private static final int REINTERRUPT = 1;
// 打斷模式 - 在退出等待時丟擲異常
private static final int THROW_IE = -1;
// 判斷打斷模式
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// ㈤ 應用打斷模式
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 等待 - 直到被喚醒或打斷
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 新增一個 Node 至等待佇列, 見 ㈠
Node node = addConditionWaiter();
// 釋放節點持有的鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果該節點還沒有轉移至 AQS 佇列, 阻塞
while (!isOnSyncQueue(node)) {
// park 阻塞
LockSupport.park(this);
// 如果被打斷, 退出等待佇列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 退出等待佇列後, 還需要獲得 AQS 佇列的鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 從佇列連結串列刪除, 見 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 應用打斷模式, 見 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 等待 - 直到被喚醒或打斷或超時
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 新增一個 Node 至等待佇列, 見 ㈠
Node node = addConditionWaiter();
// 釋放節點持有的鎖
int savedState = fullyRelease(node);
// 獲得最後期限
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
// 如果該節點還沒有轉移至 AQS 佇列, 阻塞
while (!isOnSyncQueue(node)) {
// 已超時, 退出等待佇列
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
// park 阻塞一定時間, spinForTimeoutThreshold 為 1000 ns
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 如果被打斷, 退出等待佇列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
// 退出等待佇列後, 還需要獲得 AQS 佇列的鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 所有已取消的 Node 從佇列連結串列刪除, 見 ㈡
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 應用打斷模式, 見 ㈤
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 等待 - 直到被喚醒或打斷或超時, 邏輯類似於 awaitNanos
public final boolean awaitUntil(Date deadline) throws InterruptedException {
// ...
}
// 等待 - 直到被喚醒或打斷或超時, 邏輯類似於 awaitNanos
public final boolean await(long time, TimeUnit unit) throws InterruptedException {
// ...
}
// 工具方法 省略 ...
}
8.2.3 讀寫鎖
1. ReentrantReadWriteLock
當讀操作遠遠高於寫操作時,這時候使用讀寫鎖讓讀-讀可以併發,提高效能。讀-寫,寫-寫都是相互互斥的!
提供一個數據容器類內部分別使用讀鎖保護資料的read()方法,寫鎖保護資料的write()方法 Test33.java
注意事項
-
讀鎖不支援條件變數
-
重入時升級不支援:即持有讀鎖的情況下去獲取寫鎖,會導致獲取寫鎖永久等待
-
r.lock(); try { // ... w.lock(); try { // ... } finally{ w.unlock(); } } finally{ r.unlock(); }
-
- 重入時降級支援:即持有寫鎖的情況下去獲取讀鎖
class CachedData {
Object data;
// 是否有效,如果失效,需要重新計算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 獲取寫鎖前必須釋放讀鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判斷是否有其它執行緒已經獲取了寫鎖、更新了快取, 避免重複更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降級為讀鎖, 釋放寫鎖, 這樣能夠讓其它執行緒讀取快取
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// 自己用完資料, 釋放讀鎖
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
2. 應用之快取
- 快取更新策略
更新時,是先清快取還是先更新資料庫?先清快取
先更新資料庫
補充一種情況,假設查詢執行緒 A 查詢資料時恰好快取資料由於時間到期失效,或是第一次查詢:這種情況的出現機率非常小
- 讀寫鎖實現一致性快取
程式碼例項:使用讀寫鎖實現一個簡單的按需載入快取 Test35.java
3. 讀寫鎖原理
圖解流程
讀寫鎖用的是同一個 Sycn 同步器,因此等待佇列、state 等也是同一個 下面執行:t1 w.lock,t2 r.lock
1) t1 成功上鎖,流程與 ReentrantLock 加鎖相比沒有特殊之處,不同是寫鎖狀態佔了 state 的低 16 位,而讀鎖 使用的是 state 的高 16 位
2)t2 執行 r.lock,這時進入讀鎖的 sync.acquireShared(1) 流程,首先會進入 tryAcquireShared 流程。如果有寫 鎖佔據,那麼 tryAcquireShared 返回 -1 表示失敗
tryAcquireShared 返回值表示
- -1 表示失敗
- 0 表示成功,但後繼節點不會繼續喚醒
- 正數表示成功,而且數值是還有幾個後繼節點需要喚醒,我們這裡的讀寫鎖返回 1
3)這時會進入 sync.doAcquireShared(1) 流程,首先也是呼叫 addWaiter 新增節點,不同之處在於節點被設定為 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此時 t2 仍處於活躍狀態
4)t2 會看看自己的節點是不是老二,如果是,還會再次呼叫 tryAcquireShared(1) 來嘗試獲取鎖
5)如果沒有成功,在 doAcquireShared 內 for (;;) 迴圈一次,把前驅節點的 waitStatus 改為 -1,再 for (;;) 迴圈一 次嘗試 tryAcquireShared(1) 如果還不成功,那麼在 parkAndCheckInterrupt() 處 park
又繼續執行:t3 r.lock,t4 w.lock這種狀態下,假設又有 t3 加讀鎖和 t4 加寫鎖,這期間 t1 仍然持有鎖,就變成了下面的樣子
繼續執行t1 w.unlock這時會走到寫鎖的 sync.release(1) 流程,呼叫 sync.tryRelease(1) 成功,變成下面的樣子
接下來執行喚醒流程 sync.unparkSuccessor,即讓老二恢復執行,這時 t2 在 doAcquireShared 內 parkAndCheckInterrupt() 處恢復執行,圖中的t2從黑色變成了藍色(注意這裡只是恢復執行而已,並沒有獲取到鎖!) 這回再來一次 for (;;) 執行 tryAcquireShared 成功則讓讀鎖計數加一
這時 t2 已經恢復執行,接下來 t2 呼叫 setHeadAndPropagate(node, 1),它原本所在節點被置為頭節點
事情還沒完,在 setHeadAndPropagate 方法內還會檢查下一個節點是否是 shared,如果是則呼叫 doReleaseShared() 將 head 的狀態從 -1 改為 0 並喚醒老二,這時 t3 在 doAcquireShared 內 parkAndCheckInterrupt() 處恢復執行
這回再來一次 for (;;) 執行 tryAcquireShared 成功則讓讀鎖計數加一
這時 t3 已經恢復執行,接下來 t3 呼叫 setHeadAndPropagate(node, 1),它原本所在節點被置為頭節點
下一個節點不是 shared 了,因此不會繼續喚醒 t4 所在節點
再繼續執行t2 r.unlock,t3 r.unlockt2 進入 sync.releaseShared(1) 中,呼叫 tryReleaseShared(1) 讓計數減一,但由於計數還不為零
t3 進入 sync.releaseShared(1) 中,呼叫 tryReleaseShared(1) 讓計數減一,這回計數為零了,進入 doReleaseShared() 將頭節點從 -1 改為 0 並喚醒老二,即
之後 t4 在 acquireQueued 中 parkAndCheckInterrupt 處恢復執行,再次 for (;;) 這次自己是老二,並且沒有其他 競爭,tryAcquire(1) 成功,修改頭結點,流程結束
原始碼分析
寫鎖上鎖流程
static final class NonfairSync extends Sync {
// ... 省略無關程式碼
// 外部類 WriteLock 方法, 方便閱讀, 放在此處
public void lock() {
sync.acquire(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final void acquire(int arg) {
if (
// 嘗試獲得寫鎖失敗
!tryAcquire(arg) &&
// 將當前執行緒關聯到一個 Node 物件上, 模式為獨佔模式
// 進入 AQS 佇列阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final boolean tryAcquire(int acquires) {
// 獲得低 16 位, 代表寫鎖的 state 計數
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (
// c != 0 and w == 0 表示有讀鎖返回錯誤,讀鎖不支援鎖升級, 或者
w == 0 ||
// c != 0 and w == 0 表示有寫,如果 exclusiveOwnerThread 不是自己
current != getExclusiveOwnerThread()
) {
// 獲得鎖失敗
return false;
}
// 寫鎖計數超過低 16 位, 報異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 寫鎖重入, 獲得鎖成功
setState(c + acquires);
return true;
}
if (
// 判斷寫鎖是否該阻塞這裡返回false, 或者
writerShouldBlock() ||
// 嘗試更改計數失敗
!compareAndSetState(c, c + acquires)
) {
// 獲得鎖失敗
return false;
}
// 獲得鎖成功
setExclusiveOwnerThread(current);
return true;
}
// 非公平鎖 writerShouldBlock 總是返回 false, 無需阻塞
final boolean writerShouldBlock() {
return false;
}
}
寫鎖釋放流程
static final class NonfairSync extends Sync {
// ... 省略無關程式碼
// WriteLock 方法, 方便閱讀, 放在此處
public void unlock() {
sync.release(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final boolean release(int arg) {
// 嘗試釋放寫鎖成功
if (tryRelease(arg)) {
// unpark AQS 中等待的執行緒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 因為可重入的原因, 寫鎖計數為 0, 才算釋放成功
boolean free = exclusiveCount(nextc) == 0;
if (free) {
setExclusiveOwnerThread(null);
}
setState(nextc);
return free;
}
}
讀鎖上鎖流程
static final class NonfairSync extends Sync {
// ReadLock 方法, 方便閱讀, 放在此處
public void lock() {
sync.acquireShared(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final void acquireShared(int arg) {
// tryAcquireShared 返回負數, 表示獲取讀鎖失敗
if (tryAcquireShared(arg) < 0) {
doAcquireShared(arg);
}
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果是其它執行緒持有寫鎖, 獲取讀鎖失敗
if (
exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current
) {
return -1;
}
int r = sharedCount(c);
if (
// 讀鎖不該阻塞(如果老二是寫鎖,讀鎖該阻塞), 並且
!readerShouldBlock() &&
// 小於讀鎖計數, 並且
r < MAX_COUNT &&
// 嘗試增加計數成功
compareAndSetState(c, c + SHARED_UNIT)
) {
// ... 省略不重要的程式碼
return 1;
}
return fullTryAcquireShared(current);
}
// 非公平鎖 readerShouldBlock 看 AQS 佇列中第一個節點是否是寫鎖
// true 則該阻塞, false 則不阻塞
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
// 與 tryAcquireShared 功能類似, 但會不斷嘗試 for (;;) 獲取讀鎖, 執行過程中無阻塞
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// ... 省略不重要的程式碼
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// ... 省略不重要的程式碼
return 1;
}
}
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
private void doAcquireShared(int arg) {
// 將當前執行緒關聯到一個 Node 物件上, 模式為共享模式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再一次嘗試獲取讀鎖
int r = tryAcquireShared(arg);
// 成功
if (r >= 0) {
// ㈠
// r 表示可用資源數, 在這裡總是 1 允許傳播
//(喚醒 AQS 中下一個 Share 節點)
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (
// 是否在獲取讀鎖失敗時阻塞(前一個階段 waitStatus == Node.SIGNAL)
shouldParkAfterFailedAcquire(p, node) &&
// park 當前執行緒
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈠ AQS 繼承過來的方法, 方便閱讀, 放在此處
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 設定自己為 head
setHead(node);
// propagate 表示有共享資源(例如共享讀鎖或訊號量)
// 原 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
// 現在 head waitStatus == Node.SIGNAL 或 Node.PROPAGATE
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果是最後一個節點或者是等待共享讀鎖的節點
if (s == null || s.isShared()) {
// 進入 ㈡
doReleaseShared();
}
}
}
// ㈡ AQS 繼承過來的方法, 方便閱讀, 放在此處
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個節點 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE, 為了解決 bug, 見後面分析,參考這裡:http://www.tianxiaobo.com/2018/05/01/AbstractQueuedSynchronizer-%E5%8E%9F%E7%90%86%E5%88%86%E6%9E%90-%E7%8B%AC%E5%8D%A0-%E5%85%B1%E4%BA%AB%E6%A8%A1%E5%BC%8F/#5propagate-%E7%8A%B6%E6%80%81%E5%AD%98%E5%9C%A8%E7%9A%84%E6%84%8F%E4%B9%89
for (;;) {
Node h = head;
// 佇列還有節點
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 下一個節點 unpark 如果成功獲取讀鎖
// 並且下下個節點還是 shared, 繼續 doReleaseShared
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
讀鎖釋放流程
static final class NonfairSync extends Sync {
// ReadLock 方法, 方便閱讀, 放在此處
public void unlock() {
sync.releaseShared(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final boolean tryReleaseShared(int unused) {
// ... 省略不重要的程式碼
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
// 讀鎖的計數不會影響其它獲取讀鎖執行緒, 但會影響其它獲取寫鎖執行緒
// 計數為 0 才是真正釋放
return nextc == 0;
}
}
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
private void doReleaseShared() {
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個節點 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果有其它執行緒也在釋放讀鎖,那麼需要將 waitStatus 先改為 0
// 防止 unparkSuccessor 被多次執行
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果已經是 0 了,改為 -3,用來解決傳播性,見後文訊號量 bug 分析
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
4. StampedLock
該類自 JDK 8 加入,是為了進一步優化讀效能,它的特點是在使用讀鎖、寫鎖時都必須配合【戳】使用 加解讀鎖
long stamp = lock.readLock();
lock.unlockRead(stamp);
加解寫鎖
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
樂觀讀,StampedLock 支援 tryOptimisticRead() 方法(樂觀讀),讀取完畢後需要做一次 戳校驗 如果校驗通 過,表示這期間確實沒有寫操作,資料可以安全使用,如果校驗沒通過,需要重新獲取讀鎖,保證資料安全。
long stamp = lock.tryOptimisticRead();
// 驗戳
if(!lock.validate(stamp)){
// 鎖升級
}
提供一個 資料容器類 內部分別使用讀鎖保護資料的 read() 方法,寫鎖保護資料的 write() 方法 Test37.java
StampedLock 不支援條件變數 StampedLock 不支援可重入
8.2.4 Semaphore
基本使用
訊號量,用來限制能同時訪問共享資源的執行緒上限。Test42.java
public static void main(String[] args) {
// 1. 建立 semaphore 物件
Semaphore semaphore = new Semaphore(3);
// 2. 10個執行緒同時執行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 3. 獲取許可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
// 4. 釋放許可
semaphore.release();
}
}).start();
}
}
圖解流程
Semaphore 有點像一個停車場,permits 就好像停車位數量,當執行緒獲得了 permits 就像是獲得了停車位,然後停車場顯示空餘車位減一剛開始,permits(state)為 3,這時 5 個執行緒來獲取資源
假設其中 Thread-1,Thread-2,Thread-4 cas 競爭成功,而 Thread-0 和 Thread-3 競爭失敗,進入 AQS 佇列park 阻塞
這時 Thread-4 釋放了 permits,狀態如下
接下來 Thread-0 競爭成功,permits 再次設定為 0,設定自己為 head 節點,斷開原來的 head 節點,unpark 接下來的 Thread-3 節點,但由於 permits 是 0,因此 Thread-3 在嘗試不成功後再次進入 park 狀態
原始碼分析
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
// permits 即 state
super(permits);
}
// Semaphore 方法, 方便閱讀, 放在此處
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 嘗試獲得共享鎖
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (
// 如果許可已經用完, 返回負數, 表示獲取失敗, 進入 doAcquireSharedInterruptibly
remaining < 0 ||
// 如果 cas 重試成功, 返回正數, 表示獲取成功
compareAndSetState(available, remaining)
) {
return remaining;
}
}
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次嘗試獲取許可
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功後本執行緒出隊(AQS), 所在 Node設定為 head
// 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一個節點 unpark
// 如果 head.waitStatus == 0 ==> Node.PROPAGATE
// r 表示可用資源數, 為 0 則不會繼續傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 不成功, 設定上一個節點 waitStatus = Node.SIGNAL, 下輪進入 park 阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Semaphore 方法, 方便閱讀, 放在此處
public void release() {
sync.releaseShared(1);
}
// AQS 繼承過來的方法, 方便閱讀, 放在此處
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Sync 繼承過來的方法, 方便閱讀, 放在此處
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
8.2.5 CountdownLatch
CountDownLatch允許 count 個執行緒阻塞在一個地方,直至所有執行緒的任務都執行完畢。在 Java 併發中,countdownlatch 的概念是一個常見的面試題,所以一定要確保你很好的理解了它。
CountDownLatch是共享鎖的一種實現,它預設構造 AQS 的 state 值為 count。當執行緒使用countDown方法時,其實使用了tryReleaseShared
方法以CAS的操作來減少state,直至state為0就代表所有的執行緒都呼叫了countDown方法。當呼叫await方法的時候,如果state不為0,就代表仍然有執行緒沒有呼叫countDown方法,那麼就把已經呼叫過countDown的執行緒都放入阻塞佇列Park,並自旋CAS判斷state == 0,直至最後一個執行緒呼叫了countDown,使得state == 0,於是阻塞的執行緒便判斷成功,全部往下執行。
用來進行執行緒同步協作,等待所有執行緒完成倒計時。 其中構造引數用來初始化等待計數值,await() 用來等待計數歸零,countDown() 用來讓計數減一 Test38.java
8.2..6 CyclicBarri
CyclicBarri[ˈsaɪklɪk ˈbæriɚ] 迴圈柵欄,用來進行執行緒協作,等待執行緒滿足某個計數。構造時設定『計數個數』,每個執行緒執行到某個需要“同步”的時刻呼叫 await() 方法進行等待,當等待的執行緒數滿足『計數個數』時,繼續執行。跟CountdownLatch一樣,但這個可以重用
CyclicBarrier cb = new CyclicBarrier(2); // 個數為2時才會繼續執行
for (int i=0;i<3;i++){
new Thread(()->{
System.out.println("執行緒1開始.."+new Date());
try {
cb.await(); // 當個數不足時,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("執行緒1繼續向下執行..."+new Date());
}).start();
new Thread(()->{
System.out.println("執行緒2開始.."+new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
try {
cb.await(); // 2 秒後,執行緒個數夠2,繼續執行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("執行緒2繼續向下執行..."+new Date());
}).start();
}