1. 程式人生 > 其它 >【併發程式設計】ReentrantLock

【併發程式設計】ReentrantLock

一。aqs

  AQS全稱是AbstractQueuedSynchronizer,是阻塞式鎖和相關的同步器工具的框架。

  特點:

    用 state 屬性來表示資源的狀態(分獨佔模式和共享模式),子類需要定義如何維護這個狀態,控制如何獲取 鎖和釋放鎖

      getState - 獲取 state 狀態

      setState - 設定 state 狀態

      compareAndSetState - cas 機制設定 state 狀態

      獨佔模式是隻有一個執行緒能夠訪問資源,而共享模式可以允許多個執行緒訪問資源

    提供了基於 FIFO 的等待佇列,類似於 Monitor 的 EntryList

    條件變數來實現等待、喚醒機制,支援多個條件變數,類似於 Monitor 的 WaitSet

  子類主要實現這樣一些方法(預設丟擲 UnsupportedOperationException)

    tryAcquire

    tryRelease

    tryAcquireShared

    tryReleaseShared

    isHeldExclusively

 二。ReentrantLock非公平鎖實現

  非公平鎖就是線上程搶佔鎖時沒有按照先後順序執行。

  當沒有競爭時,該鎖的owner為當前執行緒,state=1代表有執行緒正在佔用鎖。

   當有競爭出現時CAS 嘗試將 state 由 0 改為 1,結果失敗,進入 tryAcquire 邏輯,這時 state 已經是1,結果仍然失敗,接下來進入 addWaiter 邏輯,構造 Node 佇列:

  圖中黃色三角表示該 Node 的 waitStatus 狀態,其中 0 為預設正常狀態

  Node 的建立是懶惰的。

  其中第一個 Node 稱為 Dummy(啞元)或哨兵,用來佔位,並不關聯執行緒

   當前執行緒進入 acquireQueued 邏輯

  1. acquireQueued 會在一個死迴圈中不斷嘗試獲得鎖,失敗後進入 park 阻塞

  2. 如果自己是緊鄰著 head(排第二位),那麼再次 tryAcquire 嘗試獲取鎖,當然這時 state 仍為 1,失敗

  3. 進入 shouldParkAfterFailedAcquire 邏輯,將前驅 node,即 head 的 waitStatus 改為 -1,這次返回 false

   4. shouldParkAfterFailedAcquire 執行完畢回到 acquireQueued ,再次 tryAcquire 嘗試獲取鎖,當然這時 state 仍為 1,失敗

  5. 當再次進入 shouldParkAfterFailedAcquire 時,這時因為其前驅 node 的 waitStatus 已經是 -1,這次返回 true

  6. 進入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

   再次有多個執行緒經歷上述過程競爭失敗,變成這個樣子

   Thread-0 釋放鎖,進入 tryRelease 流程,如果成功

  設定 exclusiveOwnerThread 為 null ,state = 0

  當前佇列不為 null,並且 head 的 waitStatus = -1,進入 unparkSuccessor 流程 找到佇列中離 head 最近的一個 Node(沒取消的),unpark 恢復其執行,本例中即為 Thread-1 回到 Thread-1 的 acquireQueued 流程

   如果加鎖成功(沒有競爭),會設定

    exclusiveOwnerThread 為 Thread-1,state = 1

    head 指向剛剛 Thread-1 所在的 Node,該 Node 清空 Thread

    原本的 head 因為從連結串列斷開,而可被垃圾回收 如果這時候有其它執行緒來競爭(非公平的體現),例如這時有 Thread-4 來了

   如果不巧又被 Thread-4 佔了先

    Thread-4 被設定為 exclusiveOwnerThread,state = 1

    Thread-1 再次進入 acquireQueued 流程,獲取鎖失敗,重新進入 park 阻塞

  加鎖原始碼:

// Sync 繼承自 AQS
static final class NonfairSync extends Sync {
 private static final long serialVersionUID = 7316153563782823691L;

 // 加鎖實現
 final void lock() {
 // 首先用 cas 嘗試(僅嘗試一次)將 state 從 0 改為 1, 如果成功表示獲得了獨佔鎖
 if (compareAndSetState(0, 1))
 setExclusiveOwnerThread(Thread.currentThread());
 else
 // 如果嘗試失敗,進入 ㈠
 acquire(1);
 }

 // ㈠ AQS 繼承過來的方法, 方便閱讀, 放在此處
 public final void acquire(int arg) {
 // ㈡ tryAcquire
 if (
 !tryAcquire(arg) &&
 // 當 tryAcquire 返回為 false 時, 先呼叫 addWaiter ㈣, 接著 acquireQueued ㈤
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
 ) {
 selfInterrupt();
 }
 }

 // ㈡ 進入 ㈢
 protected final boolean tryAcquire(int acquires) {
 return nonfairTryAcquire(acquires);
 }

 // ㈢ Sync 繼承過來的方法, 方便閱讀, 放在此處
 final boolean nonfairTryAcquire(int acquires) {
 final Thread current = Thread.currentThread();
 int c = getState();
 // 如果還沒有獲得鎖
 if (c == 0) {
 // 嘗試用 cas 獲得, 這裡體現了非公平性: 不去檢查 AQS 佇列
 if (compareAndSetState(0, acquires)) {
 setExclusiveOwnerThread(current);
 return true;
 }
 }
 // 如果已經獲得了鎖, 執行緒還是當前執行緒, 表示發生了鎖重入
 else if (current == getExclusiveOwnerThread()) {
 // state++
 int nextc = c + acquires;
 if (nextc < 0) // overflow
 throw new Error("Maximum lock count exceeded");
 setState(nextc);
 return true;
 }
 // 獲取失敗, 回到呼叫處
 return false;
 }

 // ㈣ AQS 繼承過來的方法, 方便閱讀, 放在此處
 private Node addWaiter(Node mode) {
 // 將當前執行緒關聯到一個 Node 物件上, 模式為獨佔模式
 Node node = new Node(Thread.currentThread(), mode);
 // 如果 tail 不為 null, cas 嘗試將 Node 物件加入 AQS 佇列尾部
 Node pred = tail;
 if (pred != null) {
 node.prev = pred;
 if (compareAndSetTail(pred, node)) {
 // 雙向連結串列
 pred.next = node;
 return node;
 }
 }
 // 嘗試將 Node 加入 AQS, 進入 ㈥
 enq(node);
 return node;
 }

 // ㈥ AQS 繼承過來的方法, 方便閱讀, 放在此處
 private Node enq(final Node node) {
 for (;;) {
 Node t = tail;
 if (t == null) {
 // 還沒有, 設定 head 為哨兵節點(不對應執行緒,狀態為 0)
 if (compareAndSetHead(new Node())) {
 tail = head;
 }
 } else {
 // cas 嘗試將 Node 物件加入 AQS 佇列尾部
 node.prev = t;
 if (compareAndSetTail(t, node)) {
 t.next = node;
 return t;
 }
 }
 }
 }

 // ㈤ AQS 繼承過來的方法, 方便閱讀, 放在此處
 final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
 boolean interrupted = false;
 for (;;) {
 final Node p = node.predecessor();
 // 上一個節點是 head, 表示輪到自己(當前執行緒對應的 node)了, 嘗試獲取
 if (p == head && tryAcquire(arg)) {
 // 獲取成功, 設定自己(當前執行緒對應的 node)為 head
 setHead(node);
 // 上一個節點 help GC
 p.next = null;
 failed = false;
 // 返回中斷標記 false
 return interrupted;
 }
 if (
 // 判斷是否應當 park, 進入 ㈦
 shouldParkAfterFailedAcquire(p, node) &&
 // park 等待, 此時 Node 的狀態被置為 Node.SIGNAL ㈧
 parkAndCheckInterrupt()
 ) {
 interrupted = true;
 }
 }
 } finally {
 if (failed)
 cancelAcquire(node);
 }
 }

 // ㈦ AQS 繼承過來的方法, 方便閱讀, 放在此處
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 // 獲取上一個節點的狀態
 int ws = pred.waitStatus;
 if (ws == Node.SIGNAL) {
 // 上一個節點都在阻塞, 那麼自己也阻塞好了
 return true;
 }
 // > 0 表示取消狀態
 if (ws > 0) {
 // 上一個節點取消, 那麼重構刪除前面所有取消的節點, 返回到外層迴圈重試
 do {
 node.prev = pred = pred.prev;
 } while (pred.waitStatus > 0);
 pred.next = node;
 } else {
 // 這次還沒有阻塞
 // 但下次如果重試不成功, 則需要阻塞,這時需要設定上一個節點狀態為 Node.SIGNAL
 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
 }
 return false;
 }

 // ㈧ 阻塞當前執行緒
 private final boolean parkAndCheckInterrupt() {
 LockSupport.park(this);
 return Thread.interrupted();
 }
}
View Code

  解鎖原始碼:

// Sync 繼承自 AQS
static final class NonfairSync extends Sync {
 // 解鎖實現
 public void unlock() {
 sync.release(1);
 }

 // AQS 繼承過來的方法, 方便閱讀, 放在此處
 public final boolean release(int arg) {
 // 嘗試釋放鎖, 進入 ㈠
 if (tryRelease(arg)) {
 // 佇列頭節點 unpark
 Node h = head;
 if (
 // 佇列不為 null
 h != null &&
 // waitStatus == Node.SIGNAL 才需要 unpark
 h.waitStatus != 0
 ) {
 // unpark AQS 中等待的執行緒, 進入 ㈡
 unparkSuccessor(h);
 }
 return true;
 }
 return false;
 }

 // ㈠ Sync 繼承過來的方法, 方便閱讀, 放在此處
 protected final boolean tryRelease(int releases) {
 // state--
 int c = getState() - releases;
 if (Thread.currentThread() != getExclusiveOwnerThread())
 throw new IllegalMonitorStateException();
 boolean free = false;
 // 支援鎖重入, 只有 state 減為 0, 才釋放成功
 if (c == 0) {
 free = true;
 setExclusiveOwnerThread(null);
 }
 setState(c);
 return free;
 }

 // ㈡ AQS 繼承過來的方法, 方便閱讀, 放在此處
 private void unparkSuccessor(Node node) {
 // 如果狀態為 Node.SIGNAL 嘗試重置狀態為 0
 // 不成功也可以
 int ws = node.waitStatus;
 if (ws < 0) {
 compareAndSetWaitStatus(node, ws, 0);
 }
 // 找到需要 unpark 的節點, 但本節點從 AQS 佇列中脫離, 是由喚醒節點完成的
 Node s = node.next;
 // 不考慮已取消的節點, 從 AQS 佇列從後至前找到佇列最前面需要 unpark 的節點
 if (s == null || s.waitStatus > 0) {
 s = null;
 for (Node t = tail; t != null && t != node; t = t.prev)
 if (t.waitStatus <= 0)
 s = t;
 }
 if (s != null)
 LockSupport.unpark(s.thread);
 }
}
View Code

三。條件變數實現原理

  每個條件變數其實就對應著一個等待佇列,其實現類是 ConditionObject

  await 流程:

  開始 Thread-0 持有鎖,呼叫 await,進入 ConditionObject 的 addConditionWaiter 流程 建立新的 Node 狀態為 -2(Node.CONDITION),關聯 Thread-0,加入等待佇列尾部

   接下來進入 AQS 的 fullyRelease 流程,釋放同步器上的鎖

   unpark AQS 佇列中的下一個節點,競爭鎖,假設沒有其他競爭執行緒,那麼 Thread-1 競爭成功

   park 阻塞 Thread-0

   signal 流程

  假設 Thread-1 要來喚醒 Thread-0

   進入 ConditionObject 的 doSignal 流程,取得等待佇列中第一個 Node,即 Thread-0 所在 Node

   執行 transferForSignal 流程,將該 Node 加入 AQS 佇列尾部,將 Thread-0 的 waitStatus 改為 0,Thread-3 的 waitStatus 改為 -1

 四。讀寫鎖原理

  讀寫鎖用的是同一個 Sycn 同步器,因此等待佇列、state 等也是同一個

  t1 w.lock,t2 r.lock

  1) t1 成功上鎖,流程與 ReentrantLock 加鎖相比沒有特殊之處,不同是寫鎖狀態佔了 state 的低 16 位,而讀鎖 使用的是 state 的高 16 位

   2)t2 執行 r.lock,這時進入讀鎖的 sync.acquireShared(1) 流程,首先會進入 tryAcquireShared 流程。如果有寫 鎖佔據,那麼 tryAcquireShared 返回 -1 表示失敗

  tryAcquireShared 返回值表示

    -1 表示失敗 

    0 表示成功,但後繼節點不會繼續喚醒

    正數表示成功,而且數值是還有幾個後繼節點需要喚醒,讀寫鎖返回 1

   3)這時會進入 sync.doAcquireShared(1) 流程,首先也是呼叫 addWaiter 新增節點,不同之處在於節點被設定為 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此時 t2 仍處於活躍狀態.

  4)t2 會看看自己的節點是不是老二,如果是,還會再次呼叫 tryAcquireShared(1) 來嘗試獲取鎖

  5)如果沒有成功,在 doAcquireShared 內 for (;;) 迴圈一次,把前驅節點的 waitStatus 改為 -1,再 for (;;) 迴圈一 次嘗試 tryAcquireShared(1) 如果還不成功,那麼在 parkAndCheckInterrupt() 處 park

   t3 r.lock,t4 w.lock

  這種狀態下,假設又有 t3 加讀鎖和 t4 加寫鎖,這期間 t1 仍然持有鎖,就變成了下面的樣子

   t1 w.unlock

  這時會走到寫鎖的 sync.release(1) 流程,呼叫 sync.tryRelease(1) 成功,變成下面的樣子

   接下來執行喚醒流程 sync.unparkSuccessor,即讓老二恢復執行,這時 t2 在 doAcquireShared 內 parkAndCheckInterrupt() 處恢復執行

  這回再來一次 for (;;) 執行 tryAcquireShared 成功則讓讀鎖計數加一

   這時 t2 已經恢復執行,接下來 t2 呼叫 setHeadAndPropagate(node, 1),它原本所在節點被置為頭節點

   事情還沒完,在 setHeadAndPropagate 方法內還會檢查下一個節點是否是 shared,如果是則呼叫 doReleaseShared() 將 head 的狀態從 -1 改為 0 並喚醒老二,這時 t3 在 doAcquireShared 內 parkAndCheckInterrupt() 處恢復執行

   這回再來一次 for (;;) 執行 tryAcquireShared 成功則讓讀鎖計數加一

   這時 t3 已經恢復執行,接下來 t3 呼叫 setHeadAndPropagate(node, 1),它原本所在節點被置為頭節點

   下一個節點不是 shared 了,因此不會繼續喚醒 t4 所在節點

  t2 r.unlock,t3 r.unlock

  t2 進入 sync.releaseShared(1) 中,呼叫 tryReleaseShared(1) 讓計數減一,但由於計數還不為零

   t3 進入 sync.releaseShared(1) 中,呼叫 tryReleaseShared(1) 讓計數減一,這回計數為零了,進入 doReleaseShared() 將頭節點從 -1 改為 0 並喚醒老二,即

   之後 t4 在 acquireQueued 中 parkAndCheckInterrupt 處恢復執行,再次 for (;;) 這次自己是老二,並且沒有其他 競爭,tryAcquire(1) 成功,修改頭結點,流程結束