1. 程式人生 > >併發程式設計AQS----共享鎖

併發程式設計AQS----共享鎖

Semaphore

Semaphore 字面意思是訊號量的意思,它的作用是控制訪問特定資源的執行緒數目。應用場景:資源訪問,服務限流。 Semaphore 實現AbstractQueuedSynchronizer的方法與ReentrantLock一樣

 

 Semaphore構造方法

public Semaphore(int permits) {------permits 表示能同時有多少個執行緒訪問我們的資源
        sync = new NonfairSync(permits); -------------預設建立的是非公平鎖。
    }


abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {
setState(permits);-------傳入的permits做i為了state的值,作為資源總數

}
 
semaphore.acquire();獲取資源,原始碼實現
 public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);---------每次申請一次資源
    }
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();-----------------執行緒無效直接拋異常
if (tryAcquireShared(arg) < 0) --------------------拿不到資源,需要進行入隊操作
doAcquireSharedInterruptibly(arg); ---------入隊操作
}
final int nonfairTryAcquireShared(int acquires) {  --------獲取資源的操作
for (;;) {
int available = getState(); --------------拿到現有的資源
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) -----------原子操作,多執行緒情況下會可能失敗,所以無線迴圈自旋下去,直到成功;
return remaining;---------------------如果大於等於0那麼就是拿到了資源,如果小於0,那麼執行緒就要進入等待佇列
}
}
為什麼要用死迴圈----compareAndSetState這個是cas原子操作,失敗之後要迴圈重複繼續操作,直到成功。死迴圈也就結束了。
private void doAcquireSharedInterruptibly(int arg)-------------執行緒入隊操作
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);---------------注意這裡是以共享的方式入隊
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { --------新入隊的節點會判斷他上一個節點是不是頭節點,如果是頭節點會再次嘗試獲取資源,
int r = tryAcquireShared(arg);
if (r >= 0) { -------------如果獲取到資源,那麼這個阻塞佇列就要清空了,裡面沒有在等待的執行緒了。
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && ----------如果獲取不到資源,那麼就要執行緒阻塞了
parkAndCheckInterrupt()) -----------parkAndCheckInterrupt這個方法會將執行緒阻塞(掛起),執行緒都阻塞了,這個死迴圈就不會執行了,這也就是為什麼juc原始碼寫了很多
死迴圈都沒問題地原因,我們可以借鑑。當執行緒被喚醒之後又開始這個死迴圈,嘗試拿資源(非公平鎖有可能拿不到),
拿不到再次被阻塞掛起。

throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  ---------------判斷執行緒能否被正常阻塞
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) -----------------如果上一個節點是有效的在等待的執行緒,那麼該執行緒就可以插入到佇列後面
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { -----------如果上一個節點是無效的,那就查詢上上個節點是不是有效的,直到找到那個有效的節點,然後將該節點插入到那個有效節點後面,中間的無效節點從連結串列中刪除,後面的節點要找前面
的節點這也就說明了為什麼我們地等待佇列要設計成雙鏈表,不光有next。next這種找後驅節點地操作還有pre .pre這樣前驅節點。所以需要雙鏈表。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
 

 

 semaphore.release();釋放資源,原始碼分析
protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;    -----------獲取當前的資源然後給資源加回去
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))  -----------------CAS演算法還資源,死迴圈,直到成功還回去,死迴圈結束。
                    return true;
            }
        }

資源還回去之後執行doReleaseShared方法喚醒其他執行緒搶資源
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) { --------發現阻塞佇列有阻塞執行緒
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); ---------跳過頭節點,喚醒下一個節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}


private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; ------迴圈找到waitStatus<0能喚醒的節點呼叫unpark方法喚醒執行緒。

}
if (s != null)
LockSupport.unpark(s.thread);
}
 

CountDownLatch是什麼?

CountDownLatch這個類能夠使一個執行緒等待其他執行緒完成各自的工作後再執行。例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有的框架服務之後再執行。 CountDownLatch如何工作? CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。 API CountDownLatch.countDown()分執行緒執行------   執行緒執行完任務之後處於等待狀態 CountDownLatch.await();   主執行緒執行 -------監控所有執行緒,所有執行緒結束之後主執行緒繼續走下去。 CountDownLatch 不可重用

CyclicBarrier

柵欄屏障,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。 API cyclicBarrier.await(); 應用場景 可以用於多執行緒計算資料,最後合併計算結果的場景。例如,用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個賬戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些執行緒的計算結果,計算出整個Excel的日均銀行流水 CyclicBarrier 可以重用。