Java併發程式設計的藝術之五----java中的鎖
1.Lock介面
鎖是用來控制多個執行緒訪問共享資源的方式,一般來說,一個鎖能夠防止多個執行緒同時訪問共享資源(但是有些鎖可以允許多個執行緒併發的訪問共享資源,比如讀寫鎖)。Java SE 5之後,併發包中新增了Lock介面(以及相關實現類)用來實現鎖功能,在使用時需要顯式地獲取和釋放鎖。雖然它缺少了(通過synchronized塊或者方法所提供的)隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字所不具備的同步特性。
使用synchronized關鍵字將會隱式地獲取鎖,但是它將鎖的獲取和釋放固化了,也就是先獲取再釋放
在finally塊中釋放鎖,目的是保證在獲取到鎖之後,最終能夠被釋放。
不要將獲取鎖的過程寫在try塊中,因為如果在獲取鎖(自定義鎖的實現)時發生了異常,異常丟擲的同時,也會導致鎖無故釋放。
基本操作
2.佇列同步器AQS
佇列同步器AbstractQueuedSynchronizer,是用來構建鎖
同步器的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,在抽象方法的實現過程中免不了要對同步狀態進行更改,這時就需要使用同步器提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進行操作,因為它們能夠保證狀態的改變是安全的。
同步器提供的模板方法基本上分為3類:獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態和查詢同步佇列中的等待執行緒
來實現自己的同步語義。
顧名思義,獨佔鎖就是在同一時刻只能有一個執行緒獲取到鎖,而其他獲取鎖的執行緒只能處於同步佇列中等待,只有獲取鎖的執行緒釋放了鎖,後繼的執行緒才能夠獲取鎖,
class Mutex1 implements Lock { // 靜態內部類,自定義同步器 private static class Sync extends AbstractQueuedSynchronizer { // 是否處於佔用狀態 protected boolean isHeldExclusively() { return getState() == 1; }
// 當狀態為0的時候獲取鎖 public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
// 釋放鎖,將狀態設定為0 protected boolean tryRelease(int releases) { if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; }
// 返回一個Condition,每個condition都包含了一個condition佇列 Condition newCondition() { return new ConditionObject(); } }
// 僅需要將操作代理到Sync上即可 private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } } |
獨佔鎖Mutex是一個自定義同步元件,它在同一時刻只允許一個執行緒佔有鎖,實現了tryAcquire和tryRelease獨佔方法。Mutex中定義了一個靜態內部類Sync,該內部類繼承了同步器並實現了獨佔式獲取和釋放同步狀態。在tryAcquire(int acquires)方法中,如果原來狀態是0然後經過CAS設定成功(同步狀態設定為1),則代表獲取了同步狀態,然後設定當前執行緒為擁有鎖執行緒,而在tryRelease(int releases)方法中只是將同步狀態重置為0,並將擁有鎖執行緒設定為空。使用者使用Mutex時並不會直接和內部同步器的實現打交道,而是呼叫Mutex提供的方法,在Mutex的實現中,以獲取鎖的lock()方法為例,只需要在方法實現中呼叫同步器的模板方法acquire(int args)即可,當前執行緒呼叫該方法獲取同步狀態失敗後會被加入到同步佇列中等待,這樣就大大降低了實現一個可靠自定義同步元件的門檻。
2.1佇列同行器的實現分析
從實現角度分析同步器是如何完成執行緒同步的,主要包括:同步佇列、獨佔式同步狀態獲取與釋放、共享式同步狀態獲取與釋放以及超時獲取同步狀態等同步器的核心資料結構與模板方法。
2.1.1 同步佇列
同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態等資訊構造成為一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。
同步佇列中的節點(Node)用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和
後繼節點。
同步器包含了兩個節點型別的引用,一個指向頭節點,而另一個指向尾節點。試想一下,當一個執行緒成功地獲取了同步狀態(或者鎖),其他執行緒將無法獲取到同步狀態,轉而被構造成為節點並加入到同步佇列中,而這個加入佇列的過程必須要保證執行緒安全,因此同步器提供了一個基於CAS的設定尾節點的方法:compareAndSetTail(Node expect,Node update),它需要傳遞當前執行緒“認為”的尾節點和當前節點,只有設定成功後,當前節點才正式與之前的尾節點建立關聯。
同步佇列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的執行緒在釋放同步狀態時,將會喚醒後繼節點,而後繼節點將會在獲取同步狀態成功時將自己設定為首節點,
注意:只有在獲取同步狀態時,才會對首節點進行設定,假設這麼一條佇列A-B-C,當其他執行緒釋放鎖之後,同步佇列首節點A先獲取同步狀態,然後在同步狀態過程中將B設為新的首節點。因為在同步狀態中實現的,所以不許要CAS保證。
2.1.2.獨佔式同步狀態獲取與釋放
通過呼叫同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,也就是由於執行緒獲取同步狀態失敗後進入同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同
步佇列中移出
首先呼叫自定義同步器實現的tryAcquire(int arg)方法,該方法保證執行緒安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨佔式Node.EXCLUSIVE,同一時刻只能有一個執行緒成功獲取同步狀態)並通過addWaiter(Node node)方法將該節點加入到同步佇列的尾部,最後呼叫acquireQueued(Node node,int arg)方法,使得該節點以“死迴圈”的方式獲取同步狀態。如果獲取不到則阻塞節點中的執行緒,而被阻塞執行緒的喚醒主要依靠前驅節點的出隊或阻塞執行緒被中斷來實現。
節點的構造以及加入同步佇列
通過使用compareAndSetTail(Node expect,Node update)方法來確保節點能夠被執行緒安全新增。如果不符合上一種情況,就使用enq(final Node node)方法,同步器通過“死迴圈”來保證節點的正確新增,在“死迴圈”中只有通過CAS將節點設定成為尾節點之後,當前執行緒才能從該方法返回,否則,當前執行緒不斷地嘗試設定。
節點進入同步佇列之後,就進入了一個自旋的過程,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中(並會阻塞節點的執行緒)
在acquireQueued(final Node node,int arg)方法中,當前執行緒在“死迴圈”中嘗試獲取同步狀態,而只有前驅節點是頭節點才能夠嘗試獲取同步狀態,這是為什麼?
①頭結點是成功獲得同步狀態的節點,而頭結點的執行緒釋放了同步狀態後,將會喚醒後繼節點,後繼節點的執行緒被喚醒後,需要檢查自己的前驅節點是否為頭節點。
②維護同步佇列的FIFO原則。
可以看到節點和節點之間在迴圈檢查的過程中基本不相互通訊,而是簡單地判斷自己的前驅是否為頭節點,這樣就使得節點的釋放規則符合FIFO
當前執行緒獲取同步狀態並執行了相應邏輯之後,就需要釋放同步狀態,使得後續節點能夠繼續獲取同步狀態。通過呼叫同步器的release(int arg)方法可以釋放同步狀態,該方法在釋放了同步狀態之後,會喚醒其後繼節點(進而使後繼節點重新嘗試獲取同步狀態,從等待狀態到繼續自旋)。該方法程式碼如程式碼清單5-6所示。
該方法執行時,如果成功釋放了同步狀態,就會喚醒頭結點的後繼節點執行緒,unparkSuccessor(Node node)方法使用LockSupport來喚醒處於等待的執行緒。
在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中並在佇列中進行自旋;移出佇列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器呼叫tryRelease(int arg)方法釋放同步狀態,然後喚醒頭節點的後繼節點。
2.1.3 共享式同步狀態獲取與釋放
左:共享式訪問資源時,其他共享式的訪問軍備允許,而獨佔式訪問被阻塞
右:獨佔式訪問資源時,同一時刻其他訪問均被阻塞
通過呼叫acquireShared(int arg)方法可以共享式的獲取同步狀態
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } 同步器呼叫tryAcquireShared方法嘗試獲取同步狀態,該方法返回值為int型別,如果返回大於等於0,表示能夠獲取同步狀態。因此在共享式獲取的自旋過程中,成功獲取到同步狀態並退出自旋的條件是tryacquireshared方法返回值大於0. --------------------------------------------------------------- private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 如果當前節點的前驅為頭結點,嘗試獲取同步狀態,呼叫tryacquireshared方法,繼續自旋,直到返回值>=0,表示該次獲取同步狀態成功並從自旋狀態退出。
共享式釋放同步狀態,通過releaseShared(int args)方法可以釋放同步狀態。 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } 使用tryreleaseshared嘗試將狀態設定為共享式同步釋放的狀態,然後使用doReleaseShared()確保同步狀態執行緒 ----------------------------------------------------------- 執行緒同步狀態安全釋放後,進行迴圈操作,得到當前頭結點的waitStatus,如果是-1, 表明後繼執行緒需要喚醒了,使用cas將h的waitStatus置為0,然後將喚醒後面的執行緒。 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } |
2.1.4 獨佔式超時獲取同步狀態
通過呼叫同步器的doAcquireNanos(int arg,long nanosTimeout)方法可以超時獲取同步狀態,即在指定的時間段內獲取同步狀態,如果獲取到同步狀態則返回true,否則,返回false。
設定最後期限為當前時間+nanosTimeout,一直自旋,如果前一個執行緒p是頭結點,就嘗試獲取同步狀態,獲取成功,將當前設定為獨佔狀態的執行緒的節點node設定為頭結點,,將執行緒p的next置空,有助於gc,返回true表示獲取同步狀態,否則計算當前的nanosTimeout = deadline – 系統當前時間,如果小於等於0,說明超時了,如果執行緒發生中斷你,直接丟擲異常 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); 如果p!=head或者tryacquire失敗了,shouldParkAfterFailedAcquire(p, node)檢查前驅p和node的狀態,如果前驅的waitStatus==-1,表明p已經準備好去喚醒下一節點即node,如果大於0,說明前驅p已經不是頭結點了,需要找到新的前驅,waitStatus為小於等於0的,如果等於0,採用cas將其設定為signal表示可以喚醒下一個執行緒;如果nanosTimeout>快速自旋的閾值,就使用LockSupport.parkNanos進行等待nanosTimeout秒;然後從該方法返回。如果小於等於快速自旋那麼就進入快速自旋、 if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } |
獨佔式超時獲取同步狀態doAcquireNanos(int arg,long nanosTimeout)和獨佔式獲取同步狀態acquire(int args)在流程上非常相似,其主要區別在於未獲取到同步狀態時的處理邏輯。acquire(int args)在未獲取到同步狀態時,將會使當前執行緒一直處於等待狀態,而doAcquireNanos(int arg,long nanosTimeout)會使當前執行緒等待nanosTimeout納秒,如果當前執行緒在nanosTimeout納秒內沒有獲取到同步狀態,將會從等待邏輯中自動返回。
2.1.5 自定義同步元件
設計同步工具:該工具在同一時刻,只允許至多兩個執行緒同時訪問,超過兩個執行緒的訪問將被阻塞,我們將這個同步工具命名為TwinsLock。
首先,確定訪問模式。TwinsLock能夠在同一時刻支援多個執行緒的訪問,這顯然是共享式訪問,因此,需要使用同步器提供的acquireShared(int args)方法等和Shared相關的方法,這就要求TwinsLock必須重寫tryAcquireShared(int args)方法和tryReleaseShared(int args)方法,這樣才能保證同步器的共享式同步狀態的獲取與釋放方法得以執行。
其次,定義資源數。TwinsLock在同一時刻允許至多兩個執行緒的同時訪問,表明同步資源數為2,這樣可以設定初始狀態status為2,當一個執行緒進行獲取,status減1,該執行緒釋放,則status加1,狀態的合法範圍為0、1和2,其中0表示當前已經有兩個執行緒獲取了同步資源,此時再有其他執行緒對同步狀態進行獲取,該執行緒只能被阻塞。在同步狀態變更時,需要使用compareAndSet(int expect,int update)方法做原子性保障。
最後,組合自定義同步器。前面的章節提到,自定義同步元件通過組合自定義同步器來完成同步功能,一般情況下自定義同步器會被定義為自定義同步元件的內部類。
public class TwinsLock implements Lock { private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must largethan zero."); } setState(count); }
public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } }
public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } }
public void lock() { sync.acquireShared(1); }
public void unlock() { sync.releaseShared(1); } // 其他介面方法略 } |
實現了lock介面,提供了面向使用者的介面,lock和unlock,使用者呼叫lock方法獲取鎖,隨後用unlock方法釋放鎖,同一時刻只能有兩個執行緒同時獲取到鎖。TwinsLock包含一個自定義同步器Sync,以共享式獲取同步狀態為例,通過獲取當前state值,減去reducecount得到同步後的狀態值,然後使用cas對狀態值進行更新,當tryAcquireShared方法返回值大於等於0時,當前執行緒才獲取同步狀態,即獲得了鎖。釋放鎖也是同理。
設計一個測試方法:
public class TwinsLockTest { @Test public void test() { final Lock lock = new TwinsLock(); class Worker extends Thread { public void run() { while (true) { lock.lock(); try { SleepUtils.second(1); System.out.println(Thread.currentThread().getName() + "---" +System.currentTimeMillis()); SleepUtils.second(1); } finally { lock.unlock(); } } } } // 啟動10個執行緒 for (int i = 0; i < 10; i++) { Worker w = new Worker(); w.setDaemon(true); w.start(); } // 每隔1秒換行 for (int i = 0; i < 10; i++) { SleepUtils.second(1); System.out.println(); } } } |
定義一個work執行緒,然後設定為守護執行緒,啟動10個work執行緒,讓主函式執行10秒鐘,為了防止該test方法過早結束;然後在try塊前進行加鎖finally進行解鎖,保證執行緒成對輸出。
3.重入鎖
它表示該鎖能夠支援同一個執行緒對資源的重複加鎖。除此之外,該鎖支援獲取鎖時的公平和非公平性選擇。
ReentrantLock在呼叫lock方法時,已經獲取到鎖的執行緒,能夠再次呼叫lock方法獲取鎖而不阻塞。
公平鎖:先對鎖進行獲取的請求一定先被滿足,那麼這個鎖是公平的。
非公平鎖:獲取鎖的執行緒是隨機的。
3.1實現可重入
重進入:任意執行緒在獲取到鎖之後,能夠再次獲取該鎖而不被鎖阻塞。
需要解決兩個問題:
①執行緒再次獲取鎖。需要識別獲取鎖的執行緒是否為當前佔據鎖的執行緒,如果是,則再次成功獲取。
②鎖的最終釋放。重複獲得n次鎖,隨後在n次釋放該鎖後,其他執行緒能夠獲取該鎖。鎖的最終釋放要求鎖對於獲取次數進行記錄,計數表示當前鎖被重複獲取的次數,而鎖被釋放時,計數自減,當計數為0時,表示成功釋放。
ReentrantLock的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {
|