1. 程式人生 > >佇列同步器--AbstractQueuedSynchronizer

佇列同步器--AbstractQueuedSynchronizer

佇列同步器

佇列同步器 AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步元件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內建的FIFO佇列來完成資源獲取執行緒的排隊工作。

同步器的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。

同步器是實現鎖(也可以是任意同步元件)的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。

重寫同步器指定的方法時,需要使用同步器提供的如下3個方法來訪問或修改同步狀態。

  • getState():獲取當前同步狀態。
  • setState(int newState):設定當前同步狀態。
  • compareAndSetState(int expect,int update):使用CAS設定當前狀態,該方法能夠保證狀態設定的原子性。

同步器可重寫的方法:

方法名稱 描述
protected boolean tryAcquire(int arg) 獨佔式獲取同步狀態,實現該方法需要查詢當前狀態是否符合預期,然後再進行CAS設定同步狀態
protected boolean tryRelease(int arg) 獨佔式釋放同步狀態,等待獲取同步狀態的執行緒將有機會獲取同步狀態
protected int tryAcquireShared(int arg) 共享式獲取同步狀態,返回大於等於0的值表示獲取成功,反之,獲取失敗
protected boolean tryReleaseShared(int arg) 共享式釋放同步狀態
protected boolean isHeldExclusively() 當前同步器是否在獨佔模式下被執行緒佔用,一般該方法表示是否被執行緒所獨佔

同步器提供的模板方法:

方法名稱 描述
void acquire(int arg) 獨佔式獲取同步狀態,如果當前執行緒獲取同步狀態成功,則由該方法返回,否則,將會進入同步佇列等待,該方法將會呼叫重寫的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 與acquire方法相同,但是該方法響應中斷,如果當前執行緒被中斷,該方法丟擲InterruptedException異常並返回
boolean tryAcquireNanos(int arg, long nanosTimeout) 在acquireInterruptibly的基礎上增加了超時限制

實現分析

1. 同步佇列

同步器依賴內部的同步佇列(一個FIFO雙向佇列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態等資訊構造成為一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。

同步佇列中的節點(Node)用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和後繼節點:

屬性型別與名稱 描述
int waitStatus 等待狀態 CANCELLED:1 SIGNAL:-1 CONDITION:-2 PROPAGATE:-3 INITIAL:0
Node prev 前驅節點
Node Next 後繼節點
Node NextWaiter
Thread thread 獲取同步狀態的執行緒

節點是構成同步佇列(等待佇列,在5.6節中將會介紹)的基礎,同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態的執行緒將會成為節點加入該佇列的尾部,同步佇列的基本結構如圖所示:

image

同步器包含了兩個節點型別的引用,一個指向頭節點,而另一個指向尾節點。

當一個執行緒成功地獲取了同步狀態(或者鎖),其他執行緒將無法獲取到同步狀態,轉而被構造成為節點並加入到同步佇列中,而這個加入佇列的過程必須要保證執行緒安全,因此同步器提供了一個基於CAS的設定尾節點的方法:compareAndSetTail(Node expect,Node update),它需要傳遞當前執行緒“認為”的尾節點和當前節點,只有設定成功後,當前節點才正式與之前的尾節點建立關聯。同步器將節點加入到同步佇列的過程如圖:

image

同步佇列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點的執行緒在釋放同步狀態時,將會喚醒後繼節點,而後繼節點將會在獲取同步狀態成功時將自己設定為首節點,該過程如圖:

image

設定首節點是通過獲取同步狀態成功的執行緒來完成的,由於只有一個執行緒能夠成功獲取到同步狀態,因此設定頭節點的方法並不需要使用CAS來保證,它只需要將首節點設定成為原首節點的後繼節點並斷開原首節點的next引用即可。

2. 獨佔式同步狀態獲取與釋放

通過呼叫同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,也就是由於執行緒獲取同步狀態失敗後進入同步佇列中,後續對執行緒進行中斷操作時,執行緒不會從同步佇列中移出,該方法程式碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述程式碼主要完成了同步狀態獲取、節點構造、加入同步佇列以及在同步佇列中自旋等待的相關工作,其主要邏輯是:

首先呼叫自定義同步器實現的tryAcquire(int arg)方法,該方法保證執行緒安全的獲取同步狀態,如果同步狀態獲取失敗,則構造同步節點(獨佔式 Node.EXCLUSIVE,同一時刻只能有一個執行緒成功獲取同步狀態)並通過addWaiter(Node node) 方法將該節點加入到同步佇列的尾部,最後呼叫acquireQueued(Node node,int arg)方法,使得該節點以“死迴圈”的方式獲取同步狀態。如果獲取不到則阻塞節點中的執行緒,而被阻塞執行緒的喚醒主要依靠前驅節點的出隊或阻塞執行緒被中斷來實現。

首先是節點的構造以及加入同步佇列:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上述程式碼通過使用 compareAndSetTail(Node expect,Node update) 方法來確保節點能夠被執行緒安全新增。試想一下:如果使用一個普通的LinkedList來維護節點之間的關係,那麼當一個執行緒獲取了同步狀態,而其他多個執行緒由於呼叫tryAcquire(int arg)方法獲取同步狀態失敗而併發地被新增到LinkedList時,LinkedList將難以保證Node的正確新增,最終的結果可能是節點的數量有偏差,而且順序也是混亂的。

在enq(final Node node)方法中,同步器通過“死迴圈”來保證節點的正確新增,在“死迴圈”中只有通過CAS將節點設定成為尾節點之後,當前執行緒才能從該方法返回,否則,當前執行緒不斷地嘗試設定。可以看出,enq(final Node node)方法將併發新增節點的請求通過CAS變得“序列化”了。

節點進入同步佇列之後,就進入了一個自旋的過程,每個節點(或者說每個執行緒)都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋過程中退出,否則依舊留在這個自旋過程中(並會阻塞節點的執行緒):

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在acquireQueued(final Node node,int arg)方法中,當前執行緒在“死迴圈”中嘗試獲取同步狀態,而只有前驅節點是頭節點才能夠嘗試獲取同步狀態,這是為什麼?原因有兩個,如下:

  • 第一,頭節點是成功獲取到同步狀態的節點,而頭節點的執行緒釋放了同步狀態之後,將會喚醒其後繼節點,後繼節點的執行緒被喚醒後需要檢查自己的前驅節點是否是頭節點。
  • 第二,維護同步佇列的FIFO原則。

由於非首節點執行緒前驅節點出隊或者被中斷而從等待狀態返回,隨後檢查自己的前驅是否是頭節點,如果是則嘗試獲取同步狀態。可以看到節點和節點之間在迴圈檢查的過程中基本不相互通訊,而是簡單地判斷自己的前驅是否為頭節點,這樣就使得節點的釋放規則符合FIFO,並且也便於對過早通知的處理(過早通知是指前驅節點不是頭節點的執行緒由於中斷而被喚醒)。

獨佔式同步狀態獲取流程,也就是acquire(int arg)方法呼叫流程:

image

前驅節點為頭節點且能夠獲取同步狀態的判斷條件和執行緒進入等待狀態是獲取同步狀態的自旋過程。當同步狀態獲取成功之後,當前執行緒從acquire(int arg)方法返回,如果對於鎖這種併發元件而言,代表著當前執行緒獲取了鎖。

當前執行緒獲取同步狀態並執行了相應邏輯之後,就需要釋放同步狀態,使得後續節點能夠繼續獲取同步狀態。通過呼叫同步器的release(int arg)方法可以釋放同步狀態,該方法在釋放了同步狀態之後,會喚醒其後繼節點(進而使後繼節點重新嘗試獲取同步狀態)。該方法程式碼如圖所示:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

該方法執行時,會喚醒頭節點的後繼節點執行緒,unparkSuccessor(Node node)方法使用 LockSupport 來喚醒處於等待狀態的執行緒。

–《Java併發程式設計的藝術》部分總結