xfchgfwwwbb0002com可重入讀寫鎖 ReentrantReadW19908836661
我們只需要弄清楚 synchronized 和 AQS 的原理,再去理解並發鎖的性質和局限就很簡單了。因此這篇文章重點放在原理上,對於使用和特點不會過多涉及。
下面是關於鎖的一些概念解釋,這些都是一些關於鎖的性質的描述,並非具體實現。
悲觀鎖和樂觀鎖
悲觀鎖和獨占鎖是一個意思,它假設一定會發生沖突,因此獲取到鎖之後會阻塞其他等待線程。這麽做的好處是簡單安全,但是掛起線程和恢復線程都需要轉入內核態進行,這樣做會帶來很大的性能開銷。悲觀鎖的代表是 synchronized。然而在真實環境中,大部分時候都不會產生沖突。悲觀鎖會造成很大的浪費。而樂觀鎖不一樣,它假設不會產生沖突,先去嘗試執行某項操作,失敗了再進行其他處理(一般都是不斷循環重試)。這種鎖不會阻塞其他的線程,也不涉及上下文切換,性能開銷小。代表實現是 CAS。
公平鎖和非公平鎖
公平鎖是指各個線程在加鎖前先檢查有無排隊的線程,按排隊順序去獲得鎖。
可重入鎖和不可重入鎖
如果一個線程已經獲取到了一個鎖,那麽它可以訪問被這個鎖鎖住的所有代碼塊。不可重入鎖與之相反。
Synchronized 關鍵字
Synchronized 是一種獨占鎖。在修飾靜態方法時,鎖的是類對象,如 Object.class。修飾非靜態方法時,鎖的是對象,即 this。修飾方法塊時,鎖的是括號裏的對象。
每個對象有一個鎖和一個等待隊列,鎖只能被一個線程持有,其他需要鎖的線程需要阻塞等待。鎖被釋放後,對象會從隊列中取出一個並喚醒,喚醒哪個線程是不確定的,不保證公平性。
synchronized 修飾靜態方法時,鎖的是類對象,如 Object.class。修飾非靜態方法時,鎖的是對象,即 this。
多個線程是可以同時執行同一個synchronized實例方法的,只要它們訪問的對象是不同的。
synchronized 鎖住的是對象而非代碼,只要訪問的是同一個對象的 synchronized 方法,即使是不同的代碼,也會被同步順序訪問。
此外,需要說明的,synchronized方法不能防止非synchronized方法被同時執行,所以,一般在保護變量時,需要在所有訪問該變量的方法上加上synchronized。
實現原理
synchronized 是基於 Java 對象頭和 Monitor 機制來實現的。
Java 對象頭
一個對象在內存中包含三部分:對象頭,實例數據和對齊填充。其中 Java 對象頭包含兩部分:
Class Metadata Address (類型指針)。存儲類的元數據的指針。虛擬機通過這個指針找到它是哪個類的實例。
Mark Word(標記字段)。存出一些對象自身運行時的數據。包括哈希碼,GC 分代年齡,鎖狀態標誌等。
Monitor
Mark Word 有一個字段指向 monitor 對象。monitor 中記錄了鎖的持有線程,等待的線程隊列等信息。前面說的每個對象都有一個鎖和一個等待隊列,就是在這裏實現的。
monitor 對象由 C++ 實現。其中有三個關鍵字段:
_owner 記錄當前持有鎖的線程
_EntryList 是一個隊列,記錄所有阻塞等待鎖的線程
_WaitSet 也是一個隊列,記錄調用 wait() 方法並還未被通知的線程。
Monitor的操作機制如下:
多個線程競爭鎖時,會先進入 EntryList 隊列。競爭成功的線程被標記為 Owner。其他線程繼續在此隊列中阻塞等待。
如果 Owner 線程調用 wait() 方法,則其釋放對象鎖並進入 WaitSet 中等待被喚醒。Owner 被置空,EntryList 中的線程再次競爭鎖。
如果 Owner 線程執行完了,便會釋放鎖,Owner 被置空,EntryList 中的線程再次競爭鎖。
JVM 對 synchronized 的處理
上面了解了 monitor 的機制,那虛擬機是如何將 synchronized 和 monitor 關聯起來的呢?分兩種情況:
如果同步的是代碼塊,編譯時會直接在同步代碼塊前加上 monitorenter 指令,代碼塊後加上 monitorexit 指令。這稱為顯示同步。
如果同步的是方法,虛擬機會為方法設置 ACC_SYNCHRONIZED 標誌。調用的時候 JVM 根據這個標誌判斷是否是同步方法。
JVM 對 synchronized 的優化
synchronized 是重量級鎖,由於消耗太大,虛擬機對其做了一些優化。
自旋鎖與自適應自旋
在許多應用中,鎖定狀態只會持續很短的時間,為了這麽一點時間去掛起恢復線程,不值得。我們可以讓等待線程執行一定次數的循環,在循環中去獲取鎖。這項技術稱為自旋鎖,它可以節省系統切換線程的消耗,但仍然要占用處理器。在 JDK1.4.2 中,自選的次數可以通過參數來控制。 JDK 1.6又引入了自適應的自旋鎖,不再通過次數來限制,而是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。
鎖消除
虛擬機在運行時,如果發現一段被鎖住的代碼中不可能存在共享數據,就會將這個鎖清除。
鎖粗化
當虛擬機檢測到有一串零碎的操作都對同一個對象加鎖時,會把鎖擴展到整個操作序列外部。如 StringBuffer 的 append 操作。
輕量級鎖
對絕大部分的鎖來說,在整個同步周期內都不存在競爭。如果沒有競爭,輕量級鎖可以使用 CAS 操作避免使用互斥量的開銷。
偏向鎖
偏向鎖的核心思想是,如果一個線程獲得了鎖,那麽鎖就進入偏向模式,當這個線程再次請求鎖時,無需再做任何同步操作,即可獲取鎖。
CAS
操作模型
CAS 是 compare and swap 的簡寫,即比較並交換。它是指一種操作機制,而不是某個具體的類或方法。在 Java 平臺上對這種操作進行了包裝。在 Unsafe 類中,調用代碼如下:
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
復制代碼它需要三個參數,分別是內存位置 V,舊的預期值 A 和新的值 B。操作時,先從內存位置讀取到值,然後和預期值A比較。如果相等,則將此內存位置的值改為新值 B,返回 true。如果不相等,說明和其他線程沖突了,則不做任何改變,返回 false。
這種機制在不阻塞其他線程的情況下避免了並發沖突,比獨占鎖的性能高很多。 CAS 在 Java 的原子類和並發包中有大量使用。
重試機制(循環 CAS)
有很多文章說,CAS 操作失敗後會一直重試直到成功,這種說法很不嚴謹。
第一,CAS 本身並未實現失敗後的處理機制,它只負責返回成功或失敗的布爾值,後續由調用者自行處理。只不過我們最常用的處理方式是重試而已。
第二,這句話很容易理解錯,被理解成重新比較並交換。實際上失敗的時候,原值已經被修改,如果不更改期望值,再怎麽比較都會失敗。而新值同樣需要修改。
所以正確的方法是,使用一個死循環進行 CAS 操作,成功了就結束循環返回,失敗了就重新從內存讀取值和計算新值,再調用 CAS。看下 AtomicInteger 的源碼就什麽都懂了:
public final int incrementAndGet () {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
底層實現
CAS 主要分三步,讀取-比較-修改。其中比較是在檢測是否有沖突,如果檢測到沒有沖突後,其他線程還能修改這個值,那麽 CAS 還是無法保證正確性。所以最關鍵的是要保證比較-修改這兩步操作的原子性。
CAS 底層是靠調用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架構中的 compare and exchange 指令。在多核的情況下,這個指令也不能保證原子性,需要在前面加上 lock 指令。lock 指令可以保證一個 CPU 核心在操作期間獨占一片內存區域。那麽 這又是如何實現的呢?
在處理器中,一般有兩種方式來實現上述效果:總線鎖和緩存鎖。在多核處理器的結構中,CPU 核心並不能直接訪問內存,而是統一通過一條總線訪問。總線鎖就是鎖住這條總線,使其他核心無法訪問內存。這種方式代價太大了,會導致其他核心停止工作。而緩存鎖並不鎖定總線,只是鎖定某部分內存區域。當一個 CPU 核心將內存區域的數據讀取到自己的緩存區後,它會鎖定緩存對應的內存區域。鎖住期間,其他核心無法操作這塊內存區域。
CAS 就是通過這種方式實現比較和交換操作的原子性的。值得註意的是, CAS 只是保證了操作的原子性,並不保證變量的可見性,因此變量需要加上 volatile 關鍵字。
ABA 問題
上面提到,CAS 保證了比較和交換的原子性。但是從讀取到開始比較這段期間,其他核心仍然是可以修改這個值的。如果核心將 A 修改為 B,CAS 可以判斷出來。但是如果核心將 A 修改為 B 再修改回 A。那麽 CAS 會認為這個值並沒有被改變,從而繼續操作。這是和實際情況不符的。解決方案是加一個版本號。
可重入鎖 ReentrantLock
ReentrantLock 使用代碼實現了和 synchronized 一樣的語義,包括可重入,保證內存可見性和解決競態條件問題等。相比 synchronized,它還有如下好處:
支持以非阻塞方式獲取鎖
可以響應中斷
可以限時
支持了公平鎖和非公平鎖
基本用法如下:
public class Counter {
private final Lock lock = new ReentrantLock();
private volatile int count;
public void incr() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}
ReentrantLock 內部有兩個內部類,分別是 FairSync 和 NoFairSync,對應公平鎖和非公平鎖。他們都繼承自 Sync。Sync 又繼承自AQS。
AQS
AQS 全稱 AbstractQueuedSynchronizer。AQS 中有兩個重要的成員:
成員變量 state。用於表示鎖現在的狀態,用 volatile 修飾,保證內存一致性。同時所用對 state 的操作都是使用 CAS 進行的。state 為0表示沒有任何線程持有這個鎖,線程持有該鎖後將 state 加1,釋放時減1。多次持有釋放則多次加減。
還有一個雙向鏈表,鏈表除了頭結點外,每一個節點都記錄了線程的信息,代表一個等待線程。這是一個 FIFO 的鏈表。
下面以 ReentrantLock 非公平鎖的代碼看看 AQS 的原理。
請求鎖
請求鎖時有三種可能:
如果沒有線程持有鎖,則請求成功,當前線程直接獲取到鎖。
如果當前線程已經持有鎖,則使用 CAS 將 state 值加1,表示自己再次申請了鎖,釋放鎖時減1。這就是可重入性的實現。
如果由其他線程持有鎖,那麽將自己添加進等待隊列。
f```
inal void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread()); //沒有線程持有鎖時,直接獲取鎖,對應情況1
else
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && //在此方法中會判斷當前持有線程是否等於自己,對應情況2
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //將自己加入隊列中,對應情況3
selfInterrupt();
}
創建 Node 節點並加入鏈表
如果沒競爭到鎖,這時候就要進入等待隊列。隊列是默認有一個 head 節點的,並且不包含線程信息。上面情況3中,addWaiter 會創建一個 Node,並添加到鏈表的末尾,Node 中持有當前線程的引用。同時還有一個成員變量 waitStatus,表示線程的等待狀態,初始值為0。我們還需要關註兩個值:
CANCELLED,值為1,表示取消狀態,就是說我不要這個鎖了,請你把我移出去。
SINGAL,值為-1,表示下一個節點正在掛起等待,註意是下一個節點,不是當前節點。
同時,加到鏈表末尾的操作使用了 CAS+死循環的模式,很有代表性,拿出來看一看:
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
可以看到,在死循環裏調用了 CAS 的方法。如果多個線程同時調用該方法,那麽每次循環都只有一個線程執行成功,其他線程進入下一次循環,重新調用。N個線程就會循環N次。這樣就在無鎖的模式下實現了並發模型。
掛起等待
如果此節點的上一個節點是頭部節點,則再次嘗試獲取鎖,獲取到了就移除並返回。獲取不到就進入下一步;
判斷前一個節點的 waitStatus,如果是 SINGAL,則返回 true,並調用 LockSupport.park() 將線程掛起;
如果是 CANCELLED,則將前一個節點移除;
如果是其他值,則將前一個節點的 waitStatus 標記為 SINGAL,進入下一次循環。
可以看到,一個線程最多有兩次機會,還競爭不到就去掛起等待。
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
釋放鎖
調用 tryRelease,此方法由子類實現。實現非常簡單,如果當前線程是持有鎖的線程,就將 state 減1。減完後如果 state 大於0,表示當前線程仍然持有鎖,返回 false。如果等於0,表示已經沒有線程持有鎖,返回 true,進入下一步;
如果頭部節點的 waitStatus 不等於0,則調用LockSupport.unpark()喚醒其下一個節點。頭部節點的下一個節點就是等待隊列中的第一個線程,這反映了 AQS 先進先出的特點。另外,即使是非公平鎖,進入隊列之後,還是得按順序來。
public final boolean release(int arg) {
if (tryRelease(arg)) { //將 state 減1
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null) //喚醒第一個等待的線程
LockSupport.unpark(s.thread);
}
公平鎖如何實現
上面分析的是非公平鎖,那公平鎖呢?很簡單,在競爭鎖之前判斷一下等待隊列中有沒有線程在等待就行了。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //判斷等待隊列是否有節點
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
......
return false;
}
可重入讀寫鎖 ReentrantReadWriteLock
讀寫鎖機制
理解 ReentrantLock 和 AQS 之後,再來理解讀寫鎖就很簡單了。讀寫鎖有一個讀鎖和一個寫鎖,分別對應讀操作和鎖操作。鎖的特性如下:
只有一個線程可以獲取到寫鎖。在獲取寫鎖時,只有沒有任何線程持有任何鎖才能獲取成功;
如果有線程正持有寫鎖,其他任何線程都獲取不到任何鎖;
沒有線程持有寫鎖時,可以有多個線程獲取到讀鎖。
上面鎖的特點保證了可以並發讀取,這大大提高了效率,在實際開發中非常有用。那麽在具體是如何實現的呢?
實現原理
讀寫鎖雖然有兩個鎖,但實際上只有一個等待隊列。
獲取寫鎖時,要保證沒有任何線程持有鎖;
寫鎖釋放後,會喚醒隊列第一個線程,可能是讀鎖和寫鎖;
獲取讀鎖時,先判斷寫鎖有沒有被持有,沒有就可以獲取成功;
獲取讀鎖成功後,會將隊列中等待讀鎖的線程挨個喚醒,知道遇到等待寫鎖的線程位置;
釋放讀鎖時,要檢查讀鎖數,如果為0,則喚醒隊列中的下一個線程,否則不進行操作。
xfchgfwwwbb0002com可重入讀寫鎖 ReentrantReadW19908836661