1. 程式人生 > >深入淺出 Java 同步器

深入淺出 Java 同步器

前言

在 java.util.concurrent.locks 包中有很多Lock的實現類,常用的有ReentrantLock、ReadWriteLock(實現類ReentrantReadWriteLock),內部實現都依賴AbstractQueuedSynchronizer類,接下去讓我們看看Doug Lea大神是如何使用一個普通類就完成了程式碼塊的併發訪問控制。為了方便,本文中使用AQS代替AbstractQueuedSynchronizer。

定義

123456789101112 publicabstractclassAbstractQueuedSynchronizer extendsAbstractOwnableSynchronizer implementsjava.io.Serializable{//等待佇列的頭節點privatetransient volatile Node head;//等待佇列的尾節點privatetransient volatile Node tail;//同步狀態privatevolatile intstate;protectedfinal
intgetState(){returnstate;}protectedfinalvoidsetState(intnewState){state=newState;}...}

佇列同步器AQS是用來構建鎖或其他同步元件的基礎框架,內部使用一個int成員變量表示同步狀態,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作,其中內部狀態state,等待佇列的頭節點head和尾節點head,都是通過volatile修飾,保證了多執行緒之間的可見。

在深入實現原理之前,我們先看看內部的FIFO佇列是如何實現的。

1234567891011121314 staticfinalclassNode{staticfinalNode SHARED=newNode();staticfinalNode EXCLUSIVE=null;staticfinalintCANCELLED=1;staticfinalintSIGNAL=-1;staticfinalintCONDITION=-2;staticfinalintPROPAGATE=-3;volatile intwaitStatus;volatile Node prev;volatile Node next;volatile Thread thread;Node nextWaiter;...}

先來一張形象的圖(該圖其實是網上找的)

FIFO.png

黃色節點是預設head節點,其實是一個空節點,我覺得可以理解成代表當前持有鎖的執行緒,每當有執行緒競爭失敗,都是插入到佇列的尾節點,tail節點始終指向佇列中的最後一個元素。

每個節點中, 除了儲存了當前執行緒,前後節點的引用以外,還有一個waitStatus變數,用於描述節點當前的狀態。多執行緒併發執行時,佇列中會有多個節點存在,這個waitStatus其實代表對應執行緒的狀態:有的執行緒可能獲取鎖因為某些原因放棄競爭;有的執行緒在等待滿足條件,滿足之後才能執行等等。一共有4中狀態:

  1. CANCELLED 取消狀態
  2. SIGNAL 等待觸發狀態
  3. CONDITION 等待條件狀態
  4. PROPAGATE 狀態需要向後傳播

等待佇列是FIFO先進先出,只有前一個節點的狀態為SIGNAL時,當前節點的執行緒才能被掛起。

實現原理

子類重寫tryAcquire和tryRelease方法通過CAS指令修改狀態變數state。

1234 publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}
執行緒獲取鎖過程

下列步驟中執行緒A和B進行競爭。

  1. 執行緒A執行CAS執行成功,state值被修改並返回true,執行緒A繼續執行。
  2. 執行緒A執行CAS指令失敗,說明執行緒B也在執行CAS指令且成功,這種情況下執行緒A會執行步驟3。
  3. 生成新Node節點node,並通過CAS指令插入到等待佇列的隊尾(同一時刻可能會有多個Node節點插入到等待佇列中),如果tail節點為空,則將head節點指向一個空節點(代表執行緒B),具體實現如下:
    1234567891011121314151617181920212223242526272829 privateNode addWaiter(Node mode){Node node=newNode(Thread.currentThread(),mode);// Try the fast path of enq; backup to full enq on failureNode pred=tail;if(pred!=null){node.prev=pred;if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}enq(node);returnnode;}privateNode enq(finalNode node){for(;;){Nodet=tail;if(t==null){// Must initializeif(compareAndSetHead(newNode()))tail=head;}else{node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}
  4. node插入到隊尾後,該執行緒不會立馬掛起,會進行自旋操作。因為在node的插入過程,執行緒B(即之前沒有阻塞的執行緒)可能已經執行完成,所以要判斷該node的前一個節點pred是否為head節點(代表執行緒B),如果pred == head,表明當前節點是佇列中第一個“有效的”節點,因此再次嘗試tryAcquire獲取鎖,
    1、如果成功獲取到鎖,表明執行緒B已經執行完成,執行緒A不需要掛起。
    2、如果獲取失敗,表示執行緒B還未完成,至少還未修改state值。進行步驟5。
    123456789101112131415161718192021 finalbooleanacquireQueued(finalNode node,intarg){booleanfailed=true;try{booleaninterrupted=false;for(;;){finalNodep=node.predecessor();if(p==head&&tryAcquire(arg)){setHead(node);p.next=null;// help GCfailed=false;returninterrupted;}if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}
  5. 前面我們已經說過只有前一個節點pred的執行緒狀態為SIGNAL時,當前節點的執行緒才能被掛起。
    1、如果pred的waitStatus == 0,則通過CAS指令修改waitStatus為Node.SIGNAL。
    2、如果pred的waitStatus > 0,表明pred的執行緒狀態CANCELLED,需從佇列中刪除。
    3、如果pred的waitStatus為Node.SIGNAL,則通過LockSupport.park()方法把執行緒A掛起,並等待被喚醒,被喚醒後進入步驟6。
    具體實現如下:
    123456789101112131415161718192021222324252627 privatestaticbooleanshouldParkAfterFailedAcquire(Node pred,Node node){intws=pred.waitStatus;if(ws==Node.SIGNAL)/*      * This node has already set status asking a release      * to signal it, so it can safely park.      */returntrue;if(ws>0){/*      * Predecessor was cancelled. Skip over predecessors and      * indicate retry.      */do{node.prev=pred=pred.prev;}while(pred.waitStatus>0);pred.next=node;}else{/*      * waitStatus must be 0 or PROPAGATE.  Indicate that we      * need a signal, but don't park yet.  Caller will need to      * retry to make sure it cannot acquire before parking.      */compareAndSetWaitStatus(pred,ws,Node.SIGNAL);}returnfalse;}
  6. 執行緒每次被喚醒時,都要進行中斷檢測,如果發現當前執行緒被中斷,那麼丟擲InterruptedException並退出迴圈。從無限迴圈的程式碼可以看出,並不是被喚醒的執行緒一定能獲得鎖,必須呼叫tryAccquire重新競爭,因為鎖是非公平的,有可能被新加入的執行緒獲得,從而導致剛被喚醒的執行緒再次被阻塞,這個細節充分體現了“非公平”的精髓。
執行緒釋放鎖過程:
  1. 如果頭結點head的waitStatus值為-1,則用CAS指令重置為0;
  2. 找到waitStatus值小於0的節點s,通過LockSupport.unpark(s.thread)喚醒執行緒。
    1234567891011121314151617181920212223242526 privatevoidunparkSuccessor(Node node){/*  * If status is negative (i.e., possibly needing signal) try  * to clear in anticipation of signalling.  It is OK if this  * fails or if status is changed by waiting thread.  */intws=node.waitStatus;if(ws<0)compareAndSetWaitStatus(node,ws,0);/*  * Thread to unpark is held in successor, which is normally  * just the next node.  But if cancelled or apparently null,  * traverse backwards from tail to find the actual  * non-cancelled successor.  */Nodes=node.next;if(s==null||s.waitStatus>0){s=null;for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}if(s!=null)LockSupport.unpark(s.thread);}

    總結

    Doug Lea大神的思路跳躍的太快,把CAS指令玩的出神入化,以至於有些邏輯反反覆覆debug很多次才明白。