執行緒安全實現與 CLH 佇列
阻塞同步
在 Java 中,我們經常使用 synchronized 關鍵字來做到互斥同步以解決多執行緒併發訪問共享資料的問題。synchronzied 關鍵字在編譯後,會在 synchronized 所包含的同步程式碼塊前後分別加入 monitorenter 和 monitorexit 這兩個位元組碼指令。synchronized 關鍵字需要指定一個物件來進行加鎖和解鎖。例如:
public class Main { private static final Object LOCK = new Object(); public static void fun1() { synchronized (LOCK) { // do something } } public static void fun2() { synchronized (LOCK) { // do something } } }
在沒有明確指定該物件時,根據 synchonized 修飾的是例項方法還是靜態方法,從而決定是採用物件例項或者類的class例項作為所物件。例如:
public class SynchronizedTest {
public synchronized void doSomething() {
//採用例項物件作為鎖物件
}
}
public class SynchronizedTest { public static synchronized void doSomething() { //採用SynchronizedTest.class 例項作為鎖物件 } }
由於基於 synchronized 實現的阻塞互斥,除了需要阻塞操作執行緒,而且喚醒或者阻塞作業系統級別的原生執行緒,需要從使用者態轉換到核心態中,這個狀態的轉換消耗的時間可能比使用者程式碼執行的時間還要長,因此我們經常說 synchronized 是 Java 語言中的 “重量級鎖”。
非阻塞同步
樂觀鎖與悲觀鎖
使用 synchronized 關鍵字的同步方式最主要的問題就是進行執行緒阻塞和喚醒時所帶來的的效能消耗問題。阻塞同步屬於悲觀的併發策略,只要有可能出現競爭,它都認為一定要加鎖。然而同步策略還有另外一種樂觀的策略,樂觀併發策略先進性對資料的操作,如果沒有發現其它執行緒也操作了資料,那麼就認為這個操作是成功的。如果發生了其它執行緒也操作了資料,那麼一般採取不斷重試的手段,直到成功為止,這種樂觀鎖的策略,不需要把執行緒阻塞,屬於非阻塞同步的一種手段。
CAS
樂觀併發策略主要有兩個重要的階段,一個是對資料進行操作,另外一個是進行衝突的檢測,即檢測其它執行緒有無同時也對該資料進行了操作。這裡的資料操作和衝突檢測需要具備原子性,否則就容易出現類似於 i++ 的問題。CAS 的含義為 compare and swap,目前絕大多數 CPU 都原生支援 CAS 原子指令,例如在 IA64、x86的指令集中,就有 cmpxchg 這樣的指令來完成 CAS 功能,它的原子性要求是在硬體層面上得到保證的。CAS 指令一般需要有三個引數,分別是值的記憶體地址、期望中的舊值和新值。CAS 指令執行時,如果該記憶體地址上的值符合期望中的舊值,處理器會用新值更新該記憶體地址上的值,否則就不更新。這個操作在 CPU 內部保證了是原子性的。在 Java 中有許多 CAS 相關的 API,我們常見的有 java.util.concurrent
包下的各種原子類,例如AtomicInteger
,AtomicReference
等等。這些類都支援 CAS 操作,其內部實際上也依賴於 sun.misc.Unsafe
這個類裡的 compareAndSwapInt() 和 compareAndSwapLong() 方法。CAS 並非是完美無缺的,儘管它能保證原子性,但它存在一個著名的 ABA 問題。一個變數初次讀取的時候值為 A,再一次讀取的時候也為 A,那麼我們是否能說明這個變數在兩次讀取中間沒有發生過變化?不能。在這期間,變數可能由 A 變為 B,再由 B 變為 A,第二次讀取的時候看到的是 A,但實際上這個變數發生了變化。一般的程式碼邏輯不會在意這個 ABA 問題,因為根據程式碼邏輯它不會影響併發的安全性,但如果在意的話,可能考慮採用阻塞同步的方式而不是 CAS。實際上 JDK 本身也對這個 ABA 問題解決方案,提供了 AtomicStampedReference
這個類,為變數加上版本來解決 ABA 問題。
自旋鎖
以 synchronized 為代表的阻塞同步,因為阻塞執行緒會恢復執行緒的操作都需要涉及到作業系統層面的使用者態和核心態之間的切換,這對系統的效能影響很大。自旋鎖的策略是當執行緒去獲取一個鎖時,如果發現該鎖已經被其它執行緒佔有,那麼它不馬上放棄 CPU 的執行時間片,而是進入一個“無意義”的迴圈,檢視該執行緒是否已經放棄了鎖。但自旋鎖適用於臨界區比較小的情況,如果鎖持有的時間過長,那麼自旋操作本身就會白白耗掉系統的效能。
以下為一個簡單的自旋鎖實現:
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {
private AtomicReference<Thread> owner = new AtomicReference<Thread>();
public void lock() {
Thread currentThread = Thread.currentThread();
// 如果鎖未被佔用,則設定當前執行緒為鎖的擁有者
while (!owner.compareAndSet(null, currentThread)) {}
}
public void unlock() {
Thread currentThread = Thread.currentThread();
// 只有鎖的擁有者才能釋放鎖
owner.compareAndSet(currentThread, null);
}
}
上述的程式碼中, owner 變數儲存獲得了鎖的執行緒。這裡的自旋鎖有一些缺點,第一個是沒有保證公平性,等待獲取鎖的執行緒之間,無法按先後順序分別獲得鎖;另一個,由於多個執行緒會去操作同一個變數 owner,在 CPU 的系統中,存在著各個 CPU 之間的快取資料需要同步,保證一致性,這會帶來效能問題。
公平的自旋
為了解決公平性問題,可以讓每個鎖擁有一個服務號,表示正在服務的執行緒,而每個執行緒嘗試獲取鎖之前需要先獲取一個排隊號,然後不斷輪詢當前鎖的服務號是否是自己的服務號,如果是,則表示獲得了鎖,否則就繼續輪詢。下面是一個簡單的實現:
import java.util.concurrent.atomic.AtomicInteger;
public class TicketLock {
private AtomicInteger serviceNum = new AtomicInteger(); // 服務號
private AtomicInteger ticketNum = new AtomicInteger(); // 排隊號
public int lock() {
// 首先原子性地獲得一個排隊號
int myTicketNum = ticketNum.getAndIncrement();
// 只要當前服務號不是自己的就不斷輪詢
while (serviceNum.get() != myTicketNum) {
}
return myTicketNum;
}
public void unlock(int myTicket) {
// 只有當前執行緒擁有者才能釋放鎖
int next = myTicket + 1;
serviceNum.compareAndSet(myTicket, next);
}
}
雖然解決了公平性的問題,但依然存在前面說的多 CPU 快取的同步問題,因為每個執行緒佔用的 CPU 都在同時讀寫同一個變數 serviceNum,這會導致繁重的系統匯流排流量和記憶體操作次數,從而降低了系統整體的效能。
MCS 自旋鎖
MCS 的名稱來自其發明人的名字:John Mellor-Crummey和Michael Scott。MCS 的實現是基於連結串列的,每個申請鎖的執行緒都是連結串列上的一個節點,這些執行緒會一直輪詢自己的本地變數,來知道它自己是否獲得了鎖。已經獲得了鎖的執行緒在釋放鎖的時候,負責通知其它執行緒,這樣 CPU 之間快取的同步操作就減少了很多,僅線上程通知另外一個執行緒的時候發生,降低了系統匯流排和記憶體的開銷。實現如下所示:
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class MCSLock {
public static class MCSNode {
volatile MCSNode next;
volatile boolean isWaiting = true; // 預設是在等待鎖
}
volatile MCSNode queue;// 指向最後一個申請鎖的MCSNode
private static final AtomicReferenceFieldUpdater<MCSLock, MCSNode> UPDATER = AtomicReferenceFieldUpdater
.newUpdater(MCSLock.class, MCSNode.class, "queue");
public void lock(MCSNode currentThread) {
MCSNode predecessor = UPDATER.getAndSet(this, currentThread);// step 1
if (predecessor != null) {
predecessor.next = currentThread;// step 2
while (currentThread.isWaiting) {// step 3
}
} else { // 只有一個執行緒在使用鎖,沒有前驅來通知它,所以得自己標記自己已獲得鎖
currentThread.isWaiting = false;
}
}
public void unlock(MCSNode currentThread) {
if (currentThread.isWaiting) {// 鎖擁有者進行釋放鎖才有意義
return;
}
if (currentThread.next == null) {// 檢查是否有人排在自己後面
if (UPDATER.compareAndSet(this, currentThread, null)) {// step 4
// compareAndSet返回true表示確實沒有人排在自己後面
return;
} else {
// 突然有人排在自己後面了,可能還不知道是誰,下面是等待後續者
// 這裡之所以要忙等是因為:step 1執行完後,step 2可能還沒執行完
while (currentThread.next == null) { // step 5
}
}
}
currentThread.next.isWaiting = false;
currentThread.next = null;// for GC
}
}
MCS 的能夠保證較高的效率,降低不必要的效能消耗,並且它是公平的自旋鎖。
CLH 自旋鎖
CLH 鎖與 MCS 鎖的原理大致相同,都是各個執行緒輪詢各自關注的變數,來避免多個執行緒對同一個變數的輪詢,從而從 CPU 快取一致性的角度上減少了系統的消耗。CLH 鎖的名字也與他們的發明人的名字相關:Craig,Landin and Hagersten。CLH 鎖與 MCS 鎖最大的不同是,MCS 輪詢的是當前佇列節點的變數,而 CLH 輪詢的是當前節點的前驅節點的變數,來判斷前一個執行緒是否釋放了鎖。實現如下所示:
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class CLHLock {
public static class CLHNode {
private volatile boolean isWaiting = true; // 預設是在等待鎖
}
private volatile CLHNode tail ;
private static final AtomicReferenceFieldUpdater<CLHLock, CLHNode> UPDATER = AtomicReferenceFieldUpdater
. newUpdater(CLHLock.class, CLHNode .class , "tail" );
public void lock(CLHNode currentThread) {
CLHNode preNode = UPDATER.getAndSet( this, currentThread);
if(preNode != null) {//已有執行緒佔用了鎖,進入自旋
while(preNode.isWaiting ) {
}
}
}
public void unlock(CLHNode currentThread) {
// 如果佇列裡只有當前執行緒,則釋放對當前執行緒的引用(for GC)。
if (!UPDATER .compareAndSet(this, currentThread, null)) {
// 還有後續執行緒
currentThread.isWaiting = false ;// 改變狀態,讓後續執行緒結束自旋
}
}
}
從上面可以看到,MCS 和 CLH 相比,CLH 的程式碼比 MCS 要少得多;CLH是在前驅節點的屬性上自旋,而MCS是在本地屬性變數上自旋;CLH的佇列是隱式的,通過輪詢關注上一個節點的某個變數,隱式地形成了鏈式的關係,但CLHNode並不實際持有下一個節點,MCS的佇列是物理存在的,而 CLH 的佇列是邏輯上存在的;此外,CLH 鎖釋放時只需要改變自己的屬性,MCS 鎖釋放則需要改變後繼節點的屬性。
CLH 佇列是 J.U.C 中 AQS 框架實現的核心原理。