1. 程式人生 > >Java 併發 ---AbstractQueuedSynchronizer(同步器)-獨佔模式

Java 併發 ---AbstractQueuedSynchronizer(同步器)-獨佔模式

基於jdk 1.8

前面我們瞭解到可以通過synchronized關鍵字實現鎖功能,使用該關鍵字不需要我們顯式的獲取和釋放鎖,操作很方便簡單,但是如果我們需要對鎖進行操作或者干預,那麼這個是沒有辦法的。
在Lock接口出現之前,Java程式是靠synchronized關鍵字實現鎖功能的,而Java SE5之後,併發包中新增了Lock(和相關實現類)用來實現鎖功能,來看看兩者有什麼區別:

  • Lock可以讓等待鎖的執行緒響應中斷,而synchronized卻不行,使用synchronized時,等待的執行緒會一直等待下去,不能夠響應中斷;
  • lock可以嘗試獲取鎖,如果鎖被其他執行緒持有,則返回 false,不會使當前執行緒休眠(嘗試非阻塞獲取鎖)。
  • lock 可以 超時獲取鎖。
  • synchronized 會自動釋放鎖,lock 則不會自動釋放鎖。
  • Lock可以提高多個執行緒進行讀操作的效率。

Java Lock介面:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

lock(),unlock()

lock() 可以用於進行加鎖,unlock()當然是釋放鎖,需要注意, lock不會像 synchronized 那樣自動釋放鎖 ,所以: 一定要放在 try-finally塊中,保證鎖的釋放。 例如:

try {
    lock.lock();
    ......
} finally {
    lock.unlock();  
}

tryLock()

  • tryLock():嘗試獲得鎖,如果成功,返回 true,否則,返回 false。
  • tryLock(long time,TimeUnit unit):超時的獲取鎖,當前執行緒在下面3種情況下會返回:
    • 當前執行緒在超時時間內獲得了鎖。
    • 當前執行緒在超時時間內被中斷。
    • 超時時間結束,返回false。

lockInterruptibly 方法(後面會單獨講中斷)

  • 當通過這個方法去獲取鎖時,如果執行緒正在等待獲取鎖,則這個執行緒能夠響應中斷,例如當兩個執行緒同時通過lock.lockInterruptibly()想獲取某個鎖時,假若執行緒A獲取到了鎖,而執行緒B在等待,那麼對執行緒B呼叫threadB.interrupt()方法能夠中斷執行緒B的等待過程,讓其返回。
  • 用synchronized修飾的話,當一個執行緒處於等待某個鎖的狀態,是無法被中斷的,只有一直等待下去。
    newCondition()

newCondition()

用於獲取一個 Conodition 物件,使得某個,或者某些執行緒一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶呼叫)時 ,這些等待執行緒才會被喚醒,從而重新爭奪鎖。

lock 方法大體就介紹到這裡,通過lock介面分析,我們知道了,通過lock的介面方法,可以實現鎖的獲取,釋放等操作,這些介面都是統一的,對使用者(開發者)來說都是透明的,不需要明白底層實現細節就可以實現呼叫,但是作為開發者僅僅會用也太沒意思了,我們來看看lock 有哪些實現類:
這裡寫圖片描述
我們暫且不討論每個實現類,我們看看ReentrantLock(可重入鎖,synchronized可以重入嗎?)會發現該類裡面有個重要的內部類Syn:
這裡寫圖片描述
Syn繼承AbstractQueuedSynchronizer(同步器) ,而它就是我們今天討論的重點,Lock介面的實現基本都是通過聚合一個同步器的子類來完成執行緒訪問控制的。

佇列同步器 AbstractQueuedSynchronizer 使用來構建鎖或者其他同步元件的基礎框架:

  • 使用一個int成員變量表示同步狀態
  • 通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作

同步器的設計是基於模板方法模式,也就是說,使用者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步元件的實現中,並呼叫同步器提供的模板方法,而這些模板方法將會呼叫使用者重寫的方法
重寫同步器指定的方法時,需要使用同步器提供的如下三個方法來訪問或者修改同步狀態:

  • getState():獲取當前同步狀態。
  • setState():設定當前同步狀態。
  • compareAndSetState():使用CAS設定當前狀態。

同步器可重寫的方法:

方法名稱 描述
tryAcquire(int arg) 獨佔獲取同步狀態,實現該方法需要查詢當前狀態,並判斷同步狀態是否符合預期狀態,然後再進行CAS設定同步狀態。
treRelease(int arg) 獨佔式釋放同步狀態,等待獲取同步狀態的執行緒將有機會獲取同步狀態
tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於0的值,表示獲取成功,反之失敗
tryReleaseShared(int arg) 共享式釋放同步狀態
isHeldExclusively() 當前同步器是否在獨佔模式下被執行緒佔用,一般該方法表示是否被當前執行緒所獨佔

同步器提供的模板方法:
(1)void acquire(int arg)

  • 獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回,否則,將會進入同步佇列等待,該方法將會呼叫重寫的tryAcquire(intarg)方法。

(2)void acquireInterruptibly(int arg)

  • 與acquire(int arg)相同,但是該方法響應中斷,當前執行緒未獲取到同步狀態而進入同步佇列中,如果當前執行緒被中斷,則該方法會丟擲InterruptedException並返回

(3)boolean tryAcquireNanos(int arg,long nanos)

  • 在acquireInterruptibly(int arg)基礎上增加了超時限制,如果當前執行緒在超時時間內沒有獲取到同步狀態,那麼就會返回false,如果獲取到就返回true

(4)void acquireShared(int arg)

  • 共享式的獲取同步狀態,如果當前執行緒未獲取到同步狀態,將會進入同步佇列等待,與獨佔式獲取的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態。

(5)void acquireSharedInterruptibly(int arg)

  • 與acquireShared(int arg)相同,該方法響應中斷。

(6)boolean tryAcquireSharedNanos(int arg,long nanos)

  • 在acquireSharedInterruptibly(int arg)基礎上增加了超時限制。

(7)boolean release(int arg)

  • 獨佔式的釋放同步狀態,該方法會在釋放同步狀態之後,將同步佇列中第一個節點包含的執行緒喚醒。

(8)boolean releaseShared(int arg)

  • 共享式的釋放同步狀態

(9)CollectiongetQueuedThreads()

  • 獲取等待在同步佇列上的執行緒集合

到這裡我們就可以根據前面的知識自己寫一個同步元件WriteLock,用於對某個資源”寫”的訪問:

public class WriteLock implements Lock {

    private  static  Sync sync=new Sync();

    private  static   class  Sync extends AbstractQueuedSynchronizer{

        @Override
        protected boolean tryAcquire(int acquires) {

            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg)  {
            if (getState()==0)
                throw  new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;

        }

        public Condition newCondition() {
            return new ConditionObject();
        }

        /*
           是否處於獨佔狀態
         */
        @Override
        protected boolean isHeldExclusively() {
            return  getState()==1;
        }
    }
    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return  sync.tryAcquire(1);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }


}

整體過程很簡單(因為是獨佔模式,所以狀態只有0和1):

  • 繼承同步器,重寫裡面的指定方法,其實就是對同步狀態安裝自己的需求進行修改和判斷。
  • 組合Sync,在WriteLock類中,提供對外介面,對外介面實際依賴Sync類中的方法。

簡單測試一下,模擬了100個執行緒對一個非執行緒安全的集合的操作

public class TestMyLock {

    public  static List<String>list=new ArrayList<>();

    static  WriteLock writeLock=new WriteLock();
    static ReadLock readLock=new ReadLock(4);

    static  class  Task implements  Runnable{

        @Override
        public void run() {

            for (int i=0;i<100;i++){
//                writeLock.lock();
                list.add(Thread.currentThread().getName()+"---"+i);
//                writeLock.unlock();
            }
        }
    }
    public static void main(String[] args) throws  Exception{

        List<Thread>list=new ArrayList<>();

        Task task=new Task();
        for (int i=0;i<100;i++){
            list.add(new Thread(task,"thread"+i));
        }
        for (int i=0;i<100;i++){
            list.get(i).start();
        }
        for (int i=0;i<100;i++){
            list.get(i).join();
        }
        System.out.println(TestMyLock.list.size());

    }
}

如果不加鎖,那麼總的大小可能不會是100*100,加鎖後,總的集合大小就是100*100

佇列同步器的實現分析(獨佔模式)

同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態資訊構造成一個節點(Node)並加入到同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次嘗試獲取同步狀態

定義

static final class Node {

    //表示當前執行緒處於共享狀態
    static final Node SHARED = new Node();

    //獨佔狀態
    static final Node EXCLUSIVE = null;

    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;

    //前驅節點
    volatile Node prev;

    //後繼節點
    volatile Node next;

    //獲取同步狀態的執行緒
    volatile Thread thread;
    //指向下一個處於阻塞等待的節點
    Node nextWaiter;
        ...
}

waitStatus變數,用於描述節點當前的狀態,一共有5種狀態:

  • CANCELLED 取消狀態(需要從同步佇列中取消等待)
  • SIGNAL 等待觸發狀態(後繼節點的執行緒處於等待狀態,如果當前執行緒釋放了同步狀態或者被取消,將會通知後繼節點
  • CONDITION 等待條件狀態(節點在等待佇列(不是同步佇列)中,等待在Condition上,當其他執行緒對Condition呼叫signal()後,該節點會從等待佇列中轉移到同步佇列中)
  • PROPAGATE 狀態需要向後傳播
  • INITIAL,值為0 ,初始狀態

同步佇列的基本結構

這裡寫圖片描述
同步器包含了兩個節點型別的引用,一個指向頭節點,一個指向尾節點,一個雙向連結串列,當執行緒獲取同步狀態失敗時,會被構造成一個Node,然後加入到佇列尾中。
同步佇列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的執行緒在釋放同步狀態時,會喚醒後繼節點,而後繼節點將會在獲取同步狀態成功時將自己設定為首節點

獨佔式同步狀態獲取

在我們WriteLock中,通過呼叫lock方法,進行獨佔式同步狀態獲取

    public void lock() {
        sync.acquire(1);
    }

內部呼叫了son中的acquire()方法,這個方法是同步器中的方法

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

主要邏輯:
(1)首先呼叫我們自定義同步器實現的tryAcquire(int arg)方法。

        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

該方法保證執行緒安全的獲取同步狀態(CAS),如果獲取失敗則返回false,如果獲取成功,因為這個是獨佔式的,因此需要設定當前獨佔執行緒。返回true。
(2)如果獲取同步狀態成功,那麼獲取鎖成功,如果獲取失敗,則需要執行:acquireQueued(addWaiter(Node.EXCLUSIVE), arg),它由兩步構成:

  • addWaiter,新增一個等待者
  • acquireQueued,嘗試從等待佇列中去獲取執行一次acquire動作
    分別看一下每一步做了什麼。

addWaiter

傳入的引數Node.EXCLUSIVE,我們知道這是獨佔模式:

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先生成一個與當前執行緒相關的節點(node),然後獲得當前資料結構中的尾節點:
(1)如果有尾節點(佇列不為空),新生成的node的前驅節點指向尾節點,再更新尾節點(設定當前尾節點為新節點node),因為這裡存在併發,因此需要通過CAS來設定,設定成功後,新的node節點成為了尾節點,那麼更新原來的尾節點的next域(指向新的尾節點 node)。假如當前節點沒有被設定為尾節點,那麼執行enq方法。
(2)如果尾節點為空(佇列為空)則執行enq方法。

能執行enq方法:佇列為空,或者設定當前節點為尾節點失敗。

    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

這段程式碼的邏輯為:
(1)如果尾節點為空,也就是佇列為空,那麼new一個不帶任何狀態的Node作為頭節點,此時該節點既是尾節點也是頭結點,然後回到迴圈開頭再執行。
(2)如果尾節點不為空,那麼併發下使用CAS演算法將當前Node追加成為尾節點,由於是一個死迴圈,因此所有沒有成功acquire的Node最終都會被追加到同步佇列中

以上過程流程圖展示如下:
這裡寫圖片描述

acquireQueued

節點進入同步佇列之後,需要讓佇列中符合條件的節點獲取同步狀態。

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

當前執行緒在死迴圈中嘗試獲取同步狀態:
(1)獲取當前執行緒節點的前驅節點(要麼是head 要麼是其它節點)

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

(2)如果前驅節點是head,那麼tryAcquire(嘗試獲取同步狀態),如果獲取成功,設定當前獲取成功的節點為head,將head 的next域置空(原head會被回收)。返回是否被中斷。

(3)如果前驅節點不是head,或者是head但是獲取同步狀態失敗,那麼需要判斷當前執行緒是否需要被掛起(執行shouldParkAfterFailedAcquire),我們來看看shouldParkAfterFailedAcquire

    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這裡需要判斷它前驅節點的狀態,如果:
(1)它的前驅節點是SIGNAL狀態的,返回true,表示當前節點應當被掛起(阻塞),否則以下都返回false,暫時不忙掛起,以為後序在操作的過程中,同步狀態可能已經改變。
(2)它的前驅節點的waitStatus>0,相當於CANCELLED(因為狀態值裡面只有CANCELLED是大於0的,表示該執行緒需要從同步佇列中取消等待),那麼CANCELLED的節點作廢,當前節點不斷向前找並重新連線為雙向佇列,直到找到一個前驅節點waitStats不是CANCELLED的為止
(3)它的前驅節點不是SIGNAL狀態且waitStatus<=0,也就是waitStatus 為 0 or PROPAGATE,也就是說,前面還有執行緒在等待,那麼目前該節點是需要等待的,如果阻塞了需要前面節點喚醒,那麼前面節點如何知道才需不需要喚醒後繼節點呢,那麼就是利用CAS機制,更新前驅節點等待狀態為SIGNAL狀態。那麼前驅節點在釋放同步狀態是通過判斷狀態就可知道需不需要喚醒後繼節點了。

如果一直獲取不到同步狀態,那麼執行緒執行(可能會多次) shouldParkAfterFailedAcquire就會返回true,表示該執行緒應該被掛起,接下來執行parkAndCheckInterrupt();

    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park(this)會掛起當前執行緒,遇到下列情況之一返回:

  1. 其他某個執行緒將當前執行緒作為目標呼叫 unpark
  2. 其他某個執行緒中斷當前執行緒
  3. 該呼叫不合邏輯,返回。

好,我們再回到acquireQueued 方法中,如果當前現在在阻塞後返回,如果在是被中斷的,那麼就會記錄(interrupted = true),返回後重新進行執行上面邏輯獲取同步狀態。
如果是在中斷後成功獲取到同步狀態那麼就會執行selfInterrupt()。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    /**
     * Convenience method to interrupt current thread.
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

該方法將會自己中斷自己(中斷方法只是設定中斷標誌位為true,並不發生異常,便於呼叫者通過中斷標誌位來判斷是否被中斷)。

到這裡我們可以知道,同步器的acquire(int arg)方法對中斷不敏感,也就是說對執行緒中斷操作時,執行緒不會從同步佇列中移除(只是會再次獲取同步狀態,如果不符合條件容易會被再次加入佇列),但是在成功獲取到同步狀態後,我們可以檢視該執行緒的中斷標誌位(isInterrupted()),知道該執行緒是否被中斷過。

上面整個流程如下:
這裡寫圖片描述

獨佔模式同步狀態的獲取我們已經分析完了,

小總結

(1)等待佇列是FIFO先進先出。
(2)在掛起該執行緒前,會先設定前驅節點的狀態為SIGNAL。
(3)只有前一個節點的狀態為SIGNAL時,當前節點的執行緒才能被掛起(不就是第二點嘛)
(4)將節點加入同步佇列時,使用了無限迴圈,並使用CAS,保證了節點會被執行緒安全的新增到尾節點上。
(5)只有前驅節點是頭節點,才能夠嘗試獲取同步狀態(說明佇列是FIFO)
(6)加入同步佇列後,並不是立即掛起,而是再次進行獲取同步狀態, 到掛起之前都是在自旋(無限迴圈嘗試),因為同步狀態的變化很快,執行緒上下文的切換比較耗時,所以用短暫的自旋來換取時間開銷,當然如果一直自旋,那麼開銷反而大於了執行緒切換。所以把自旋時間(次數)控制在一定範圍有利於提高效能。

下面是整個獨佔模式獲取同步狀態的流程,個人覺得流程圖有助於把知識點串起來,只是單個分析某個功能,就算弄明白,但是整個還是會感覺有點模糊,而且這樣都是碎片段,不容易理清思路
這裡寫圖片描述

獨佔式同步狀態釋放

看完獲取我們在來看看如何釋放的

    public void unlock() {
        sync.release(1);
    }

同樣 釋放同步狀態內部依賴於sync的realease 方法

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

同步器中的release 方法將會呼叫我們的tryRelease 方法

        protected boolean tryRelease(int arg)  {
            if (getState()==0)
                throw  new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;

        }

如果同步狀態已經為0了,表示狀態錯誤,不需要再釋放,否則設定獨佔執行緒為NULL,並將狀態重新設定為0,這裡我們設定狀態為什麼沒有用CAS呢?,這樣是執行緒安全的嗎?,因為這是獨佔模式,那麼就只有一個執行緒能獲取到鎖(獲取鎖的過程是競爭的,所以需要CAS),所以釋放的時候肯定也是隻有一個一個執行緒的釋放,不存在競爭,所以不需要用CAS。

如果當前執行緒釋放成功了,那麼我們前面說了,需要喚醒後繼執行緒,當執行緒獲取同步狀態成功後,會將自己設定為頭節點,因此這裡只需要從head 開始,如果head的狀態<0,則可以進行喚醒工作。

    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

如果head 節點狀態小於0,那麼恢復成初始值0,設定失敗了也沒關係,個人理解,當前head節點已經釋放了同步狀態,喚醒後繼狀態的過程中了,其等待狀態不會影響到後面的行為(如果不對,歡迎指正)。
將head後面的節點進行喚醒即可,但是後面的節點(執行緒)可能已經被取消了(CANCELLED),因此不需要將等待狀態大於0的執行緒喚醒了,這裡是從後往前找離head最近的需要被喚醒的節點(為什麼需要從後往前找呢?不直接從前面開始找),然後對它進行unpark。喚醒後就會進行我們上面的分析的那樣重新進入同步狀態的競爭中。

獨佔式同步鎖的獲取和釋基本上分析得差不多了,不知道我表述明白了沒有,獨佔式還有其他幾個方法:獨佔式超時獲取同步狀態,可中斷的獲取同步鎖,其實這個兩個就是在上面的獲取同步狀態上做了一點小手腳,詳細分析應該沒必要了吧,就簡單看一下就可以了。

可中斷的獲取同步鎖,獨佔式超時獲取同步狀態

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

這個tryAcquire 的差別在於,如果當前執行緒被中斷,那麼會丟擲中斷異常,在看看doAcquireInterruptibly 和acquireQueued差別如下:

   if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
       throw new InterruptedException();

想想acquireQueued 裡面怎麼做的? acquireQueued中如果當前執行緒被中斷,那麼會標記該執行緒被中斷,但是會繼續進行同步狀態競爭,如果失敗又會加入到同步佇列中,也就是隻是記錄是否被中斷,不會告訴使用者哪個時候中斷的。而doAcquireInterruptibly 裡面一旦被中斷,那麼就會丟擲中斷異常,不會再進行同步狀態的競爭了,so 就是這樣了,明白了吧。

至於超時中斷可以被視作響應中斷獲取同步狀態過程的加強版,針對超時獲取,先計算最後期限deadline,deadline=系統當前時間+nanosTimeout(超時時間),當執行緒喚醒後用deadline-系統當前時間,如果小於0,那麼超時,否則還需要睡眠nanosTimeout = deadline - 系統當前時間。

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

回到自己實現的WriteLock中,我們看到在Syn內部都是設定狀態為0或者1,這個是因為這個是獨佔式同步器元件,如果我們狀態不止0,1呢,那豈不是、就不是獨佔了,是的,其實我們可以控制狀態,控制併發執行緒數,類似Semaphore(不知道?沒關係)。

public class ReadLock implements Lock {

    private    Sync sync;

    private static int count;

    public  ReadLock(int count){
        this.count=count;
        sync=new Sync(count);
    }
    private  static   class  Sync extends AbstractQueuedSynchronizer {

        public Sync(int count){
            setState(count);
        }
        @Override
        protected boolean tryAcquire(int acquires) {
            int c=getState();
            if (c==0)
                return false;
            if (compareAndSetState(c, c-acquires)) {
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg)  {
            int c=getState();
            if (c==count)
                throw  new IllegalMonitorStateException();
            if (compareAndSetState(c, c+arg)) {
                return true;
            }
            return false;

        }
        public Condition newCondition() {
            return new ConditionObject();
        }
    }
    public void lock() {
        sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return  sync.tryAcquire(1);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

}

在上面程式碼中,我們不在是獨佔式獲取同步狀態了,我們可以多個執行緒獲取同步狀態,具體有好多個執行緒可以獲取到呢(count個)。其實就是把state初始值設定成了count ,每次有執行緒獲取同步狀態時,就把state-1,如果state=0,則不能再繼續獲取了,獲取失敗。釋放的時候則把state+1即可。仔細觀察,我們發現在WriteLock和ReadLock裡面的tryRelease設定state是不一樣的,在WriteLock因為是獨佔式的,所以不需要用CAS來保證執行緒安全,但是在ReadLock中,因為可能多個執行緒會同時釋放,因此我們需要用CAS來保證執行緒安全。

public class TestMyLock {

    static  WriteLock writeLock=new WriteLock();
    static ReadLock readLock=new ReadLock(4);

    static  class  Task implements  Runnable{

        @Override
        public void run() {
            readLock.lock();
            System.out.println(Thread.currentThread().getName()+" 進入lock");
            try {

                Thread.sleep(1500);
            }catch (Exception e){
                e.printStackTrace();
            }
            readLock.unlock();

            System.out.println(Thread.currentThread().getName()+" 釋放lock");

        }
    }

    public static void main(String[] args) throws  Exception{


        List<Thread>list=new ArrayList<>();
        Task task=new Task();
        for (int i=0;i<10;i++){
            list.add(new Thread(task,"thread"+i));
        }
        for (int i=0;i<10;i++){
            list.get(i).start();
        }
        for (int i=0;i<10;i++){
            list.get(i).join();
        }
    }

}

簡單測試了一下,設定了併發數為4,模擬了10個執行緒去獲取鎖的過程。
這裡寫圖片描述

其實在同步器佇列中有共享式同步狀態獲取與釋放,其實現方式和獨佔式大同小異,下篇部落格將分析共享式與基於Condition的等待/通知機制實現。

最後,以上分析因為知識有限,不保證完全正確,歡迎指出問題。

相關推薦

Java 併發 ---AbstractQueuedSynchronizer(同步)-獨佔模式

基於jdk 1.8 前面我們瞭解到可以通過synchronized關鍵字實現鎖功能,使用該關鍵字不需要我們顯式的獲取和釋放鎖,操作很方便簡單,但是如果我們需要對鎖進行操作或者干預,那麼這個是沒有辦法的。 在Lock接口出現之前,Java程式是靠synchro

java併發同步

Java concurrent包中有提供多種同步器,訊號量(Semaphore)、計數栓(CountDownLatch)、迴圈屏障(CyclicBarrier)、交換器(Exchanger)、Phaser 一、 Semaphore同步器 特徵: 1. 經典的訊號量,通過計

Java 中佇列同步 AQS(AbstractQueuedSynchronizer)實現原理

#### 前言 在 Java 中通過鎖來控制多個執行緒對共享資源的訪問,使用 Java 程式語言開發的朋友都知道,可以通過 synchronized 關鍵字來實現鎖的功能,它可以隱式的獲取鎖,也就是說我們使用該關鍵字並不需要去關心鎖的獲取和釋放過程,但是在提供方便的同時也意味著其靈活性的下降。例如,有這樣的一

Java併發程式設計中的設計模式解析(二)一個單例的七種寫法

Java單例模式是最常見的設計模式之一,廣泛應用於各種框架、中介軟體和應用開發中。單例模式實現起來比較簡單,基本是每個Java工程師都能信手拈來的,本文將結合多執行緒、類的載入等知識,系統地介紹一下單例模式的演變,並體現在7種不同的單例設計中。說到這個,非常像孔乙己裡那個“回字有四種寫法”的梗,不過與封建迂腐

深入理解Java中的同步靜態方法和synchronized(class)程式碼塊的類鎖 深入理解Java併發synchronized同步化的程式碼塊不是this物件時的操作

一.回顧學習內容  在前面幾篇部落格中我我們已經理解了synchronized物件鎖、物件鎖的重入、synchronized方法塊、synchronized非本物件的程式碼塊,  連結:https://www.cnblogs.com/SAM-CJM/category/1314992.h

深入理解Java併發synchronized同步化的程式碼塊不是this物件時的操作

一.明確一點synchronized同步的是物件不是方法也不是程式碼塊   我有關synchronized同步的是物件討論的部落格在這裡:https://www.cnblogs.com/SAM-CJM/p/9798263.html  只要明確了synchroni

Java併發-從同步容器到併發容器

引言 容器是Java基礎類庫中使用頻率最高的一部分,Java集合包中提供了大量的容器類來幫組我們簡化開發,我前面的文章中對Java集合包中的關鍵容器進行過一個系列的分析,但這些集合類都是非執行緒安全的,即在多執行緒的環境下,都需要其他額外的手段來保證資料的正確性,最簡單的就是通過synchronized關鍵

Java併發同步&非同步(個人屌絲版,有點亂,可以噴)

0、併發的概念挺多的,對作業系統越是不熟,坑越多,那乾脆對一些常見詞彙先掃盲掃盲吧。   1、併發 白話文:一段時間內,多個執行緒、或程序同時執行(其實cpu切片不會同時,但是切換很快,像快槍手)   2、同步基本概念 比如有兩個任務,那麼他們是按照先

java併發之----實現生產者/消費者模式(操作值&一對一交替列印)

一、實現生產者/消費者模式 1、一生產與一消費:操作值 利用synchronized 實現,程式碼如下: public class Producer { private String lock; public Producer(String lock){ this.loc

Java併發--互斥同步--Java兩種鎖機制synchronized和ReentrantLock詳解

Java 提供了兩種鎖機制來控制多個執行緒對共享資源的互斥訪問,第一個是 JVM 實現的 synchronized,而另一個是 JDK 實現的 ReentrantLock。 synchronized 1. 同步一個程式碼塊 public void func() {

java 併發同步 CountDownLatch, CyclicBarrier, Semaphore

java 執行緒併發包 通常為java.util.concurrent 下的包 執行緒包提供的同步結構主要有三個 CountDownLatch CyclicBarrier Semaphore C

Java併發——Phaser “階段

1. Phaser Phaser是一個更加彈性的同步屏障。類java.util.concurrent實現了Phaser. 這段文字轉自:https://blog.csdn.net/u010739551/article/details/51083004  Phaser表

Java併發程式設計-同步輔助類之CountDownLatch

操作方法建構函式CountDownLatch(int count),count表示要等待的運算元的數目。await()方法,阻塞等待,需要其他執行緒完成期待的操作,直到count為0。countDown()方法,當某一個操作完成後,呼叫此方法,count數減一。CountDo

Java併發同步總結

Java中提供併發控制的兩種方式:1、同步關鍵字 2、鎖 Java 5.0之前使用的是同步關鍵字Synchronized和volatile,他們是jvm中的隱式鎖 Synchronized和volatile的實現是基於jvm指令的同步程式碼塊實現的。 新增

Java併發程式設計-同步輔助類之CyclicBarrier

在上一篇文章中我們介紹了同步輔助類CountDownLatch,在Java concurrent包下還有另一個同步輔助類CyclicBarrier與CountDownLatch非常類似,它也允許多個執行緒在某個點進行同步,但CyclicBarrier類更加強大。CyclicB

Java併發程式設計鎖之獨佔公平鎖與非公平鎖比較

Java併發程式設計鎖之獨佔公平鎖與非公平鎖比較 公平鎖和非公平鎖理解: 在上一篇文章中,我們知道了非公平鎖。其實Java中還存在著公平鎖呢。公平二字怎麼理解呢?和我們現實理解是一樣的。大家去排隊本著先來先得到的原則,在排隊中,無論身份貴賤,一律平等對待。這是就是我們現實生活中的公平。大家都喜歡公平的。但是在

【搞定Java併發程式設計】第16篇:佇列同步AQS原始碼分析之獨佔模式

AQS系列文章: 1、佇列同步器AQS原始碼分析之概要分析 2、佇列同步器AQS原始碼分析之獨佔模式 3、佇列同步器AQS原始碼分析之共享模式 4、佇列同步器AQS原始碼分析之Condition介面、等待佇列 本文主要講解佇列同步器AQS的獨佔模式:主要分為獨佔式同步狀態獲取

【搞定Java併發程式設計】第17篇:佇列同步AQS原始碼分析之共享模式

AQS系列文章: 1、佇列同步器AQS原始碼分析之概要分析 2、佇列同步器AQS原始碼分析之獨佔模式 3、佇列同步器AQS原始碼分析之共享模式 4、佇列同步器AQS原始碼分析之Condition介面、等待佇列 通過上一篇文章的的分析,我們知道獨佔模式獲取同步狀態(或者說獲取鎖

Java併發程式設計-佇列同步(AbstractQueuedSynchronizer)

章節目錄 Lock介面與Synchronized的區別及特性 佇列同步器的介面與自定義鎖示例 佇列同步器的實現分析 1.Lock介面與Synchronized的區別及特性 特性 描述 嘗試非阻塞性的獲取鎖 當前執行緒嘗試獲取鎖(自旋獲取鎖),如果這一時刻鎖沒有被其他執行緒獲取到,則成

[Java併發] AQS抽象佇列同步原始碼解析--獨佔鎖釋放過程

[Java併發] AQS抽象佇列同步器原始碼解析--獨佔鎖獲取過程 上一篇已經講解了AQS獨佔鎖的獲取過程,接下來就是對AQS獨佔鎖的釋放過程進行詳細的分析說明,廢話不多說,直接進入正文... 鎖釋放入口release(int arg) 首先進行說明下,能夠正常執行到release方法這裡來的執行緒都是獲取到