併發工具類
CountDownLatch
CountDownLatch是一個同步工具類,它允許一個或者多個執行緒一直等待,知道其他執行緒的操作執行完畢再執行。
CountDownLatch提供了兩個方法,一個是countDown,一個是await,countDownLatch初始化的時候需要傳入一個整數,在這個整數倒數到0之前,呼叫了await方法的程式都必須要等待,然後通過countDown來倒數。
public static void main(String[] args) throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(4); new Thread(new Runnable() { @Override public void run() { System.out.println("" + Thread.currentThread().getName() + "-執行中"); countDownLatch.countDown(); System.out.println("" + Thread.currentThread().getName() + "-執行完畢"); } }, "t1").start(); new Thread(new Runnable() { @Override public void run() { System.out.println("" + Thread.currentThread().getName() + "-執行中"); countDownLatch.countDown(); System.out.println("" + Thread.currentThread().getName() + "-執行完畢"); } }, "t2").start(); new Thread(new Runnable() { @Override public void run() { System.out.println("" + Thread.currentThread().getName() + "-執行中"); countDownLatch.countDown(); System.out.println("" + Thread.currentThread().getName() + "-執行完畢"); } }, "t3").start(); countDownLatch.await(); System.out.println("所有執行緒已經執行完畢"); }
從程式碼實現看,類似join的功能,但是比join更加靈活。CountDownLatch建構函式會接受一個int型別的引數作為計數器的初始值,當呼叫CountDownLatch的countDown方法時,這和計數器就會減一。
模擬高併發場景
static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) { for (int i = 0; i < 1000; i++) { new Thread(new CountDownLatchDemo()).start(); } countDownLatch.countDown(); } @Override public void run() { try { countDownLatch.await(); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("ThreadName:"+Thread.currentThread().getName()); }
總的來說,凡是涉及到需要指定某個任務再執行之前,要等到前置任務執行完畢之後才能執行的場景,都可以使用CountDownLatch。
CountDownLatch原始碼解析
對於countDownLatch,只要有await()方法和countDown()方法。
countDown()方法每次呼叫都會將state減1,直到state的值為0;而await是一個阻塞方法,當state減為0的時候,await方法才會返回。await可以被多個執行緒呼叫。所有呼叫了await方法的執行緒阻塞在AQS的紫色佇列來,條件滿足(state==0),將執行緒從佇列中一個個喚醒過來。
acquireSharedInterruptibly
countDownLatch也使用到AQS,在CountDownLatch內部寫了一個Sync並且繼承了AQS這個抽象類重寫了AQS中的共享鎖方法。如下程式碼,這塊程式碼只要是判斷當前執行緒是否獲取到了共享鎖;(在CountDownLatch中,使用的是共享鎖機制,因為CountDownLatch並不需要實現互斥的特性)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//state如果不等於0,說明當前執行緒需要加入帶共享鎖佇列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
doAcquireSharedInterruptibly
1.addWaiter設定為shared模式
2.tryAcquire和tryAcquireShared的返回值不同,因此會多出一個判斷過程
3.在判斷前驅結點是頭結點後,呼叫了setHeadAndPropagate方法,而不是簡單地更新了一下頭結點
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedE
//建立一個共享模式的節點新增到佇列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//判斷嘗試獲得鎖
int r = tryAcquireShared(arg);
//r>=0表示獲取到了執行許可權,這個時候state!=0,所以不會執行這段程式碼
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//阻塞執行緒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
圖解分析
加入這個時候有3個執行緒呼叫了await方法,由於這個時候state的值還不為0,所以這三個執行緒都會加入到AQS佇列中。並且三個執行緒都處於阻塞狀態。
CountDownLatch.countDown
由於執行緒被await方法阻塞了,所以只有等到countDown方法使得state=0的時候才會被喚醒。
1.只有當state減為0的時候,tryReleaseShared才會返回true,否則只是簡單的state=state-1
2.如果state=0,則呼叫doReleaseShared喚醒處於await狀態下的執行緒
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//用自旋的方式實現state減1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
AQS.doReleaseShared
共享鎖的釋放和獨佔鎖的釋放有一定的差別,前面喚醒鎖的邏輯和獨佔鎖是一樣的,先判斷頭結點是不是SIGNAL狀態,如果是,則修改為0,並且喚醒頭結點的而下一個節點
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//這個CAS失敗的場景是:執行到這裡的時候,剛好有一個節點入隊,入隊會將這個ws設定為-1
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果當到這裡的時候,前面喚醒的執行緒已經佔了了head,那麼再迴圈
//通過檢查頭節點是否改變了,如果改變了就繼續迴圈
if (h == head) // loop if head changed
break;
}
}
PROPAGATE:標識為PROPAGATE狀態的節點,是共享模式下的節點狀態,處於這個狀態下的節點,會對縣城內的喚醒進行傳播
h==head:說明頭節點還沒有被剛剛用unparkSuccessor喚醒的執行緒(這裡可以理解為ThreadB)佔有,是break退出迴圈。
h!=head:頭節點被剛剛喚醒的執行緒(這裡可以理解為ThreadB)佔有,那麼這裡重新進入下一輪玄幻,喚醒下一個節點(這裡是ThreadB)。然後後面喚醒傳遞。。
一旦ThreadA被喚醒,程式碼又回到了doAcquireSharedInterruptibly中來執行。如果當前state滿足等於0的條件,則會執行setHeadAndPropagate方法
if (p == head) {
//判斷嘗試獲得鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
setHeadAndPropagate
這個方法主要作用是把被喚醒的節點,設定成head節點。然後繼續喚醒佇列中的其他執行緒。
由於現在佇列有3個執行緒處於阻塞狀態,一旦ThreadA被喚醒,並且設定為head之後,會繼續喚醒後續的ThreadB。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
Semaphore
semaphore 也就是我們常說的訊號燈,semaphore 可以控 制同時訪問的執行緒個數,通過 acquire 獲取一個許可,如 果沒有就等待,通過 release 釋放一個許可。有點類似限流 的作用。叫訊號燈的原因也和他的用處有關,比如某商場 就 5 個停車位,每個停車位只能停一輛車,如果這個時候 來了 10 輛車,必須要等前面有空的車位才能進入。
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Car(i, semaphore).start();
}
}
static class Car extends Thread {
private int num;
private Semaphore semaphore;
public Car(int num, Semaphore semaphore) {
this.num = num;
this.semaphore = semaphore;
}
public void run() {
try {
semaphore.acquire();
System.out.println("第" + num + "佔用一個停車位");
TimeUnit.SECONDS.sleep(2);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用場景
Semaphore比較常見的就是用來作限流操作。
Semaphore原始碼分析
從Semaphore的功能來看,我們可以猜測它的底層原理一定是基於AQS的共享鎖。
建立Semaphore例項的時候,需要一個引數permits,這個基本上可以確定是設定給AQS的state的,然後每個執行緒呼叫acquire的時候,執行state=state-1,release的時候執行state=state+1,當然,acquire的時候,如果state=0,說明沒有資源了,需要等待其他的執行緒release。
Semaphore分公平策略和非公平策略
FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
//區別就在於是不是會先判斷是否有執行緒在排隊,然後才進行CAS鍵操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
NoFairSync
通過對別發現公平鎖和非公平鎖的區別就是在於是否多了一個hasQueuedPredecessors的判斷
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//都是基於共享鎖來實現的
CyclicBarrier
CyclicBarrier的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。他要做的事情是,讓一組執行緒到達一個屏障(也可以佳作同步點)時被阻塞,知道最後一個執行緒到達平衡住那個是,屏障才會開門,所有被屏障攔截的執行緒才會繼續工作。CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier當前執行緒已經到達了屏障,然後當前執行緒被阻塞。
使用場景
當存在需要所有的子任務都完成時,才會執行主任務,這個時候就可以選擇使用CyclicBarrier。
案例
DataImportThread
public class DataImportThread extends Thread{
private CyclicBarrier cyclicBarrier;
private String path;
public DataImportThread(CyclicBarrier cyclicBarrier,String path){
this.cyclicBarrier = cyclicBarrier;
this.path = path;
}
@Override
public void run() {
System.out.println("開始匯入:"+path+"位置的資料");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CyclicBarrierDemo
public class CyclicBarrierDemo extends Thread{
@Override
public void run() {
System.out.println("開始進行資料分析");
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new CyclicBarrierDemo());
new Thread(new DataImportThread(cyclicBarrier,"file1")).start();
new Thread(new DataImportThread(cyclicBarrier,"file2")).start();
new Thread(new DataImportThread(cyclicBarrier,"file3")).start();
}
}
注意點:
1)對於制定計數值parties。若由於某種原因,沒有足夠的執行緒呼叫CyclicBarrier的await,則所有呼叫await的執行緒都會被阻塞。
2)同樣的CyclicBarrier也可以呼叫await(timeout, unit),設定超時時間,在設定時間內,如果沒有足夠執行緒到達,則解除阻塞狀態,繼續工作。
3)通過reset重置計數,會使得進入await的執行緒出現BrokenBarrierExecption;
4)如果採用是CyclicBarrier(int parteis, Runnable barrierAction)構造方法,執行barrierAction操作的是最後一個到達的執行緒。
實現原理
CyclicBarrier相比CountDownLatch來說,要簡單很多,原始碼實現是基於ReentrantLock和Condition的組合使用。如下圖,CyclicBarrier和CountDownLatch是不是很像,只是CyclicBarrier可以不止一個柵欄,因為他的柵欄(Barrier)可以重複使用。