併發程式設計-阻塞佇列&JUC常用工具
併發程式設計-阻塞佇列&JUC常用工具
本章主要和大家聊聊之前的阻塞佇列,並且聊聊他如何使用,以及JUC中常用的一些工具,例如【CountDownLatch】、【Semaphore】、【CyclicBarrier】這些都是控制執行緒的一些工具,我們會聊聊他們如何使用,以及實現原理。
阻塞佇列(之前聊過一下,現在再次補充)
【LinkedTransferQueue】:由一個連結串列構建的一個無界阻塞佇列(實際上是一個Interger.maxvalue實際上還是有大小的,只不過他真的足夠大):他既是一個佇列,也有一個生產者對應一個消費者的功能,因為繼承了AbstractQueue並且實現了TransferQueue(這個在SynchronousQueue
中有使用,他的特性就是一個生產者對應一個消費者),我們可以認為他是一個連結串列和SynchronousQueue特性和合體。【LinkedBlockingDeque】:是一個雙向連結串列的佇列,他可以支援兩端的插入和移除(這就在一定程度上解決了多執行緒新增元素的競爭問題,因為這樣可以減少一半的競爭),只不過增加了幾個方法(借鑑網上的圖片)
阻塞佇列的使用(實際上他相當於非同步mq)
我們能想到用非同步mq解決問題的地方,都可以使用阻塞佇列。大家都瞭解責任鏈模式,想當於一個流水線,每個環節處理相關的請求,但是這個責任鏈點如果太多,勢必返回的時間就就會很長,我們就可以利用阻塞佇列來提升一下這個效能,減少同步請求帶來的損耗。這裡寫一個責任鏈中中使用阻塞佇列的demo去削峰
首先我們定義四個責任鏈點,我們知道,普通的責任鏈點是某個鏈路點執行完成他自己的任務後,才把責任交給下一個鏈路,咱們這裡,讓每個責任鏈路直接放行,把任務放到阻塞佇列中,用一個自旋去消費,這就可以達到削峰的目的,切記,這個只能是不需要立馬返回值的業務情況。這些阻塞佇列的底層大部分都使用了lock和condition萬變不離其宗
//檢驗資料責任鏈 public class ValidateProcessor extends Thread implements IRequestProcessor { public ValidateProcessor(IRequestProcessor iRequestProcessor) {this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一個執行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //當某個請求經過我這裡的時候,我先不處理,把他放在佇列中,然後放行(流量削峰) requests.add(request); } @Override public void run() { // 這裡不斷的對資料進行消費 while (true) { try { // 非同步進行請求的處理,其實這裡的底層都是使用的lock Request request = requests.take(); System.out.println(this.getClass().getSimpleName() + "處理" + request.getName()); if (iRequestProcessor != null) { iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } // 儲存資料責任鏈 public class SaveProcessor extends Thread implements IRequestProcessor { public SaveProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一個執行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //當某個請求經過我這裡的時候,我先不處理,把他放在佇列中,然後放行(流量削峰) requests.add(request); } @Override public void run() { while (true) { try { // 非同步進行請求的處理 Request request = requests.take(); System.out.println(this.getClass().getSimpleName()+"處理"+request.getName()); if (iRequestProcessor!=null){ iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } // 列印資料責任鏈 public class PrintProcessor extends Thread implements IRequestProcessor { public PrintProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一個執行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //當某個請求經過我這裡的時候,我先不處理,把他放在佇列中,然後放行(流量削峰) requests.add(request); } @Override public void run() { while (true) { try { // 非同步進行請求的處理 Request request = requests.take(); System.out.println(this.getClass().getSimpleName()+"處理"+request.getName()); if (iRequestProcessor!=null){ iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } //最後一個責任鏈 public class FinalProcessor extends Thread implements IRequestProcessor { @Override public void doMyDuty(Request request) { // you can do whatever you want here } }對責任鏈進行組裝和測試
public class ChainExample { public static void main(String[] args) { FinalProcessor finalProcessor = new FinalProcessor(); finalProcessor.start(); SaveProcessor saveProcessor = new SaveProcessor(finalProcessor); saveProcessor.start(); PrintProcessor printProcessor = new PrintProcessor(saveProcessor); printProcessor.start(); ValidateProcessor validateProcessor = new ValidateProcessor(printProcessor); validateProcessor.start(); Request request=new Request(); request.setName("Glen"); // 這裡就把問的請求傳遞給每個消費者,那我們就可以使用 validateProcessor.doMyDuty(request); } }這些責任鏈都需要實現同一個介面,同時有一個dao去傳遞資料
public interface IRequestProcessor { void doMyDuty(Request request); } @Data public class Request { String name; }
JUC常用併發工具
【CountDownLatch】:是一個同步工具,允許一個或者多個執行緒一直等待。然後通過某個執行緒的執行完畢而喚醒其他等待中的執行緒。他主要提供兩方法【await】【countdown】,簡而言之他就是一個倒計時的計數器,我們定義一個數字,比如三,那有三個執行緒都呼叫他的countdown方法,他的底層是每次一個執行緒呼叫一下countdown方法,他體內的數字就減去一,直到數字為0,則被阻塞的執行緒被喚醒。demo(其實他的作用點像JOIN)->我們看到,其實他就類似於一個訊號,當一個執行緒執行後,告訴下個執行緒我執行完了,然後在總數中減一,當總數為零則喚醒被阻塞的執行緒
public class CountDownExample { static CountDownLatch countDownLatch=new CountDownLatch(3); static class Thread1 extends Thread{ @Override public void run() { System.out.println("作為自己的事情"+Thread.currentThread().getName()); countDownLatch.countDown(); } } static class Thread2 extends Thread{ @Override public void run() { System.out.println("作為自己的事情"+Thread.currentThread().getName()); countDownLatch.countDown(); } } static class Thread3 extends Thread{ @Override public void run() { System.out.println("作為自己的事情"+Thread.currentThread().getName()); countDownLatch.countDown(); } } public static void main(String[] args) throws InterruptedException { Thread thread1=new Thread1(); thread1.start(); Thread thread2=new Thread2(); thread2.start(); Thread thread3=new Thread3(); thread3.start(); countDownLatch.await(); System.out.println("執行main,所有執行緒執行玩成"); } }我們可以用它來做服務校驗,當所有我們依賴的服務都正常啟動後,我們在啟動我們的主執行緒。這裡我們使用一個模板模式來模擬這個流程。
首先我們定義一個模板方法類,在這個類中使用執行緒呼叫各個子類的驗證服務方法。
@Data public abstract class BaseHealthChecker implements Runnable { String serviceName; CountDownLatch countDownLatch; public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch) { this.serviceName = serviceName; this.countDownLatch=countDownLatch; } abstract void verifyService() throws InterruptedException; //非同步驗證 @Override public void run() { try {
//呼叫子類的方法 verifyService();
//對計數器進行減一操作 countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }這裡是各個子類的執行邏輯(執行緒休眠這裡只是模擬他去發包和收包的過程)
public class CacheServiceChecker extends BaseHealthChecker { public CacheServiceChecker(CountDownLatch countDownLatch) { super("CacheServiceChecker",countDownLatch); } @Override void verifyService() throws InterruptedException { System.out.println("checking..."+this.getServiceName()); Thread.sleep(3000); System.out.println(this.getServiceName()+"all things are ok"); } } public class DataBaseServiceChecker extends BaseHealthChecker { public DataBaseServiceChecker(CountDownLatch countDownLatch) { super("DataBaseServiceChecker",countDownLatch); } @Override void verifyService() throws InterruptedException { System.out.println("checking..."+this.getServiceName()); Thread.sleep(3000); System.out.println(this.getServiceName()+"all things are ok"); } }這裡去組裝以及啟動各個模板子類的執行緒
public class ApplicationStartUp { static List<BaseHealthChecker> checkers; private static CountDownLatch countDownLatch=new CountDownLatch(2); static { checkers = new ArrayList<>(); checkers.add(new CacheServiceChecker(countDownLatch)); checkers.add(new DataBaseServiceChecker(countDownLatch)); } static ApplicationStartUp INSTANCE = new ApplicationStartUp(); ApplicationStartUp getInstance() { return INSTANCE; } static boolean CheckDependentServices() throws InterruptedException { for (BaseHealthChecker checker : checkers) { //每一個服務都採用執行緒去執行 new Thread(checker).start(); }
//這裡對主執行緒進行阻塞 countDownLatch.await(); return true; } }這裡對外暴露一個服務,我們的主執行緒去呼叫他進行校驗各個服務的可用性
public class StartUpMain { public static void main(String[] args) { try { ApplicationStartUp.CheckDependentServices(); } catch (InterruptedException e) { e.printStackTrace(); }
//可以列印這句話就證明主線前面的各個校驗執行緒已經執行完成 System.err.println("all dependent services are checked and are all available"); } }阻塞多個執行緒的意思就是,我們在每個執行緒執行完成後都呼叫【await】在main中呼叫【countDown】,然後給countdownlatch初始為0,這樣main就相當於一個發令槍,當mian執行了countdown,所有被阻塞的執行緒也就活了。
CountDownLatch總結:
這就是一種共享鎖,可以允許多個執行緒同時搶到鎖,然後等到計數器歸零,則同時喚醒。我們來看一下他類的關係圖,我們看到他的底層實際上還是用AQS實現的,只不過他走的是一個共享鎖
,大概流程就是,
- 他維護了一個state的數字,每個執行緒執行完成後,他的state就--
- 直到state為0,他則喚醒佇列中的所有執行緒,
- 這點和我們之前講到的不同,之前講到的是隻喚醒頭節點後的下一個節點
await(原始碼解析)
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //如果他小於零(計數器不為零),那當前執行緒就應該被阻塞 if (tryAcquireShared(arg) < 0) //這裡進行共享鎖的搶佔 doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //把當前執行緒加入到一個節點中,在這裡面構建一個雙向鏈 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //如果當前節點是頭結點那就進行搶佔 if (p == head) { //共享鎖的方式進行搶佔,如果大於零則說明搶佔鎖成功 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //判斷自己是否應該掛起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }countDown(原始碼解析)
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; //每次對原來的數字減一 int nextc = c-1; //如果數字為零,則喚醒所有節點 if (compareAndSetState(c, nextc)) return nextc == 0; } } } //對節點進行喚醒 private void doReleaseShared() { //自旋喚醒 for (;;) { //不斷的取下一個節點 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //只要節點狀態是signal則進行喚醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //這裡進行喚醒,喚醒後就回到await中上次執行緒被阻塞的地方,在await的自旋中佇列中的執行緒逐個被喚醒 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }【Semaphore】:我們可以把它叫做新訊號燈,實際上我們可以把它理解成一個限流器,它可以限制資源的訪問,就是限流,例如【sentinel】,搶佔一個令牌,搶佔到就通訊,沒有搶佔到就阻塞,它裡面主要有兩個方法
- acquire(搶佔令牌數):這裡是搶佔一個令牌,一次可以搶佔多個,沒有傳遞引數則只搶佔一個
- realise:釋放令牌
如何使用semaphore(這裡模擬一個停車場,一共有10個車位,也就是一次只能執行10個執行緒,當10箇中的一個釋放,其他才能進行資源的獲取)
public class SemaphoreExample { public static void main(String[] args) { // 限制資源訪問的併發數量 Semaphore semaphore=new Semaphore(10); for (int i = 0; i <20 ; i++) { new Thread(new car(i,semaphore)).start(); } } static class car extends Thread{ private int num; private Semaphore semaphore; public car(int num, Semaphore semaphore) { this.num = num; this.semaphore = semaphore; } @Override public void run() { try { //獲得一個令牌 semaphore.acquire(); System.out.println("第"+num+"車搶到一個車位"); TimeUnit.SECONDS.sleep(2); System.out.println("第"+num+"釋放車位!"); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 釋放一個令牌 semaphore.release(); } } } }結果:
【Semaphore】實現原理:
acquire(總數-1):
- 當總數為0的時候則阻塞,就類似咱們上面的場景(車庫滿了)
- 可能同阻塞n個執行緒(別的車就無法進入車庫)
realist(總數+1):
- 有令牌就阻塞的執行緒中喚醒(那肯定有一個佇列來儲存這些個被阻塞的佇列)
原始碼解析(我們發現它這裡還用的是aqs,,所以這裡就不贅述了,他底層用的是共享鎖):
問題:為什麼他要使用共享鎖呢?這裡好像用同步鎖更好點,其實想一下,用共享鎖就意味著他可以一次喚醒多個執行緒,那就意味著多個執行緒可以同時執行,那就提升了效能。
CyclicBarrier:他是一個可以重複的柵欄,簡而言之就是他有類似與一個閥門,當到達一個極值或者頂點的時候,允許多個執行緒同時執行,這就有點像【countdownlaunch】多個執行緒等於呼叫了【await】方法,然後一個執行緒使用【countdown】方法去喚醒,就等於說是一個投票,當所有人投完票,才能公佈結果。一個例子來看一下如何使用。
使用方法:
public class CyclicBarrierExample { public static void main(String[] args) { int n=3; CyclicBarrier cyclicBarrier=new CyclicBarrier(n,()->{ System.out.println("所有執行緒執行完成"); }); for (int i = 0; i <n ; i++) { new ballot(cyclicBarrier).start(); } } static class ballot extends Thread{ CyclicBarrier cyclicBarrier; public ballot(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName()+"投票完成,等待其他人進行投票。。"); cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } }實現原理(基於【ReentrantLock】和【Condition】實現):
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); //參與的執行緒數 this.parties = parties; //用於記錄當前已經執行的數 this.count = parties; //執行完成後的毀掉方法 this.barrierCommand = barrierAction; }private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //首先加鎖,保證執行緒安全性 lock.lock(); try { //這裡就是柵欄,比如當你的計數器成為0,它有可以回到你的初始數值 final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //如果計數器為零 if (index == 0) { // tripped boolean ranAction = false; try { //這裡執行咱們傳遞過去的action final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) //trip是一個condition trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) {
//這裡是signal breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }