apache效能優化
多執行緒筆記(四)
1. Atomic框架包
Atomic包裡放著所以保證執行緒安全的原子類
大致分為7類
- 基本資料型別的原子操作類
- 引用型別的原子操作類
- 陣列型別的原子操作類
- 修改屬性欄位的原子操作類
- 帶版本的引用型別的原子操作類
- 增強的基本資料型別的原子操作類
- 增強操作的公共輔助類
2. CountDownLatch
CountDownLatch是一個輔助同步器類,是同步器框架裡常用的一個類,主要作用是用於計數用的,類似於倒計時。
同步器框架:用來輔助實現同步功能的一些類。例如:CountDownLatch,CyclicBarrier,Semaphore,Exchanger
CountDownLatch的計數器減為0的時候是不能再次使用的,必須重新再new一個CountDownLatch。
CountDownLatch基於AQS實現,內部使用共享鎖,因為要允許多個執行緒同時執行。
程式碼示例:
public class CountDownLatchTest { //多執行緒,統計給定的內容裡面,字母a出現的次數,不區分大小寫 public static void main(String[] args) { String[] contents = { "sadasgadnqdhqamdjawqeqa", "jodasjoqwehmkldasmdqaiwpmclz" }; int[] results = new int[contents.length]; CountDownLatch cdLatch = new CountDownLatch(contents.length); //啟動統計的計算執行緒 for(int i = 0; i < contents.length; i++){ new CountDownWorker(contents,i , results, cdLatch).start(); } //等待所有的統計的計算執行緒執行結束過後,才計算最後的統計結果 try { cdLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } //計算最後的統計結果 new CountDownWorkerSum(results).start(); } } class CountDownWorkerSum extends Thread{ private int[] results = null; public CountDownWorkerSum(int[] results){ this.results = results; } public void run(){ int sum = 0; for(int i = 0; i < results.length; i++){ sum += results[i]; } System.out.println("a總共出現的次數為 " + sum); } } class CountDownWorker extends Thread{ //需要統計的內容 private String[] contents = null; //本執行緒需要統計的索引 private int index = 0; //用引用傳遞的方式,返回統計的結構資料 private int[] results; private CountDownLatch cdLatch; public CountDownWorker(String[] contents, int index, int[] results, CountDownLatch cdLatch){ this.contents = contents; this.index = index; this.results = results; this.cdLatch = cdLatch; } public void run(){ System.out.println("Thread=" + Thread.currentThread().getName()); String str = contents[index]; str = str.trim(); int count = 0; for(int i = 1; i <= str.length(); i++){ if("a".equalsIgnoreCase(str.substring(i - 1, i))){ count++; } } System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個"); results[index] = count; //計數 cdLatch.countDown(); } }
3. CyclicBarrier
CyclicBarrier,迴圈柵欄,也可稱為迴圈屏障,是一個輔助同步器類,功能和CountDownLatch類似。
使用Condition條件佇列,在沒有滿足要求之前讓執行緒進行等待。
程式碼示例:
public class CyclicBarrierTest { //多執行緒,統計給定的內容裡面,字母a出現的次數,不區分大小寫 public static void main(String[] args) { String[] contents = { "sadasgadnqdhqamdjawqeqa", "jodasjoqwehmkldasmdqaiwpmclz" }; int[] results = new int[contents.length]; //執行完的執行緒達到了指定數量之後,自動執行CyclicBarrierSum(results) CyclicBarrier cb = new CyclicBarrier(contents.length, new CyclicBarrierSum(results) ); //啟動統計的計算執行緒 for(int i = 0; i < contents.length; i++){ new CyclicBarrierWorker(contents,i , results, cb).start(); } } } class CyclicBarrierSum extends Thread{ private int[] results = null; public CyclicBarrierSum(int[] results){ this.results = results; } public void run(){ int sum = 0; for(int i = 0; i < results.length; i++){ sum += results[i]; } System.out.println("a總共出現的次數為 " + sum); } } class CyclicBarrierWorker extends Thread{ //需要統計的內容 private String[] contents = null; //本執行緒需要統計的索引 private int index = 0; //用引用傳遞的方式,返回統計的結構資料 private int[] results; private CyclicBarrier cb; public CyclicBarrierWorker(String[] contents, int index, int[] results, CyclicBarrier cb){ this.contents = contents; this.index = index; this.results = results; this.cb = cb; } public void run(){ System.out.println("Thread=" + Thread.currentThread().getName()); String str = contents[index]; str = str.trim(); int count = 0; for(int i = 1; i <= str.length(); i++){ if("a".equalsIgnoreCase(str.substring(i - 1, i))){ count++; } } System.out.println("第"+(index+1) +"行,共有字母a " + count + " 個"); results[index] = count; //到達柵欄,等待 try { cb.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
4. Semaphore
Semaphore的意思就是訊號量,也是一個輔助同步器類
Semaphore支援公平鎖與非公平鎖
Semaphore維護一定數量的“許可證”。當有執行緒想要訪問共享資源的時候,首先就要去獲得許可,如果許可證不夠了,執行緒就一直等待,直到獲得許可證。
執行緒使用完共享資源過後,應該歸還“許可證”,以供其他的執行緒來獲得許可證。當執行緒使用完共享資源過後,應該歸還許可證,以供其他的執行緒來獲得許可證。
程式碼示例:
public class SemaphoreTest {
//模擬前面走了一個人,後一個人才能進入地鐵站
public static void main(String[] args) {
//假設地鐵站容量為3,但是要來10倍容量的人
int maxNum = 3;
Semaphore sp = new Semaphore(maxNum);
for(int i = 0; i < maxNum * 10; i++){
new WeAreProgrammer(sp).start();
}
}
}
class WeAreProgrammer extends Thread{
private Semaphore sp = null;
public WeAreProgrammer(Semaphore sp){
this.sp = sp;
}
public void run(){
System.out.println("到達地鐵口,請求獲取進入地鐵站的許可,執行緒= "+ Thread.currentThread().getName());
//請求得到許可
try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("獲取到許可,終於可以進入到地鐵站了,開始等地鐵,執行緒= "+ Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("上車了,真開心,釋放許可,執行緒= "+ Thread.currentThread().getName());
//釋放許可
sp.release();
}
}
5. Exchanger
Exchanger是交換器的意思,也是一個輔助同步器類。
Exchanger可用於兩個執行緒間交換資料(單槽位交換資料)。也支援多個執行緒間交換資料(多槽位交換資料),但是多少執行緒交換資料無法保證把某一執行緒的資料與另一指定執行緒交換。
程式碼示例:
public class ExchangerTest {
//實現兩個執行緒之間交換資料
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new ExchangerA(exchanger).start();
new ExchangerB(exchanger).start();
}
}
class ExchangerA extends Thread{
//假定交換字串型別的資料,所有泛型為String
private Exchanger<String> exchanger = null;
public ExchangerA(Exchanger<String> exchanger){
this.exchanger = exchanger;
}
public void run(){
System.out.println("ExchangerA在做之間的業務,產生了資料,執行緒= "+ Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerA等待與ExchangerB交換資料");
//交換資料
try {
String exchangeMsg = exchanger.exchange("《這是ExchangerA的資訊》");
System.out.println("獲得ExchangerB交換過來的資料=" + exchangeMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerA接著執行自己的業務");
}
}
class ExchangerB extends Thread{
//假定交換字串型別的資料,所有泛型為String
private Exchanger<String> exchanger = null;
public ExchangerB(Exchanger<String> exchanger){
this.exchanger = exchanger;
}
public void run(){
System.out.println("ExchangerB在做之間的業務,產生了資料,執行緒= "+ Thread.currentThread().getName());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerB等待與ExchangerB交換資料");
//交換資料
try {
String exchangeMsg = exchanger.exchange("《這是ExchangerB的資訊》");
System.out.println("獲得ExchangerA交換過來的資料=" + exchangeMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ExchangerB接著執行自己的業務");
}
}
6. Executors框架
Executors框架是用來對多個執行緒,按照一定的策略,進行排程,執行和控制的框架。主要用於對執行緒池的管理
Executor介面
該介面內只有一個方法void execute(Runnable command);
用來在任務建立和任務執行之間進行解耦,用來提交可執行任務
ExecutorService介面
Executor的增強介面,繼承了Executor介面,在Executor的基礎上,增加了幾類實用的方法。提供對執行緒池宣告週期的管理,增加了對非同步任務,批處理任務的支援。
-
關閉執行器:
-
void shutdown();
:在呼叫該方法的時候,已經提交給執行器的任務會繼續執行,不會接受新任務的提交 -
List<Runnable> shutdownNow();
:不會接受新任務的提交,也會嘗試中斷現在正在執行的任務(需要任務支援響應中斷)
-
-
監視執行器的狀態:
-
boolean isShutdown();
:如果執行器關閉了,返回true -
boolean isTerminated();
:如果執行器終止(執行器關閉,並且所有的任務都已經完成)了,返回true
-
-
提供了對非同步任務的支撐:
-
<T> Future<T> submit(Callable<T> task);
:提交有返回值的任務 -
<T> Future<T> submit(Runnable task, T result);
:提交任務,result指向返回值,可在外部通過get獲得 -
Future<?> submit(Runnable task);
:提交沒有返回值的任務
-
-
提供了對批處理任務的支援:
-
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
:執行任務集合,返回Future集合 -
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:執行任務集合,返回Future集合,加入了超時限制 -
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
:隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務。 -
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
:隨機執行任務集合中的任務,有一個任務執行完畢直接返回,同時終止其他沒有執行完畢的任務,加入了超時限制。
-
ScheduledExecutorService介面
繼承了ExecutorService介面,用來定時執行或者週期性執行任務。
-
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
:提交一個任務,延時一段時間執行 -
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
:提交一個任務,延時一段時間執行 -
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
:提交一個任務,時間等於或超過time首次執行任務,之後每隔period*unit重複執行任務。 -
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
:建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
Executors
Executors用於輔助建立Executor介面的實現類的例項
Executors是一個簡單工廠,大體包括
-
建立固定執行緒數的執行緒池
-
ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
-
-
建立可快取的執行緒池
-
ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
-
建立可延時,可週期執行的執行緒池
-
ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
-
-
建立單個執行緒數的執行緒池
-
ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
-
-
建立Fork/Join框架
-
ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
-
ThreadPoolExecutor
ThreadPoolExecutor繼承AbstractExecutorService類,而AbstractExecutorService又實現了ExecutorService介面
為什麼引入執行緒池:
- 減少因為頻繁建立和銷燬檢查所帶來的開銷
- 提高程式的響應速度
- 自動管理執行緒,排程任務的執行,使得呼叫者只用關注任務的建立
構造方法引數解釋
執行緒池的構造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
引數
corePoolSize:核心執行緒池數量
maximumPoolSize:執行緒池最大執行緒數量(核心執行緒池數量+非核心執行緒池數量)
keepAliveTime:非核心執行緒存活時間
unit:存活時間的單位
workQueue:任務佇列,當執行緒池的執行緒滿了之後,任務會進入任務佇列等待提交
handler:拒絕策略
拒絕策略
有兩種情況會執行拒絕策略:
- 核心執行緒池滿了,任務佇列也滿了,同時非核心執行緒池也滿了,這個時候再提交新的任務就會拒絕。
- 提交任務的時候,執行緒池關閉了。
一共有四種拒絕策略:
- AbortPolicy:丟擲一個異常
- DiscardPolicy:什麼都不做,等任務自己被回收
- DiscardOldestPolicy:丟棄佇列中最先入隊的一個任務,並執行當前任務
- CallerRunsPolicy:用呼叫者的執行緒來執行任務
執行緒池狀態
用AtomicInteger型別的變數ctl來表示執行緒池的狀態,高3位儲存執行緒池的狀態,低29位儲存執行緒的數量
一共五種狀態:
-
RUNNING(-1):接受新任務,而且會處理已經進入阻塞佇列的任務
-
SHUTDOWN(0):不接受新任務,但會處理已經進入阻塞佇列的任務
-
STOP(1):不再接受新的任務,而且不再處理已經進入阻塞佇列的任務,同時會中斷正在執行的任務
-
TIDYING(2):所有任務都已經終止,工作執行緒數量為0時,準備呼叫terminated方法
-
TERMINATED(3):terminated方法已經執行完成
執行緒池狀態變化示意圖:
7. Future介面
Future模式是多執行緒中一種常見的設計模式,用來實現非同步執行任務。呼叫方拿到的是憑證,拿到憑證之後就可以去處理其他的任務,在需要使用結果的時候再使用憑證還獲取呼叫結果。Future介面的具體實現類是FutureTask
FutureTask
狀態:
- NEW(0):表示任務的初始化狀態
- COMPLETING(1):表示任務已經執行完成,屬於中間狀態
- NORMAL(2):表示任務正常完成,屬於最終狀態
- EXCEPTIONAL(3):表示任務異常完成,屬於最終狀態
- CANCELLED(4):表示任務還沒有開始執行就被取消了(非中斷方式),屬於最終狀態
- INTERRUPTING(5):表示任務還沒有開始執行就被取消了(中斷方式),屬於中間狀態
- INTERRUPTED(6):表示任務還沒有開始執行就被取消了(中斷方式),已經被中斷,屬於最終狀態
狀態變化示意圖