1. 程式人生 > >JDK中AbstractQueuedSynchronizer應用解析

JDK中AbstractQueuedSynchronizer應用解析

int 例子 分別是 進入 mit compare res 為什麽 bool

這個類首先是一個抽象類,定義了一個模板,很多java同步相關的類(ReetrantLock、Semaphore、CountDownLatch等)都是基於AbstractQueuedSynchronizer來實現的

AbstractQueuedSynchronizer

本身就是一個鏈表,提供的線程安全的操作。核心思想是通過CAS插入鏈表的尾部和獲取鏈表的頭結點。算法暫時先不談,先說說他的應用,主要下先說說他的一些模板方法。

AbstractQueuedSynchronizer#acquire()

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

其中tryAcquire()就是一個模板方法,當tryAcquire()返回false的時候,就會把當前線程封裝為Node(鏈表的數據結構),然後掛起(使用LockSuport.park())當前線程。

其實在從隊列中獲取頭節點的時候並恢復的時候,也會調用tryAcquire()方法,因為對一些非公平鎖可能被強占。

AbstractQueuedSynchronizer#release()

和acquire()對應的就是release()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease()也是一個模板方法,由子類自己去實現。如果tryRelease()返回true的話,就會從鏈表中獲取頭節點(如果有),並恢復線程。 因此,java並發包裏很多都會用到這個來進行自定義。下面說舉一個例子說說明如何利用上面2個模板方法來實現並發的

ReentrantLock

ReentrantLock的並發控制是通過繼承AbstractQueuedSynchronizer()來實現並發的。

abstract static class Sync extends AbstractQueuedSynchronizer{
    ...
}

  

而公平鎖和非公平鎖又是繼承了Sync, 以默認的非公平鎖分析,首先來tryAcquire(),通過註釋可以看出基本步驟,如果要實現公平鎖也很簡單,在當前線程沒有占有的情況下,多增加一個判斷鏈表是否有等待的線程即可

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // 0 表示當前沒有其他線程占用
    if (c == 0) {
        //通過cas的方式來設置state,如果設置成功,則認為當前線程可以獲取鎖,cas失敗則任務其他線程比他快一步占用
        if (compareAndSetState(0, acquires)) {
            //設置當前線程獨占
            setExclusiveOwnerThread(current);
            //返回true,即不需要放入等待的鏈表中
            return true;
        }
    }
    //如果當前線程本來就占有鎖
    else if (current == getExclusiveOwnerThread()) {
        //占用的次數+1,這也是為什麽 lock()和unlock()需要一一對應
        int nextc = c + acquires;
        //說明同一個線程lock()的次數為int的最大值
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 設置state,返回獲取鎖成功
        setState(nextc);
        return true;
    }
    return false;
}

  

再來看看tryRelease()

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

  

因為釋放鎖的時候並沒有並發情況,只要保證當state為0的時候,設置獨占鎖為null並返回true即可。代碼很簡單。

小結

有人看到這裏覺得,ReentrantLock是通過lock()和unlock()方法來加鎖解鎖的,和你說的acquire()和release()有什麽關系呢?其實lock()的核心就是調用acquire(1),unlock()就是調用release(1)

AbstractQueuedSynchronizer#acquireShared()

acquireShared()和acquire()類似,只不過在調用的是

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

  

tryAcquireShared()是模板方法,可以簡單的理解為是把boolean類型換位int類型的acquire()

AbstractQueuedSynchronizer#releaseShared()

和acquireShared()對應acquire(),releaseShared()就是對應release(),就不在贅述

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

  

下面再舉個例子說明acquireShared()和releaseShared()的使用

Semaphore

Semaphore就是允許指定數量的線程進行入臨界區。看看是什麽實現,其實Semaphore的內部結構和ReentrantLock差不多,主要就是看看他是如何利用AbstractQueuedSynchronizer來實現並發的。首先看看如何利用tryAcquireShared()控制並發線程數的。

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

  

是不是很簡單,就是利用CAS來改變available值來控制,而available就是我們自己定義的能夠進入臨界區的最大線程數 那麽releaseShared()是怎麽實現的,其實簡單一想也應該是增加available值

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

 

## 小節
可以看出AbstractQueuedSynchronizer主要用分為共享鎖和獨占鎖,對於共享鎖就像是一個計數器,每次獲取鎖的時候計數器-1,釋放的時候計數器+1。

ReentrantReadWriteLock

ReentrantReadWriteLock是讀寫鎖,每個讀鎖之間是不競爭鎖的,但讀鎖和寫鎖、寫鎖和寫鎖之間是競爭關系。這個並發類就是綜合利用上面所說的內容。首先看一下內部結構

private final ReentrantReadWriteLock.ReadLock readerLock;

private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

  

成員變量分別是讀鎖和寫鎖(內部類)和一個繼承了AbstractQueuedSynchronizer的Sync類,如果理解了上面的ReentrantLock和Semaphore,我們自己進行推敲一下,應該是讀鎖應該是和Semaphore類似,而寫鎖應該是和ReentrantLock類似。雖然這麽說,還是有許多細節需要註意的:

  • 如果保證非公平鎖下,寫鎖不會餓死?
  • 一個AbstractQueuedSynchronizer只有一個state字段,那麽是如何保存讀寫的數量和寫鎖的數量呢?
  • 對於讀鎖,如何知道當前自己持有多少個單位的鎖呢

上面都是需要在設計讀寫鎖需要考慮的地方,相信你看了源碼就會知道答案!

JDK中AbstractQueuedSynchronizer應用解析