Concurrent.util 常用類的介紹和使用
阿新 • • 發佈:2018-11-19
Concurrent.util 常用類的介紹和使用
CyclicBarrier
假設一個場景: 每個執行緒都代表一個跑步運動員,當運動員都準備好後才一起出發,只要有一個沒準備好,大家都等待。
package org.mulity.demo;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* CyclicBarrier 使用示例
*
* Created by jun on 2017/10/14.
*/
public class UseCyclicBarrier {
static class Runner implements Runnable {
private CyclicBarrier barrier;
private String name;
public Runner(CyclicBarrier barrier, String name) {
this.barrier = barrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5 ));
System.out.println(name + ", ready");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(name + "GO!");
}
}
public static void main(String[] args) {
// new CyclicBarrier(3); 必須有 3 個barrier.await(); 才能繼續往下執行
CyclicBarrier barrier = new CyclicBarrier(3);
// 返回一個固定數量的執行緒池
ExecutorService executor = Executors.newFixedThreadPool(3);
executor.submit(new Thread(new Runner(barrier, "運動員1")));
executor.submit(new Thread(new Runner(barrier, "運動員2")));
executor.submit(new Thread(new Runner(barrier, "運動員3")));
}
}
// 輸出:
運動員2, ready
運動員3, ready
運動員1, ready
運動員1GO!
運動員2GO!
運動員3GO!
CountDownLacth
它常用於監聽某些初始化操作,等初始化操作執行完畢後,通知主執行緒繼續工作
package org.mulity.demo;
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch 使用示例
* <p>
* Created by jun on 2017/10/14.
*/
public class UserCountDownLatch {
public static void main(String[] args) {
final CountDownLatch countDown = new CountDownLatch(2);
// CountDownLatch(2) 代表一個 countDown.await(); 需要兩個countDown.countDown();來喚醒
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("進入執行緒1:等待其他執行緒處理完成");
countDown.await();
System.out.println("t1 執行緒繼續執行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("進入執行緒2:執行緒開始初始化");
Thread.sleep(3000);
countDown.countDown();
System.out.println("t2 執行緒初始化完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2");
Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("進入執行緒3:執行緒開始初始化");
Thread.sleep(3000);
countDown.countDown();
System.out.println("t3 執行緒初始化完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t3");
t1.start();
t2.start();
t3.start();
}
}
// 輸出:
進入執行緒2:執行緒開始初始化
進入執行緒3:執行緒開始初始化
進入執行緒1:等待其他執行緒處理完成
t2 執行緒初始化完畢
t3 執行緒初始化完畢
t1 執行緒繼續執行
Callable 和 Future
Future 模式 有點類似於 ajax 非同步的呼叫,非常適合在處理耗時很長的業務邏輯的時候使用,可以有效的減小系統的響應時間,提高系統的吞吐量
package org.mulity.demo;
import java.util.concurrent.*;
/**
* Future 使用示例
*
* Created by jun on 2017/10/14.
*/
public class UseFuture implements Callable<String>{
private String para;
public UseFuture(String para) {
this.para = para;
}
/**
* 這裡是真實的業務邏輯,執行起來可能比較慢
*/
@Override
public String call() throws Exception {
// 模擬執行耗時
Thread.sleep(3000);
return this.para + "處理完成";
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
String queryStr = "query";
FutureTask future = new FutureTask<String>(new UseFuture(queryStr));
// 該方法會建立一個執行緒的執行緒池,若空閒則執行,若沒有空閒執行緒,則快取在任務佇列中
ExecutorService executor = Executors.newSingleThreadExecutor();
Future f = executor.submit(future);
System.out.println("請求完畢");
// 主程式可以執行其他業務邏輯
Thread.sleep(1000);
// future.get() 沒得到資料前會阻塞
System.out.println("資料:" + future.get());
executor.shutdown();
}
}
// 輸出
請求完畢
資料:query處理完成
Samaphore 訊號量
常用於高併發下,Java 層面,系統的限流。
package org.mulity.demo;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* Semaphore 示例
*
* Created by jun on 2017/10/14.
*/
public class UseSemaphore {
public static void main(String[] args) {
// 返回一個可以根據實際情況調整執行緒個數的執行緒池,不限制最大的執行緒數量,若有空閒的執行緒,則執行任務,若無任務則不建立執行緒,並且每一個空閒執行緒會在60秒後自動回收
ExecutorService executor = Executors.newCachedThreadPool();
// 同時只能有 5 個執行緒同時訪問
final Semaphore semp = new Semaphore(5);
// 模擬 20個執行緒同時訪問
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
@Override
public void run() {
try {
// 獲取許可
semp.acquire();
System.out.println(NO + "已經獲取許可");
// 模擬實際業務邏輯
Thread.sleep(1000 * (new Random()).nextInt(10));
// 釋放許可
semp.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executor.submit(run);
}
}
}
// 輸出:
0已經獲取許可
1已經獲取許可
2已經獲取許可
3已經獲取許可
4已經獲取許可
5已經獲取許可
6已經獲取許可
7已經獲取許可
...