1. 程式人生 > >11、JUC之AQS中的CLH佇列

11、JUC之AQS中的CLH佇列

CLH佇列

AQS內部維護著一個FIFO的佇列,即CLH佇列。AQS的同步機制就是依靠CLH佇列實現的。CLH佇列是FIFO的雙端雙向佇列,實現公平鎖。執行緒通過AQS獲取鎖失敗,就會將執行緒封裝成一個Node節點,插入佇列尾。當有執行緒釋放鎖時,後嘗試把隊頭的next節點佔用鎖。

CLH佇列結構

Node

CLH佇列由Node物件組成,Node是AQS中的內部類。

重要屬性

   //用於標識共享鎖
   static final Node SHARED = new Node();

   //用於標識獨佔鎖
   static final Node EXCLUSIVE = null;

   /**
    * 因為超時或者中斷,節點會被設定為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
    */
   static final int CANCELLED =  1;

   /**
    * 當前節點釋放鎖的時候,需要喚醒下一個節點
    */
   static final int SIGNAL    = -1;

   /**
    * 節點在等待佇列中,節點執行緒等待Condition喚醒
    */
   static final int CONDITION = -2;

   /**
    * 表示下一次共享式同步狀態獲取將會無條件地傳播下去
    */
   static final int PROPAGATE = -3;

   /** 等待狀態 */
   volatile int waitStatus;

   /** 前驅節點 */
   volatile Node prev;

   /** 後繼節點 */
   volatile Node next;

   /** 節點執行緒 */
   volatile Thread thread;
   
   //
   Node nextWaiter;

CLH佇列執行

  1. 執行緒呼叫acquire方法獲取鎖,如果獲取失敗則會進入CLH佇列
  public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
   }

2.addWaiter(Node.EXCLUSIVE)方法會將當前執行緒封裝成Node節點,追加在隊尾。

private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       // 獲取原隊尾
       Node pred = tail;
       if (pred != null) {
           node.prev = pred;
           //用cas更新 ,pred是原來隊尾,作為預期值,node作為新值
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
           }
       }
       //前面cas更新失敗後,再enq方法中迴圈用cas更新直到成功
       enq(node);
       return node;
}

  1. acquireQueued方法中會使執行緒自旋阻塞,直到獲取到鎖。
final boolean acquireQueued(final Node node, int arg) {
       boolean failed = true;
       try {
           boolean interrupted = false;
           for (;;) {
               //1. 拿到當前節點的前置節點
               final Node p = node.predecessor();
               
               //2. 如果當前節點的前置節點是頭節點的話,就再次嘗試獲取鎖
               if (p == head && tryAcquire(arg)) {
                   //成功獲取鎖後,將節點設定為頭節點
                   setHead(node);
                   p.next = null; // help GC
                   failed = false;
                   return interrupted;
               }
               /**
               更改當前節點前置節點的waitStatus,只有前置節點的waitStatus=Node.SIGNAL,當前節點才有可能被喚醒。如果前置節點的waitStatus>0(即取消),則跳過取更前面的節點。
               */
               if (shouldParkAfterFailedAcquire(p, node) &&
               //通過Unsafe.park來阻塞執行緒
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }

  1. 執行緒釋放鎖,從前面可以知道,獲取到鎖的執行緒會設定為CLH佇列的頭部。這裡如果tryRelease返回true,且head的waitStatus!=0。就會更新head的waitStatus為0並且 喚醒執行緒head.next節點的執行緒。
 public final boolean release(int arg) { 
       //判斷是否可以釋放鎖。
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
   }

  1. 更新head的waitStatus為0並且喚醒執行緒head.next節點的執行緒
 private void unparkSuccessor(Node node) {
       
       int ws = node.waitStatus;
       //waitStatus不是取消狀態,就設定成0
       if (ws < 0)
           compareAndSetWaitStatus(node, ws, 0);

       
       //獲取下個waitStatus不為取消的Node
       Node s = node.next;
       if (s == null || s.waitStatus > 0) {
           s = null;
           for (Node t = tail; t != null && t != node; t = t.prev)
               if (t.waitStatus <= 0)
                   s = t;
       }
       //LockSupport.unpark是呼叫了Unsafe.unpark,喚醒執行緒。
       if (s != null)
           LockSupport.unpark(s.thread);
   }