java之JUC系列-外部Tools-Executors|Semaphor|Exchanger|CyclicBarrier|CountDownLatch
前面寫了兩篇JDBC原始碼的文章,自己都覺得有點枯燥,先插一段JUC系列的文章來換換胃口,前面有文章大概介紹過J U C包含的東西,JUC體系包含的內容也是非常的多,不是一兩句可以說清楚的,我這首先列出將會列舉的JUC相關的內容,然後介紹本文的版本:Tools部分
J.U.C體系的主要大板塊包含內容,如下圖所示:
注意這個裡面每個部分都包含很多的類和處理器,而且是相互包含,相互引用的,相互實現的。
說到J UC其實就是說java的多執行緒等和鎖,前面說過一些狀態轉換,中斷等,我們今天來用它的tools來實現一些有些小意思的東西,講到其他內容的時候,再來想想這寫tools是怎麼實現的。
tools是本文說要講到的重點,而tools主要包含哪些東西呢:
Tools也包含了5個部分的知識:Executors、Semaphor、Exchanger、CyclicBarrier、CountDownLatch,其實也就是五個工具類,這5個工具類有神馬用途呢,就是我們接下來要將的內容了。
Executors:
其實它主要用來建立執行緒池,代理了執行緒池的建立,使得你的建立入口引數變得簡單,通過方法名便知道了你要建立的執行緒池是什麼樣一個執行緒池,功能大概是什麼樣的,其實執行緒池內部都是統一的方法來實現,通過構造方法過載,使得實現不同的功能,但是往往這種方式很多時候不知道具體入口引數的改變有什麼意思,除非讀了原始碼才知道,此時builder模式的方式來完成,builder什麼樣的東西它告訴你就可以。
常見的方法有(都是靜態方法):
1、建立一個指定大小的執行緒池,如果超過大小,放入blocken佇列中,預設是LinkedBlockingQueue,預設的ThreadFactory為:Executors.defaultThreadFactory(),是一個Executors的一個內部類。
Executors.newFixedThreadPool(int)
內部實現是:
2、建立一個指定大小的執行緒池,如果超過大小,放入blocken佇列中,預設是LinkedBlockingQueue,自己指定ThreadFactory,自己寫的ThreadFactory,必須implements ThreadFactory,實現方法:newThread(Runnable)。public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors.newFixedThreadPool(int,ThreadFactory)
內部實現是:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
3、建立執行緒池長度為1的,也就是隻有一個長度的執行緒池,多餘的必須等待,它和呼叫Executors.newFixedThreadPool(1)得到的結果一樣:
Executors.newSingleThreadExecutor()
內部實現是:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
是不是蠻簡單的,就是在變引數,你自己也可以new的。
4、和方法3類似,可以自定義ThreadFactory,這裡就不多說了!
5、建立可以進行快取的執行緒池,預設快取60s,資料會放在一個SynchronousQueue上,而不會進入blocken佇列中,也就是隻要有執行緒進來就直接進入排程,這個不推薦使用,因為容易出問題,除非用來模擬一些併發的測試:
Executors.newCachedThreadPool();
內部實現為:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
6、和方法5類似,增加自定義ThreadFactory
7、新增一個Schedule的排程器的執行緒池,預設只有一個排程:
Executors.newSingleThreadScheduledExecutor();
內部實現為(這裡可以看到不是用ThreadPoolExector了,schedule換了一個類,內部實現通過ScheduledThreadPoolExecutor類裡面的內部類ScheduledFutureTask來實現的,這個內部類是private,預設是引用不到的哦):
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
8、和7一樣,增加自己定義的ThreadFactory
9、新增一個schedule的執行緒池排程器,和newFixedThreadPool有點類似:
Executors.newScheduledThreadPool();
內部程式碼為:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
其實內部Exectors裡面還有一些其他的方法,我們就不多說明了,另外通過這裡,大家先可以瞭解一個大概,知道Exectors其實是一個工具類,提供一系列的靜態方法,來完成對對應執行緒池的形象化建立,所以不用覺得很神奇,神奇的是內部是如何實現的,本文我們不闡述文章中各種執行緒池的實現,只是大概上有個認識,等到我們專門將Exector系列的時候,我們會詳細描述這些細節。
OK,我們繼續下一個話題:
Semaphor,這個鳥東西是敢毛吃的呢?
答:通過名字就看出來了,是訊號量。
訊號量可以幹什麼呢?
答:根據一些閥值做訪問控制。
OK,我們這裡模擬一個當多個執行緒併發一段程式碼的時候,如何控制其訪問速度:
import java.util.Random;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
private final static Semaphore MAX_SEMA_PHORE = new Semaphore(10);
public static void main(String []args) {
for(int i = 0 ; i < 100 ; i++) {
final int num = i;
final Random radom = new Random();
new Thread() {
public void run() {
boolean acquired = false;
try {
MAX_SEMA_PHORE.acquire();
acquired = true;
System.out.println("我是執行緒:" + num + " 我獲得了使用權!" + DateTimeUtil.getDateTime());
long time = 1000 * Math.max(1, Math.abs(radom.nextInt() % 10));
Thread.sleep(time);
System.out.println("我是執行緒:" + num + " 我執行完了!" + DateTimeUtil.getDateTime());
}catch(Exception e) {
e.printStackTrace();
}finally {
if(acquired) {
MAX_SEMA_PHORE.release();
}
}
}
}.start();
}
}
}
這裡是簡單模擬併發100個執行緒去訪問一段程式,此時要控制最多同時執行的是10個,用到了這個訊號量,執行程式用了一個執行緒睡眠一個隨機的時間來代替,你可以看到後面有執行緒說自己釋放了,就有執行緒獲得了,沒釋放是獲取不到的,內部實現方面,我們暫時不管,暫時知道這樣用就OK。
接下來:
Exchanger十個神馬鬼東西呢?
答:執行緒之間互動資料,且在併發時候使用,兩兩交換,交換中不會因為執行緒多而混亂,傳送出去沒接收到會一直等,由互動器完成互動過程。
啥時候用,沒想到案例?
答:的確很少用,而且案例很少,不過的確有這種案例,Exchanger
import java.util.concurrent.Exchanger;
public class ExchangerTest {
public static void main(String []args) {
final Exchanger <Integer>exchanger = new Exchanger<Integer>();
for(int i = 0 ; i < 10 ; i++) {
final Integer num = i;
new Thread() {
public void run() {
System.out.println("我是執行緒:Thread_" + this.getName() + "我的資料是:" + num);
try {
Integer exchangeNum = exchanger.exchange(num);
Thread.sleep(1000);
System.out.println("我是執行緒:Thread_" + this.getName() + "我原先的資料為:" + num + " , 交換後的資料為:" + exchangeNum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
這裡執行你可以看到,如果某個執行緒和另一個執行緒傳送了資料,它接受到的資料必然是另一個執行緒傳遞給他的,中間步驟由Exchanger去控制,其實你可以說,我自己隨機取選擇,不過中間的演算法邏輯就要複雜一些了。
接下來:
CyclicBarrier,關卡模式,搞啥玩意的呢?
答:當你在很多環節需要卡住,要多個執行緒同時在這裡都達到後,再向下走,很有用途。
能否舉個例子,有點抽象?
答:團隊出去旅行,大家一起先達到酒店住宿,然後一起達到遊樂的地方遊玩,然後一起坐車回家,每次需要點名後確認相關人員均達到,然後LZ一聲令下,觸發,大夥就瘋子般的出發了。
下面的例子也是以旅遊的方式來呈現給大家:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierTest {
private static final int THREAD_COUNT = 10;
private final static CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(THREAD_COUNT ,
new Runnable() {
public void run() {
System.out.println("======>我是導遊,本次點名結束,準備走下一個環節!");
}
}
);
public static void main(String []args)
throws InterruptedException, BrokenBarrierException {
for(int i = 0 ; i < 10 ; i++) {
new Thread(String.valueOf(i)) {
public void run() {
try {
System.out.println("我是執行緒:" + this.getName() + " 我們達到旅遊地點!");
CYCLIC_BARRIER.await();
System.out.println("我是執行緒:" + this.getName() + " 我開始騎車!");
CYCLIC_BARRIER.await();
System.out.println("我是執行緒:" + this.getName() + " 我們開始爬山!");
CYCLIC_BARRIER.await();
System.out.println("我是執行緒:" + this.getName() + " 我們回賓館休息!");
CYCLIC_BARRIER.await();
System.out.println("我是執行緒:" + this.getName() + " 我們開始乘車回家!");
CYCLIC_BARRIER.await();
System.out.println("我是執行緒:" + this.getName() + " 我們到家了!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
測試結果中可以發現,大家一起走到某個步驟後,導遊說:“我是導遊,本次點名結束,準備走下一個環節!”,然後才會進入下一個步驟,OK,這個有點意思吧,其實賽馬也是這個道理,只是賽馬通常只有一個步驟,所以我們還有一個方式是:
CountDownLatch的方式來完成賽馬操作,CountDownLatch是用計數器來做的,所以它不可以被複用,如果要多次使用,就要從新new一個出來才可以。我們下面的程式碼中,用兩組賽馬,每組5個參與者來,做一個簡單測試:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
private final static int GROUP_SIZE = 5;
public static void main(String []args) {
processOneGroup("分組1");
processOneGroup("分組2");
}
private static void processOneGroup(final String groupName) {
final CountDownLatch start_count_down = new CountDownLatch(1);
final CountDownLatch end_count_down = new CountDownLatch(GROUP_SIZE);
System.out.println("==========================>\n分組:" + groupName + "比賽開始:");
for(int i = 0 ; i < GROUP_SIZE ; i++) {
new Thread(String.valueOf(i)) {
public void run() {
System.out.println("我是執行緒組:【" + groupName + "】,第:" + this.getName() + " 號執行緒,我已經準備就緒!");
try {
start_count_down.await();//等待開始指令發出即:start_count_down.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是執行緒組:【" + groupName + "】,第:" + this.getName() + " 號執行緒,我已執行完成!");
end_count_down.countDown();
}
}.start();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("各就各位,預備!");
start_count_down.countDown();//開始賽跑
try {
end_count_down.await();//等待多個賽跑者逐個結束
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("分組:" + groupName + "比賽結束!");
}
}
有點意思哈,如果你自己用多執行緒實現是不是有點麻煩,不過你可以用Thread的join方法來實現,也就是執行緒的發生join的時候,當前執行緒(一般是主執行緒)要等到對應執行緒執行完run方法後才會進入下一步,為了模擬下,我們也來玩玩:
public class ThreadJoinTest {
private final static int GROUP_SIZE = 5;
public static void main(String []args) throws InterruptedException {
Thread []threadGroup1 = new Thread[5];
Thread []threadGroup2 = new Thread[5];
for(int i = 0 ; i < GROUP_SIZE ; i++) {
final int num = i;
threadGroup1[i] = new Thread() {
public void run() {
int j = 0;
while(j++ < 10) {
System.out.println("我是1號組執行緒:" + num + " 這個是我第:" + j + " 次執行!");
}
}
};
threadGroup2[i] = new Thread() {
public void run() {
int j = 0;
while(j++ < 10) {
System.out.println("我是2號組執行緒:" + num + " 這個是我第:" + j + " 次執行!");
}
}
};
threadGroup1[i].start();
}
for(int i = 0 ; i < GROUP_SIZE ; i++) {
threadGroup1[i].join();
}
System.out.println("-==================>執行緒組1執行完了,該輪到俺了!");
for(int i = 0 ; i < GROUP_SIZE ; i++) {
threadGroup2[i].start();
}
for(int i = 0 ; i < GROUP_SIZE ; i++) {
threadGroup2[i].join();
}
System.out.println("全部結束啦!哈哈,回家喝稀飯!");
}
}
程式碼是不是繁雜了不少,呵呵,我們再看看上面的訊號量,如果不用工具,自己寫會咋寫,我們模擬CAS鎖,使用Atomic配合完成咋來做呢。也來玩玩,呵呵:
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadWaitNotify {
private final static int THREAD_COUNT = 100;
private final static int QUERY_MAX_LENGTH = 2;
private final static AtomicInteger NOW_CALL_COUNT = new AtomicInteger(0);
public static void main(String []args) throws InterruptedException {
Thread []threads = new Thread[THREAD_COUNT];
for(int i = 0 ; i < THREAD_COUNT ; i++) {
threads[i] = new Thread(String.valueOf(i)) {
synchronized public void run() {
int nowValue = NOW_CALL_COUNT.get();
while(true) {
if(nowValue < QUERY_MAX_LENGTH && NOW_CALL_COUNT.compareAndSet(nowValue, nowValue + 1)) {
break;//獲取到了
}
try {
this.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
nowValue = NOW_CALL_COUNT.get();//獲取一個數據,用於對比
}
System.out.println(this.getName() + "======我開始做操作了!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName() + "======操作結束了!");
NOW_CALL_COUNT.getAndDecrement();
this.notify();
}
};
}
for(int i = 0 ; i < THREAD_COUNT ; i++) {
threads[i].start();
}
}
}
還是有點意思哈,這樣寫就是大部分人對while迴圈那部分會寫暈掉,主要是要不斷去判定和嘗試,wait()預設是長期等待,但是我們不想讓他長期等待,就等1s然後再嘗試,其例項子還可以改成wait一個隨機的時間範圍,這樣模擬的效果會更加好一些;另外實際的程式碼中,如果獲取到鎖後,notify方法應當放在finally中,才能保證他肯定會執行notify這個方法。
OK,本文就是用,玩,希望玩得有點爽,我們後面會逐步介紹它的實現機制以及一寫執行緒裡頭很好用,但是大家又不是經常用的東西。