多執行緒與高併發(5)
阿新 • • 發佈:2021-10-27
執行緒不安全類與寫法
【執行緒不安全】
:如果一個類類物件同時可以被多個執行緒訪問,如果沒有做同步或者特殊處理就會出現異常或者邏輯處理錯誤。【1. 字串拼接】:
StringBuilder(執行緒不安全)、
StringBuffer(執行緒安全)【2. 日期轉換】:
SimpleDateFormat(執行緒不安全,最好使用區域性變數[堆疊封閉]保證執行緒安全)
JodaTime推薦使用
(執行緒安全)【3. ArrayList、HashSet、HashMap等Collections】:
ArrayList(執行緒不安全)
HashSet(執行緒不安全)
HashMap(執行緒不安全)【**同步容器**synchronized修飾】
Vector、Stack、HashTable
Collections.synchronizedXXX(List、Set、Map)【**併發容器** J.U.C】
ArrayList
->CopyOnWriteArrayList
:(讀時不加鎖,寫時加鎖,避免複製多個副本出來將資料搞亂)寫操作時複製,當有新元素新增到CopyOnWriteArrayList中時,先從原有的陣列中拷貝一份出來,在新的陣列上進行寫操作,寫完之後再將原來的陣列指向新的陣列。
HashSet、TreeSet
->CopyOnWriteArraySet、ConcurrentSkipListSet
:HashMap、TreeMap
ConcurrentHashMap、ConcurrentSkipListMap
:相比ConcurrentHashMap,ConcurrentSkipListMap具有如下優勢:
- ConcurrentSkipListMap的存取速度是ConcurrentSkipListMap的4倍左右
- ConcurrentSkipListMap的key是有序的
- ConcurrentSkipListMap支援更高的併發(它的存取時間和執行緒數幾乎沒有關係,更高併發的場景下越能體現出優勢)
六、安全共享物件策略 - 總結
執行緒限制
:一個被執行緒限制的物件,由執行緒獨佔,並且只能被佔有它的執行緒修改共享只讀
:一個共享只讀的物件,在沒有額外同步的情況下,可以被多個執行緒併發訪問,但是任何執行緒都不能修改它執行緒安全物件
:一個執行緒安全的物件或者容器,在內部通過同步機制來保證執行緒安全,所以其他執行緒無需額外的同步就可以通過公共介面隨意訪問它被守護物件
:被守護物件只能通過獲取特定鎖來訪問
七、J.U.C 之 AQS
7.1、 AQS
AQS:AbstractQueneSynchronizer
- 使用Node實現FIFO佇列,可以用於構建鎖或者其他同步裝置的基礎框架
- 利用int型別表示狀態
- 使用方法是
繼承
- 子類通過繼承並通過實現它的方法管理其狀態{ acquire和release }的方法操縱狀態
- 可以同時實現排它鎖和共享鎖模式(獨佔、共享)
7.2、 AQS的同步元件如下:
7.2.1、CountDownLatch:閉鎖,通過計數來保證執行緒是否一直阻塞.CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。構造器中的計數值(count)實際上就是閉鎖需要等待的執行緒數量。這個值只能被設定一次,而且CountDownLatch沒有提供任何機制去重新設定這個計數值。
與CountDownLatch的第一次互動是主執行緒等待其他執行緒。主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。
其他N 個執行緒必須引用閉鎖物件,因為他們需要通知CountDownLatch物件,他們已經完成了各自的任務。這種通知機制是通過 CountDownLatch.countDown()方法來完成的;每呼叫一次這個方法,在建構函式中初始化的count值就減1。所以當N個執行緒都調 用了這個方法,count的值等於0,然後主執行緒就能通過await()方法,恢復執行自己的任務。
解釋一下CountDownLatch概念? `CountDownLatch`和 `CyclicBarrier`的不同之處? 給出一些CountDownLatch使用的例子? CountDownLatch類中主要的方法?
public class CountDownLatchExample1 {
// 執行緒數
private final static int threadCount = 200;
public static void main(String[] args) throws InterruptedException{
// 使用執行緒池進行排程
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
System.out.println("exception:" + e);
}finally{
countDownLatch.countDown(); // 計數器減一
}
});
}
countDownLatch.await(10, TimeUnit.MILLISECONDS);
System.out.println("===finished===");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException{
Thread.sleep(100);
System.out.println("threadNum:" + threadNum);
}
}
7.2.2、Semaphore
(訊號量):可以控制同一時間併發執行緒的數目
主要函式:acquire、release、tryAcquire
public class SemaphoreExample1 {
// 執行緒數
private final static int threadCount = 20;
public static void main(String[] args) throws InterruptedException{
// 使用執行緒池進行排程
ExecutorService exec = Executors.newCachedThreadPool();
//併發控制(允許併發數20)
final Semaphore semaphore = new Semaphore(3);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
if(semaphore.tryAcquire(5, TimeUnit.SECONDS)){
test(threadNum);
semaphore.release();
}
/** 多個許可:在程式碼中一共有10個許可,每次執行semaphore.acquire(5);
* 程式碼時耗費掉5個,所以20/5=4,
* 說明同一時間只有4個執行緒允許執行acquire()和release()之間的程式碼。
* */
// semaphore.acquire(3); // 獲取許可
// test(threadNum);
// semaphore.release(3); // 釋放許可
} catch (Exception e) {
System.out.println("exception:" + e);
}finally{
countDownLatch.countDown(); // 計數器減一
}
});
}
// countDownLatch.await(100, TimeUnit.MILLISECONDS);
System.out.println("===finished===");
exec.shutdown();
}
private static void test(int threadNum) throws InterruptedException{
System.out.println("threadNum:" + threadNum);
Thread.sleep(1000);
}
}
7.2.3、CyclicBarrier
:可以完成多個執行緒之間相互等待,只有當每個執行緒都準備就緒後,才能各自繼續往下執行
應用場景:需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用CyclicBarrier。
簡單理解【`人滿發車`】:
長途汽車站提供長途客運服務。
當等待坐車的乘客到達20人時,汽車站就會發出一輛長途汽車,讓這20個乘客上車走人。
等到下次等待的乘客又到達20人是,汽車站就會又發出一輛長途汽車。
public class CyclicBarrierExample1 {
// 執行緒數
private final static int threadCount = 10;
// 屏障的執行緒數目 5
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("===continue===");
});
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(500);
executorService.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
System.out.println("===" + threadNum + " is ready.");
try{
barrier.await(2000, TimeUnit.MILLISECONDS);
}catch(Exception e){
System.out.println("e:"+e);
}
System.out.println("===" + threadNum + " continue");
}
}
7.2.4、ReentrantLock
1. api:
- lock()
- unlock()
- tryLock()
private static Lock lock = new ReentrantLock();
private static void test(int threadNum){
lock.lock();
try{
count++;
}finally{
lock.unlock();
}
}
2. ReentrantLock和synchronized的區別
- 1. `可重入性`
- 2. `鎖的實現`:synchronized是jvm實現,ReentrantLock是jdk實現
- 3. `效能區別`
- 4. `功能方面的區別`
3. ReentrantLock獨有的功能
- 1. 可指定是公平鎖還是非公平鎖,synchronized只能是非公平鎖(公平鎖:先等待的執行緒先獲得鎖)
- 2. 提供了一個Condition類,可以分組喚醒需要喚醒的執行緒
- 3. 提供能夠中斷等待鎖的執行緒的機制,lock.lockInterruptibly()
4. ReentrantReadWriteLock
5. StampedLock
6. 鎖的使用
- 當只有少量競爭者執行緒的時候,`synchronized`是一個很好的通用的鎖的實現(synchronized不會引發死鎖,jvm會自動解鎖)
- 競爭者執行緒不少,但是執行緒增長的趨勢是可以預估的,這時候使用`ReentrantLock`是一個很好的通用的鎖的實現
7.2.5、Condition
public class LockExample3 {
public static void main(String[] args){
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
int u=1;
new Thread(() -> {
try{
reentrantLock.lock();
System.out.println("wait signal"); // 1
condition.await();
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("get signal");
reentrantLock.unlock();
}).start();
new Thread(() -> {
reentrantLock.lock();
System.out.println("get lock");
try{
Thread.sleep(3000);
}catch(InterruptedException e){
e.printStackTrace();
}
condition.signalAll();
System.out.println("send signal");
reentrantLock.unlock();
}).start();
}
}
7.2.6、FutureTask
建立執行緒兩種方式繼承Thread,實現Runnable介面,這兩種方式,在任務執行完畢之後獲取不到執行結果
FutureTask、Callable可以獲取到執行結果
1. Callable和Runnable對比
2. Future介面
3. FutureTask
```
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("do something in callable...");
Thread.sleep(3000);
return "Done";
}
});
new Thread(futureTask).start();
System.out.println("do something in main...");
Thread.sleep(1000);
String result = futureTask.get();
System.out.println("result:"+result);
}
}
7.2.7、Fork/Join框架:將大模組切分成多個小模組進行計算
八、執行緒池
初始化好執行緒池例項之後,將任務丟進去等待排程執行。
8.1、Thread弊端
- 每次new Thread都要新建物件,效能差
- 執行緒缺乏統一管理,可能無限制的新建執行緒,相互競爭,有可能佔用過多的系統資源導致宕機或者OOM
- 缺少更多功能,如更多執行,定期執行,執行緒中斷
8.2、執行緒池的好處
- 可以重用存在的執行緒,減少物件的建立、消亡的開銷,效能佳
- 可以有效的控制最大併發數,提供系統資源利用率,同時可以避免過多的資源競爭,避免阻塞
- 提供定時執行、定期執行、單執行緒、併發數控制等功能
- 【
ThreadPoolExecutor
的初始化引數】corePoolSize
:核心執行緒數量maximumPoolSize
:縣城最大執行緒數workQueue
:阻塞佇列,儲存等待執行的任務,很重要,會對執行緒池執行過程產生重大影響keepAliveTime
:執行緒沒有任務執行時,最多保持多久時間終止unit
:keepAliveTime的時間單位hreadFactory
:執行緒工廠,用來建立執行緒rejectHandler
:當拒絕處理任務時的策略
執行緒池-ThreadPoolExecutor狀態
執行緒池-ThreadPoolExecutor方法
1. execute():提交任務,交給執行緒池執行
2. submit():提交任務能夠返回執行結果execute + Future
3. shutdown():關閉執行緒池,等待任務都執行完
4. shutdownNow():關閉執行緒池,不等待任務執行完
5. getTaskCount():執行緒池已執行和未執行的任務總數
6. getCompletedTaskCount():已完成的任務總數
7. getPoolSize():執行緒池當前的執行緒數量
8. getActiveCount:當前執行緒池中正在執行任務的執行緒數量
8.3、執行緒池 - Executors框架(建立執行緒池)
Executors.newCachedThreadPool
:建立一個可快取的執行緒池,如果執行緒池長度超過了處理的需要可以靈活回收空閒執行緒,如果沒有可以回收的,那麼就新建執行緒
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
// 往執行緒池中放任務
for (int i = 0; i < 10; i++) {
final int index = i; // 任務的序號
executorService.execute(() -> {
System.out.println("===task:"+index);
});
}
executorService.shutdown(); // 關閉執行緒池
}
Executors.newFixedThreadPool
:建立的是一個定長的執行緒池,可以控制執行緒的最大併發數,超出的執行緒會在佇列中等待Executors.newScheduledThreadPool
:建立的也是定長執行緒池,支援定時以及週期性的任務執行
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
// 往執行緒池中放任務
executorService.scheduleAtFixedRate(() -> {
log.info("===sechedule run");
}, 1, 3, TimeUnit.SECONDS); // 延遲一秒,每隔三秒執行任務
executorService.schedule(() -> {
log.info("===sechedule run");
}, 3, TimeUnit.SECONDS);
executorService.shutdown(); // 關閉執行緒池
}
Executors.newSingleThreadExecutor
:建立的是一個單執行緒化的執行緒池,會用唯一的一個工作執行緒來執行任務,保證所有任務按照指令順序執行(指令順序可以指定它是按照先入先出,優先順序執行)
newSingleThreadExecutor列印結果是按照順序輸出
8.4、執行緒池 - 合理配置
1. CPU密集型任務,就需要儘量壓榨CPU,參考可以設定為NCPU+1
2. IO密集型任務,參考可以設定為2*NCPU
> NCPU = CPU的數量
> UCPU = 期望對CPU的使用率 0 ≤ UCPU ≤ 1
> W/C = 等待時間與計算時間的比率
> 如果希望處理器達到理想的使用率,那麼執行緒池的最優大小為:
> 執行緒池大小=NCPU *UCPU(1+W/C)