執行緒池中的柵欄
阿新 • • 發佈:2018-12-18
多執行緒中有三個類,分別是CountDownLatch,CyclicBarrier,Semaphore。代表著執行緒中的柵欄。共享鎖。
CountDownLatch
在一組執行緒中,一個執行緒等待其他執行緒。我把它理解為門栓。
檢視該類的資料結構圖如下圖一
圖一
有一個靜態的內部類,Sync繼承自AQS。
- CountDownLatch(int) : 有參構造方法,該類擁有的共享鎖次數。
- await() : 阻塞,等待該類釋放掉共享鎖,也就是說,擁有共享鎖的次數為0
- countDown:釋放一次共享鎖
- getCount: 獲取共享鎖次數
使用例子:程式碼如下:
/** * @ClassName CountDownLatchTest * @Description 共享鎖。在完成一組正在其他執行緒中執行的操作之前,允許一個或者多個執行緒一直等待 * @Author ouyangkang * @Date 2018/10/23 14:33 **/ public class CountDownLatchTest { private static int SIZE = 6; private static CountDownLatch countDownLatch; /** * @Date 2018/10/25 15:49 * @Description 阻塞佇列為3 核心池的大小為2 最大池的大小為6的執行緒池 **/ private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 6, 60L,TimeUnit.SECONDS, new LinkedBlockingDeque<>(3)); public static void main(String[] args) { countDownLatch = new CountDownLatch(SIZE); for (int i = 0; i < 6; i++) { threadPoolExecutor.execute(new ThreadTest()); } try { // 阻塞等待 countDownLatch.await(); System.out.printf("thread main = [{%s}] \n", Thread.currentThread().getName()); threadPoolExecutor.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } } static class ThreadTest extends Thread { @Override public void run() { System.out.printf("thread-name = [%s] \n", Thread.currentThread().getName()); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("thread-name =[%s] 執行 \n", Thread.currentThread().getName()); // 數值減一 countDownLatch.countDown(); } } }
返回結果:
thread-name = [pool-1-thread-1] thread-name = [pool-1-thread-3] thread-name = [pool-1-thread-2] thread-name =[pool-1-thread-1] 執行 thread-name =[pool-1-thread-3] 執行 thread-name = [pool-1-thread-1] thread-name = [pool-1-thread-3] thread-name =[pool-1-thread-2] 執行 thread-name = [pool-1-thread-2] thread-name =[pool-1-thread-1] 執行 thread-name =[pool-1-thread-3] 執行 thread-name =[pool-1-thread-2] 執行 thread main = [{main}]
- 執行緒池中阻塞佇列為3,啟動了6個執行緒。阻塞佇列中阻塞3個,核心池2個在執行任務,最大池的大小為6,那麼就會新建一個執行緒來處理該任務。
- 主執行緒最後執行,wait等待其他執行緒任務執行完畢,再繼續執行主執行緒。
CyclicBarrier
在一組執行緒中允許多個執行緒相互等待。就是,每個執行緒都先執行一下,然後互相等待到一個點。然後再執行。
檢視該類的資料結構圖
有參構造方法,設定屏障點是多少,建立執行緒,等待。直到執行緒數量大於等於該屏障點。處於該屏障點等待得執行緒全部喚醒。
使用例子如下:
/** * @ClassName CyclicBarrierTest * @Description 屏障,允許多個執行緒相互等待。到達了某一個臨界點,就喚醒所有執行緒 * @Author ouyangkang * @Date 2018/10/23 15:00 **/ public class CyclicBarrierTest { private static int SIZE = 5; private static CyclicBarrier cyclicBarrier; public static void main(String[] args) { cyclicBarrier = new CyclicBarrier(SIZE); for (int i = 0; i < 5 ;i++){ new ThreadTest().start(); } } static class ThreadTest extends Thread { @Override public void run() { System.out.printf("thread=[%s] 開始-- \n", Thread.currentThread().getName()); try { Thread.sleep(1000); cyclicBarrier.await(); System.out.printf("thread=[%s] 繼續--- \n", Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
結果如下:
thread=[Thread-0] 開始--
thread=[Thread-4] 開始--
thread=[Thread-3] 開始--
thread=[Thread-1] 開始--
thread=[Thread-2] 開始--
thread=[Thread-2] 繼續---
thread=[Thread-4] 繼續---
thread=[Thread-0] 繼續---
thread=[Thread-1] 繼續---
thread=[Thread-3] 繼續---
Semaphore
在一組執行緒中,執行緒持有訊號量,如果資訊量得大小為10,那麼所有執行緒能夠持有得訊號量不能超過10,如果3個執行緒分別持有得訊號量是3 4 5 。 那麼只能是兩個執行緒執行,當其中一個釋放了該訊號量,其他執行緒才可以執行。
類圖結構如下:
具體例子如下:
/**
* @ClassName SemaphoreTest
* @Description 訊號量
* @Author ouyangkang
* @Date 2018/10/23 16:34
**/
public class SemaphoreTest {
private static int SIZE = 10;
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(SIZE);
ExecutorService executorService = new ThreadPoolExecutor(2,3,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(5));
executorService.execute(new ThreadTest(3,semaphore));
executorService.execute(new ThreadTest(4,semaphore));
executorService.execute(new ThreadTest(5,semaphore));
executorService.shutdown();
}
static class ThreadTest extends Thread{
private Integer count ;
private Semaphore semaphore;
public ThreadTest(Integer count , Semaphore semaphore){
this.count = count ;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(count);
System.out.printf("thread=[%s] 擁有訊號量 semaphore=[%d] 開始---- \n",Thread.currentThread().getName(), count);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(count);
System.out.printf("thread=[%s] 釋放了 semaphore=[%d] --\n",Thread.currentThread().getName(),count);
}
}
}
}
具體結果如下:
thread=[pool-1-thread-2] 擁有訊號量 semaphore=[4] 開始----
thread=[pool-1-thread-1] 擁有訊號量 semaphore=[3] 開始----
thread=[pool-1-thread-2] 釋放了 semaphore=[4] --
thread=[pool-1-thread-2] 擁有訊號量 semaphore=[5] 開始----
thread=[pool-1-thread-1] 釋放了 semaphore=[3] --
thread=[pool-1-thread-2] 釋放了 semaphore=[5] --