第五章 基礎構建模塊
5.1 同步容器類
實現方式 : 將他們的狀態封裝起來,並對每個公有方法都進行同步, 使得每次只有一個線程可以訪問.
5.1.1 存在的問題
復合操作 並非線程安全. 比如 叠代, 條件運算等.
在對同步容器類的復合操作加鎖時一定要以容器對象為鎖對象, 保證復合操作的鎖對象和容器使用的鎖對象一致.才能實現一個線程安全的復合操作
public static void getLast(Vector<?> list) { // 此處的鎖對象必須和 Vector 內部的鎖對象一致 synchronized (list) {int lastIndex = list.size()-1; list.remove(lastIndex); } }
5.1.2 叠代器與ConcurrentModificationException
在容器的叠代過程中被修改(結構上被改變)時會拋出 ConcurrentModificationException
解決方法 : 在叠代過程中持有容器的鎖. 並在所有對共享容器進行叠代的地方加鎖
5.1.3 隱藏叠代器
以下操作也會間接的進行容器的叠代操作
toString() , hashCode() , equals() 等很多方法都出觸發容器的叠代操作.
5.2 並發容器
5.2.1 ConcurrentHashMap
- 加鎖策略 : 分段鎖(粒度更細的加鎖機制), 同步方法塊
- 支持多個線程並發的訪問 ConcurrentHashMap , 實現更高的吞吐量 .
- ConcurrentHashMap 返回的叠代器具有弱一致性 , 可以(但是不保證)將修改操作立即反映給容器
- 叠代過程中不會加鎖 , 也不會拋出 ConcurrentModificationException ,
- 將一些復合操作(putIfAbsent() 若沒有則添加) 實現為原子操作
5.2.3 CopyOnWriteArrayList
- 每次修改容器時先復制數組, 引用依舊指向原來的數組 , 然後修改新的數組, 最後將引用指向新的數組.
- 寫入時復制 也可理解為 修改時復制
- 返回的叠代器不會拋出 ConcurrentModificationException
- 使用場景 : 叠代操作遠遠多於修改操作. 復制數組的操作有一定的開銷.
- 修改操作使用 ReentrantLock 進行加鎖
5.3 阻塞隊列
- 提供阻塞的 put 和 take 方法
- put 方法將阻塞到直到有空間可用 , take 方法將阻塞到直到有元素可用
- 隊列可以有界, 也可以無界
- 修改容器時統一使用創建隊列實例時創建的 ReentrantLock 對象
BlockingQueue(阻塞隊列接口)
LinkedBlockingQueue 類似與 LinkedList
ArrayBlockingQueue 類似與 ArrayList
PriorityBlockingQueue 按優先級排序的隊列
SynchronousQueue 每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。非常適合做交換工作,生產者的線程和消費者的線程同步以傳遞某些信息、事件或者任務。
5.3.1 生產者與消費者的特點
- 生產者和消費者只需要完成各自的任務
- 阻塞隊列將負責所有的控制流
- 每個功能的代碼和邏輯更加清楚
- 他們整體的並行度取決於兩者中較低的並行度
生產者和消費者設計也可以使用 Executor 任務執行框架來實現, 其本身也使用 生產者--消費者模式
5.3.2 串行線程封閉
- 線程封閉對象只能由單個對象擁有,可以通過安全的發該對象來轉移所有權.並且發布對象的線程不會再訪問它
- 轉移所有權之後,只有另外一個線程獲得這個對象的訪問權限, 它可以對它做任意的修改,因為它有獨占訪問權.
5.3.3 雙端隊列和工作密取
雙端隊列
- ArrayDeque 和 LinkedBlockingDeque
- 實現在隊列頭和隊列尾的高效插入和移除.
工作密取
- 每個消費者有自己的雙端隊列 , 當一個消費者完成自己隊列的所有任務後 , 那麽它可以從其他消費者的雙端隊列秘密的獲取任務 .
5.4 阻塞方法和中斷方法
處理 InterruptedException
- 將 InterruptedException 傳遞給方法的調用者
- 捕獲這個異常, 並恢復中斷狀態
5.5 同步工具類
5.5.1 閉鎖(CountDownLatch)
作用 : 用來確保某些活動直到其他活動都完成後才執行.
- 計數器 : 表示需要等待的事件數量
- await() : 阻塞調用此方法的線程直到計數器為0
- countDown() : 計數器遞減
用法一 : 創建一定數量的線程,多個線程並發的執行任務
- 使用兩個閉鎖, 分別表示起始門和結束門. 起始門初始值為1 , 結束門初始值為等待的事件數量.
- 每個工作線程在起始門等待,
- 所有線程就緒後起始門調用 countDown()直到起始門的值為0, 之後所有線程同時開始執行(因為線程之前都在起始門等待)
- 主線程調用結束門的await(), 主線程阻塞直到所有工作線程結束
- 工作線程全部執行完畢, 主線程運行
package com.pinnet.test; import java.util.concurrent.CountDownLatch; public class CountLatchTest { public void timeTask(int threadNumbers, Runnable task) throws InterruptedException { CountDownLatch start = new CountDownLatch(1); CountDownLatch end = new CountDownLatch(threadNumbers); for (int i = 0; i < threadNumbers; i++) { new Thread() { public void run() { try { // 所有線程在起始門等待 start.await(); // 執行任務 task.run(); // 結束門遞減 end.countDown(); } catch (InterruptedException e) { } } }.start(); } // 所有工作線程開始執行 start.countDown(); // 所有工作線程啟動後主線程立即登待 end.await(); System.out.println("開始主線程"); } }
用法二: 創建一定數量的線程,多個線程依次的執行任務
- 使用一個閉鎖
- 線程依次啟動, 執行完成後countDown()
- 主線程 await() , 直到計數器為0 ,主線程執行
public void timeTask2(int threadNumbers, Runnable task) throws InterruptedException { CountDownLatch start = new CountDownLatch(threadNumbers); // 任務一次執行 for (int i = 0; i < threadNumbers; i++) { new Thread() { public void run() { // 任務開始 task.run(); // 遞減 start.countDown(); } }.start(); } // 主線程阻塞直到計數器為0 start.await(); System.out.println("開始主線程"); }
5.5.2 FutureTask
一種可生成結果,可異步取消的計算.
- 計算通過 Callable 實現
- 可以將計算結果從執行任務的線程 傳遞 到獲取這個結果的線程 , 並保證其安全性
- get() 獲取結果可阻塞到任務完成
public class FutureTaskTest { // 創建任務 private final FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() { public Integer call() { return 123; } }); // 創建線程 private final Thread thread = new Thread(future); // 對外提供方法啟動線程 public void start() { thread.start(); } // 獲取計算結果 public Integer get() { try { return future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return null; } } }
5.5.3 信號量(Semaphore)
- 控制同時訪問某個資源或執行某個操作的數量.
- Semaphore 管理一定數量的許可, 執行操作時先獲取許可,執行完操作後釋放許可.
- 獲取許可時可阻塞.
5.5.4 柵欄
閉鎖和柵欄的區別 :
- 閉鎖是一次性對象 , 一旦進入終止狀態 , 就不能被重置
- 柵欄是所有線程必須同時到達柵欄位置 , 才能繼續執行.
- 閉鎖用於等待事件 , 柵欄用於等待線程.
基本使用 :
- 指定數量線程到達柵欄位置後,所有線程被釋放 . 柵欄將被重置
- 成功通過柵欄,await()將會返回一個到達索引號
- await() 超時 或 await()阻塞的線程被中斷 , 則柵欄被打破.所有阻塞的await() 調用被終止 , 並拋出 BrokenBarrierException.
用法一 : CyclicBarrier(int number)
await() 調用 number 次後所有調用 await() 的線程繼續執行 , 否則線程在await() 阻塞
public void barrier() { int number = 6; // 參數表示屏障攔截的線程數量 // barrier.await() 調用 number 次後所有線程的阻塞狀態解除 CyclicBarrier barrier = new CyclicBarrier(number); for (int i = 0; i < number; i++) { new Thread(new Runnable() { @Override public void run() { System.out.println("此線程任務已經完成"); try { // 調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。 barrier.await(); System.out.println("所有線程執行完成"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }).start(); } }
用法二 : CyclicBarrier(int parties, Runnable runnable) 指定數量的線程到達屏障點後 執行 runnable
public void barrier2() { int number = 6; // 參數表示屏障攔截的線程數量 // barrier.await() 調用 number 次後所有線程的阻塞狀態解除 CyclicBarrier barrier = new CyclicBarrier(number, new Runnable() { @Override public void run() { // 指定數量的線程到達屏障點後執行 barrier() barrier(); } }); for (int i = 0; i < number; i++) { new Thread(new Runnable() { @Override public void run() { System.out.println("此線程任務已經完成"); try { // 調用await方法告訴CyclicBarrier我已經到達了屏障,然後當前線程被阻塞。 barrier.await(); System.out.println("所有線程執行完成"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }).start(); } }
雙方形式的柵欄 Exchanger
第五章 基礎構建模塊