java多執行緒之同步計數器
阿新 • • 發佈:2019-01-23
同步計數器之--CountDownLatch
使用場景:當用戶上傳檔案,我們可以多開幾個執行緒來上傳檔案,當所有的執行緒上傳完成後,檔案才算上傳完成,在此場景下可以使用CountDownLatch,當每個執行緒上傳完成,呼叫CountDownLatch.countDown()使計數器減1,當計數器為0時表示檔案上傳完成。
CountDownLatch的同步器是通過AQS來實現同步的,在CountDownLatch內部定義了一個Sycn類,此類繼承AQS來實現同步,我們來看下CountDownLatch的幾個主要方法的執行過程
1).當CountDownLatch呼叫countDown()時
//CountDownLatch內部呼叫靜態內部類Sync的方法,但是Sync並沒有releaseShared(int i)的實現,所以我們檢視它的父類AQS
public void countDown() {
sync.releaseShared(1);
}
AbstractQueuedSynchronizer類
//在AQS中呼叫子類的tryReleaseShared(int i)的方法,如果子類的方法返回true,則 doReleaseShared(),並且自身返回true public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//釋放訊號量 return true; } return false; }
Sync類
// protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState();//獲取當前計數器 if (c == 0)//如果計數器為0,則直接返回,因為計數器為0,則釋放所有執行緒,CountDownLatch工作已經結束了 return false; int nextc = c-1; if (compareAndSetState(c, nextc))//原子性的將計數器減1,返回計數器是否==0 return nextc == 0; } }
2).當CountDownLatch呼叫await()時
//CountDownLatch呼叫Sync類中的acquireSharedInterruptibly(int i)方法,Sync中沒有實現,所以是呼叫AQS的方法()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AQS類
//呼叫子類的tryAcquireShared(int i)方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//判斷執行緒是否中斷
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//結合下面的程式碼可以看出,當計數器不等於0時,往下執行,
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)//這段有點看不懂啊
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//獲取當前訊號量是否為0,如果為0這個方法返回1,否則返回-1
if (r >= 0) {//當訊號量為0的時候就退出死迴圈,否則一直在這裡面出不去,所以實現了一直等待
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Sync類//當計數器等於0時返回1,否則返回-1;
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
CountDownLatch示例
package com.synchronize;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 同步計數器
* @author lijh
*
*/
public class MyCountDownLatch {
public static void main(String[] args) {
CountDownLatch doneSignal = new CountDownLatch(5);
//定長執行緒池
ExecutorService pool = Executors.newFixedThreadPool(1);
for (int i = 0; i < 5; ++i){
pool.execute(new WorkerRunnable(doneSignal, i));
}
try {
//在計數器為0之前會一直等待
doneSignal.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//關閉執行緒池
pool.shutdown();
System.out.println("main 執行緒結束");
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
//每個執行緒做完自己的事情後,計數器減1
doWork(i);
doneSignal.countDown();
Thread.currentThread().interrupt();
}
void doWork(int i) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(doneSignal.getCount());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
同步計數器之--Semaphore
semaphore通常用於限制訪問一些資源的執行緒數目,比如有一個伺服器,同時最多隻能三個人訪問,此時可以使用semaphore實現
Semaphore方法比較多,此處展示一些常用的
Semaphore示例
package com.synchronize;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Semaphore同步器會規定一個閥值來限制對資源的訪問,使用資源時,執行緒最多不能超出這個閥值
* 場景:當前駕校有2輛車,有5個學員練車,每次只有2個學員可以練車,
* 其他學員需要等待,當有學員練完車了,可以讓給下一個學員練車
* @author lijh
*
*/
public class MySemaphore {
public static void main(String[] args) {
//最多兩個執行緒訪問資源
Semaphore sp = new Semaphore(2);
//建立一個容納5個執行緒的執行緒池
ExecutorService pool = Executors.newFixedThreadPool(5);
Driver d = new Driver(sp);
Car c1 = new Car(d);
Car c2 = new Car(d);
Car c3 = new Car(d);
Car c4 = new Car(d);
Car c5 = new Car(d);
pool.execute(c1);
pool.execute(c2);
pool.execute(c3);
pool.execute(c4);
pool.execute(c5);
pool.shutdown();//關閉執行緒池
}
}
//車
class Car implements Runnable{
private Driver d;
public Car(Driver d){
this.d = d;
}
@Override
public void run() {
d.driverCar();
}
}
//司機
class Driver{
//同步計數器
private Semaphore sp;
public Driver(Semaphore sp){
this.sp = sp;
}
//學員練車
public void driverCar(){
try {
sp.acquire();//從此訊號量獲取一個許可,在提供一個許可前一直將執行緒阻塞,否則執行緒被中斷。
System.out.println(Thread.currentThread().getName()+" start");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" end");
sp.release();////釋放一個許可,將其返回給訊號量。
//Thread.currentThread().interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
同步計數器之--CyclicBarrier
先看一個示例
package com.synchronize;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* CyclicBarrier 允許一組執行緒互相等待,直到到達某個公共屏障點,然後所有執行緒繼續往後執行。
* CyclicBarrier 與 CountDownLatch 區別在於 CyclicBarrier 是多個執行緒互相等待,
* CountDownLatch 是一個執行緒等待多個執行緒完成工作,等待的物件不同了
* 場景:公司有4個人,每天早上開晨會時,大家一起去會議室,當所有人都到達會議室,會議開始。
*
* @author lijh
*
*/
public class MyCyclicBarrier {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cb = new CyclicBarrier(4,new Thread(new Meeting()));
ExecutorService pool = Executors.newFixedThreadPool(4);
Member s1 = new Member(cb);
Member s2 = new Member(cb);
Member s3 = new Member(cb);
Member s4 = new Member(cb);
pool.execute(s1);
pool.execute(s2);
pool.execute(s3);
//pool.execute(s4);
TimeUnit.SECONDS.sleep(5);
//當CyclicBarrier.await超時時,會丟擲異常
pool.execute(s4);
pool.shutdown();
}
}
/**
* 成員
* @author lijh
*
*/
class Member implements Runnable{
private CyclicBarrier cb;
public Member(CyclicBarrier cb){
this.cb = cb;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("成員 "+Thread.currentThread().getName()+" 已經到達會議室!");
cb.await(3,TimeUnit.SECONDS);
//cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
/**
* 會議
* @author lijh
*
*/
class Meeting implements Runnable{
@Override
public void run() {
System.out.println("會議開始!");
}
}
CyclicBarrier有兩個構造方法
CyclicBarrier(int i) 和CyclicBarrier(int i , Runnable r),第二個構造,是在所有參與者都在此屏障上後,會優先執行r這個執行緒,上面的示例也演示出了這一點。