1. 程式人生 > >淺談AQS --JAVA

淺談AQS --JAVA

一   AQS能幹什麼

 AQS是對列同步器AbustactQueuedSynchronizer的簡稱,位於juc包下。

你所看見的鎖包括 ReentrantLock、ReentrantReadWriteLock,還有同步元件 CyclicBarrier等,它的內部都是通過AQS實現的。通過學習AQS,你可以寫出自己的鎖與同步元件。

二AQS內部實現

1.同步佇列(一個FIFO雙向佇列)

作用:主要作用是用來存放在鎖上阻塞的執行緒,當一個執行緒嘗試獲取鎖時,如果鎖已經被佔用,那麼當前執行緒就會被構造成一個Node節點假如到同步佇列的尾部,

注意:同步佇列實際是不存在的,在AQS原始碼中只定義了節點的前一個節點與後一個節點。

           waitStatus值代表的含義。

 1 static final class Node {
 2         /** waitStatus值,表示執行緒已被取消(等待超時或者被中斷)*/
 3         static final int CANCELLED =  1;
 4         /** waitStatus值,表示後繼執行緒需要被喚醒(unpaking)*/
 5         static final int SIGNAL    = -1;
 6         /**waitStatus值,表示結點執行緒等待在condition上,當被signal後,會從等待佇列轉移到同步到佇列中 */
 7         /** waitStatus value to indicate thread is waiting on condition */
 8         static final int CONDITION = -2;
 9        /** waitStatus值,表示下一次共享式同步狀態會被無條件地傳播下去
10         static final int PROPAGATE = -3;
11         /** 等待狀態,初始為0 */
12         volatile int waitStatus;
13         /**當前結點的前驅結點 */
14         volatile Node prev;
15         /** 當前結點的後繼結點 */
16         volatile Node next;
17         /** 與當前結點關聯的排隊中的執行緒 */
18         volatile Thread thread;
19         /** ...... */
20     }

  2.AQS提供的便利

在AQS原始碼中,同步狀態通過設定一個int型別的state來實現。同步狀態可以用來判斷當前執行緒是否是安全的。對於同步狀態的修改與獲取,AQS已經幫你實現好了。

getState()

setState()

compareAndSetState()

 AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentrantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。

  不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可

,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:

isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。

tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。

tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。

tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待結點返回true,否則返回false

實現自定義同步器時,還需要呼叫AQS提供的模板方法

 

3  獨佔式同步狀態的獲取與釋放

 

      lock方法一般會直接代理到acquire上

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

 a.首先,呼叫使用者重寫的tryAcquire方法,若返回true,意味著獲取同步狀態成功,後面的邏輯不再執行;若返回false,也就是獲取同步狀態失敗,進入b步驟;

 b.此時,獲取同步狀態失敗,構造獨佔式同步結點,通過addWatiter將此結點新增到同步佇列的尾部(此時可能會有多個執行緒結點試圖加入同步佇列尾部,需要以執行緒安全的方  式新增);

 c.該結點以在佇列中嘗試獲取同步狀態,若獲取不到,則阻塞結點執行緒,直到被前驅結點喚醒或者被中斷。

addWaiter

    為獲取同步狀態失敗的執行緒,構造成一個Node結點,新增到同步佇列尾部

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//構造結點
        //指向尾結點tail
        Node pred = tail;
        //如果尾結點不為空,CAS快速嘗試在尾部新增,若CAS設定成功,返回;否則,eng。
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

先cas快速設定,若失敗,進入enq方法  

  將結點新增到同步佇列尾部這個操作,同時可能會有多個執行緒嘗試新增到尾部,是非執行緒安全的操作。

  以上程式碼可以看出,使用了compareAndSetTail這個cas操作保證安全新增尾結點。

enq方法

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { //如果佇列為空,建立結點,同時被head和tail引用
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {//cas設定尾結點,不成功就一直重試
                    t.next = node;
                    return t;
                }
            }
        }
    }

  enq內部是個死迴圈,通過CAS設定尾結點,不成功就一直重試。很經典的CAS自旋的用法。這是一種樂觀的併發策略

最後,看下acquireQueued方法

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {//死迴圈
                final Node p = node.predecessor();//找到當前結點的前驅結點
                if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
                    setHead(node);//獲取同步狀態成功,將當前結點設定為頭結點。
                    p.next = null; // 方便GC
                    failed = false;
                    return interrupted;
                }
                // 如果沒有獲取到同步狀態,通過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞執行緒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  acquireQueued 內部也是一個死迴圈,只有前驅結點是頭結點的結點,也就是老二結點,才有機會去tryAcquire;若tryAcquire成功,表示獲取同步狀態成 功,將此結點設定為頭結點;若是非老二結點,或者tryAcquire失敗,則進入shouldParkAfterFailedAcquire去判斷判斷 當前執行緒是否應該阻塞,若可以,呼叫parkAndCheckInterrupt阻塞當前執行緒,直到被中斷或者被前驅結點喚醒。若還不能休息,繼續迴圈。

shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire用來判斷當前結點執行緒是否能休息
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //獲取前驅結點的waitStatus值 
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)//若前驅結點的狀態是SIGNAL,意味著當前結點可以被安全地park
            return true;
        if (ws > 0) {
        // ws>0,只有CANCEL狀態ws才大於0。若前驅結點處於CANCEL狀態,也就是此結點執行緒已經無效,從後往前遍歷,找到一個非CANCEL狀態的結點,將自己設定為它的後繼結點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {  
            // 若前驅結點為其他狀態,將其設定為SIGNAL狀態
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }    

  若shouldParkAfterFailedAcquire返回true,也就是當前結點的前驅結點為SIGNAL狀態,則意味著當前結點可以放心休息,進入parking狀態了。parkAncCheckInterrupt阻塞執行緒並處理中斷。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//使用LockSupport使執行緒進入阻塞狀態
        return Thread.interrupted();// 執行緒是否被中斷過
    }

  至此,關於acquire的方法原始碼已經分析完畢,我們來簡單總結下

    a.首先tryAcquire獲取同步狀態,成功則直接返回;否則,進入下一環節;

    b.執行緒獲取同步狀態失敗,就構造一個結點,加入同步佇列中,這個過程要保證執行緒安全;

    c.加入佇列中的結點執行緒進入自旋狀態,若是老二結點(即前驅結點為頭結點),才有機會嘗試去獲取同步狀態;否則,當其前驅結點的狀態為SIGNAL,執行緒便可安心休息,進入阻塞狀態,直到被中斷或者被前驅結點喚醒。

釋放同步狀態--release()

  當前執行緒執行完自己的邏輯之後,需要釋放同步狀態,來看看release方法的邏輯

 public final boolean release(int arg) {
        if (tryRelease(arg)) {//呼叫使用者重寫的tryRelease方法,若成功,喚醒其後繼結點,失敗則返回false
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//喚醒後繼結點
            return true;
        }
        return false;
    }
  unparkSuccessor:喚醒後繼結點 
 1 private void unparkSuccessor(Node node) {
 2         //獲取wait狀態
 3         int ws = node.waitStatus;
 4         if (ws < 0)
 5             compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設定為初始值0
 6         Node s = node.next;//後繼結點
 7         if (s == null || s.waitStatus > 0) {//若後繼結點為空,或狀態為CANCEL(已失效),則從後尾部往前遍歷找到一個處於正常阻塞狀態的結點     進行喚醒
 8             s = null;
 9             for (Node t = tail; t != null && t != node; t = t.prev)
10                 if (t.waitStatus <= 0)
11                     s = t;
12         }
13         if (s != null)
14             LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的執行緒
15     }    

  release的同步狀態相對簡單,需要找到頭結點的後繼結點進行喚醒,若後繼結點為空或處於CANCEL狀態,從後向前遍歷找尋一個正常的結點,喚醒其對應執行緒。

 

 

4  共享式

  共享式:共 享式地獲取同步狀態。對於獨佔式同步元件來講,同一時刻只有一個執行緒能獲取到同步狀態,其他執行緒都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法 tryAcquire返回值為boolean,這很容易理解;對於共享式同步元件來講,同一時刻可以有多個執行緒同時獲取到同步狀態,這也是“共享”的意義 所在。其待重寫的嘗試獲取同步狀態的方法tryAcquireShared返回值為int。

 protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

  1.當返回值大於0時,表示獲取同步狀態成功,同時還有剩餘同步狀態可供其他執行緒獲取;

  2.當返回值等於0時,表示獲取同步狀態成功,但沒有可用同步狀態了;

  3.當返回值小於0時,表示獲取同步狀態失敗。

  獲取同步狀態--acquireShared  

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)//返回值小於0,獲取同步狀態失敗,排隊去;獲取同步狀態成功,直接返回去幹自己的事兒。
            doAcquireShared(arg);
    }

  doAcquireShared

 1  private void doAcquireShared(int arg) {
 2         final Node node = addWaiter(Node.SHARED);//構造一個共享結點,新增到同步佇列尾部。若佇列初始為空,先新增一個無意義的傀儡結點,再將新節點新增到佇列尾部。
 3         boolean failed = true;//是否獲取成功
 4         try {
 5             boolean interrupted = false;//執行緒parking過程中是否被中斷過
 6             for (;;) {//死迴圈
 7                 final Node p = node.predecessor();//找到前驅結點
 8                 if (p == head) {//頭結點持有同步狀態,只有前驅是頭結點,才有機會嘗試獲取同步狀態
 9                     int r = tryAcquireShared(arg);//嘗試獲取同步裝填
10                     if (r >= 0) {//r>=0,獲取成功
11                         setHeadAndPropagate(node, r);//獲取成功就將當前結點設定為頭結點,若還有可用資源,傳播下去,也就是繼續喚醒後繼結點
12                         p.next = null; // 方便GC
13                         if (interrupted)
14                             selfInterrupt();
15                         failed = false;
16                         return;
17                     }
18                 }
19                 if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態
20                     parkAndCheckInterrupt())//阻塞執行緒
21                     interrupted = true;
22             }
23         } finally {
24             if (failed)
25                 cancelAcquire(node);
26         }
27     }

  大體邏輯與獨佔式的acquireQueued差距不大,只不過由於是共享式,會有多個執行緒同時獲取到執行緒,也可能同時釋放執行緒,空出很多同步狀態,所以當排隊中的老二獲取到同步狀態,如果還有可用資源,會繼續傳播下去。

  setHeadAndPropagate

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

  釋放同步狀態--releaseShared

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();//釋放同步狀態
            return true;
        }
        return false;
    }

 

  doReleaseShared

 private void doReleaseShared() {
        for (;;) {//死迴圈,共享模式,持有同步狀態的執行緒可能有多個,採用迴圈CAS保證執行緒安全
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;          
                    unparkSuccessor(h);//喚醒後繼結點
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            if (h == head)              
                break;
        }
    }

  程式碼邏輯比較容易理解,需要注意的是,共享模式,釋放同步狀態也是多執行緒的,此處採用了CAS自旋來保證。

 

 

三 自定義同步元件的實現。

  編寫一個允許最大訪問量為3 的共享鎖。

public class ThreeLock implements Lock {
    private  Sync sync = new Sync(3); //最大訪問量為3

    private  static  class  Sync extends AbstractQueuedSynchronizer{

        public Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must >0");
            }
            setState(count);
        }
        //共享式的獲取鎖
        @Override
        protected int tryAcquireShared(int reduceCount) {
            for (;;){
                int count = getState();
                int newCount = count - reduceCount;
                if(newCount<0||compareAndSetState(count,newCount)){
                    return  newCount;
                }
            }
        }
        //共享鎖的釋放
        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for (;;){
                int count = getState();
                int newCount = count+returnCount;
                if(compareAndSetState(count,newCount)){
                    return true;
                }
            }
        }
    }


    @Override
    public void lock() {
        sync.acquireShared(1);  //每次獲取鎖都是state-1
    }
    @Override
    public void unlock() {
        sync.releaseShared(1);    //每次釋放鎖都是state+1
    }
xxxx略去一些方法
}

 

測試:

public class TestThreeLock {
    public static void main(String[] args) {
      new TestThreeLock().test();
    }

    public  void test(){

        Lock threeLock = new ThreeLock();
        Runnable rn = new Runnable() {
            @Override
            public void run() {
                threeLock.lock();
                System.out.println(Thread.currentThread().getName());

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    threeLock.unlock();
                }
            }
        };
        
        for(int i=0;i<10;i++){
            new Thread(rn).start();
        }

    }
}

結果:執行緒名稱以每三個一組的形式打印出來,測試成功!

轉載:https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html