1. 程式人生 > 其它 >併發程式設計-阻塞佇列&JUC常用工具

併發程式設計-阻塞佇列&JUC常用工具

併發程式設計-阻塞佇列&JUC常用工具

本章主要和大家聊聊之前的阻塞佇列,並且聊聊他如何使用,以及JUC中常用的一些工具,例如【CountDownLatch】、【Semaphore】、【CyclicBarrier】這些都是控制執行緒的一些工具,我們會聊聊他們如何使用,以及實現原理。

阻塞佇列(之前聊過一下,現在再次補充

LinkedTransferQueue】:由一個連結串列構建的一個無界阻塞佇列(實際上是一個Interger.maxvalue實際上還是有大小的,只不過他真的足夠大):他既是一個佇列,也有一個生產者對應一個消費者的功能,因為繼承了AbstractQueue並且實現了TransferQueue(這個在SynchronousQueue

中有使用,他的特性就是一個生產者對應一個消費者),我們可以認為他是一個連結串列和SynchronousQueue特性和合體。

LinkedBlockingDeque】:是一個雙向連結串列的佇列,他可以支援兩端的插入和移除(這就在一定程度上解決了多執行緒新增元素的競爭問題,因為這樣可以減少一半的競爭),只不過增加了幾個方法(借鑑網上的圖片)

阻塞佇列的使用(實際上他相當於非同步mq)

我們能想到用非同步mq解決問題的地方,都可以使用阻塞佇列。大家都瞭解責任鏈模式,想當於一個流水線,每個環節處理相關的請求,但是這個責任鏈點如果太多,勢必返回的時間就就會很長,我們就可以利用阻塞佇列來提升一下這個效能,減少同步請求帶來的損耗。這裡寫一個責任鏈中中使用阻塞佇列的demo去削峰

首先我們定義四個責任鏈點,我們知道,普通的責任鏈點是某個鏈路點執行完成他自己的任務後,才把責任交給下一個鏈路,咱們這裡,讓每個責任鏈路直接放行,把任務放到阻塞佇列中,用一個自旋去消費,這就可以達到削峰的目的,切記,這個只能是不需要立馬返回值的業務情況。這些阻塞佇列的底層大部分都使用了lock和condition萬變不離其宗

//檢驗資料責任鏈
public class ValidateProcessor extends Thread implements IRequestProcessor {
        public ValidateProcessor(IRequestProcessor iRequestProcessor) {
                
this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一個執行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //當某個請求經過我這裡的時候,我先不處理,把他放在佇列中,然後放行(流量削峰) requests.add(request); } @Override public void run() { // 這裡不斷的對資料進行消費 while (true) { try { // 非同步進行請求的處理,其實這裡的底層都是使用的lock Request request = requests.take(); System.out.println(this.getClass().getSimpleName() + "處理" + request.getName()); if (iRequestProcessor != null) { iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } // 儲存資料責任鏈 public class SaveProcessor extends Thread implements IRequestProcessor { public SaveProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一個執行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //當某個請求經過我這裡的時候,我先不處理,把他放在佇列中,然後放行(流量削峰) requests.add(request); } @Override public void run() { while (true) { try { // 非同步進行請求的處理 Request request = requests.take(); System.out.println(this.getClass().getSimpleName()+"處理"+request.getName()); if (iRequestProcessor!=null){ iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } // 列印資料責任鏈 public class PrintProcessor extends Thread implements IRequestProcessor { public PrintProcessor(IRequestProcessor iRequestProcessor) { this.iRequestProcessor = iRequestProcessor; } BlockingQueue<Request> requests = new LinkedBlockingDeque<>(); //下一個執行者 IRequestProcessor iRequestProcessor; @Override public void doMyDuty(Request request) { //當某個請求經過我這裡的時候,我先不處理,把他放在佇列中,然後放行(流量削峰) requests.add(request); } @Override public void run() { while (true) { try { // 非同步進行請求的處理 Request request = requests.take(); System.out.println(this.getClass().getSimpleName()+"處理"+request.getName()); if (iRequestProcessor!=null){ iRequestProcessor.doMyDuty(request); } } catch (InterruptedException e) { e.printStackTrace(); } } } } //最後一個責任鏈 public class FinalProcessor extends Thread implements IRequestProcessor { @Override public void doMyDuty(Request request) { // you can do whatever you want here } }

對責任鏈進行組裝和測試

public class ChainExample {
    public static void main(String[] args) {
        FinalProcessor finalProcessor = new FinalProcessor();
        finalProcessor.start();
        SaveProcessor saveProcessor = new SaveProcessor(finalProcessor);
        saveProcessor.start();
        PrintProcessor printProcessor = new PrintProcessor(saveProcessor);
        printProcessor.start();
        ValidateProcessor validateProcessor = new ValidateProcessor(printProcessor);
        validateProcessor.start();
        Request request=new Request();
        request.setName("Glen");

        // 這裡就把問的請求傳遞給每個消費者,那我們就可以使用
        validateProcessor.doMyDuty(request);
    }
}

這些責任鏈都需要實現同一個介面,同時有一個dao去傳遞資料

public interface IRequestProcessor {
    void  doMyDuty(Request request);
}
@Data
public class Request {
    String name;
}

JUC常用併發工具

【CountDownLatch:是一個同步工具,允許一個或者多個執行緒一直等待。然後通過某個執行緒的執行完畢而喚醒其他等待中的執行緒。他主要提供兩方法【await】【countdown】,簡而言之他就是一個倒計時的計數器,我們定義一個數字,比如三,那有三個執行緒都呼叫他的countdown方法,他的底層是每次一個執行緒呼叫一下countdown方法,他體內的數字就減去一,直到數字為0,則被阻塞的執行緒被喚醒。demo(其實他的作用點像JOIN)->我們看到,其實他就類似於一個訊號,當一個執行緒執行後,告訴下個執行緒我執行完了,然後在總數中減一,當總數為零則喚醒被阻塞的執行緒

public class CountDownExample {
    static CountDownLatch countDownLatch=new CountDownLatch(3);
    static class Thread1 extends Thread{
        @Override
        public void run() {
            System.out.println("作為自己的事情"+Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }
    static class Thread2 extends Thread{
        @Override
        public void run() {
            System.out.println("作為自己的事情"+Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }
    static class Thread3 extends Thread{
        @Override
        public void run() {
            System.out.println("作為自己的事情"+Thread.currentThread().getName());
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1=new Thread1();
        thread1.start();
        Thread thread2=new Thread2();
        thread2.start();
        Thread thread3=new Thread3();
        thread3.start();
        countDownLatch.await();
        System.out.println("執行main,所有執行緒執行玩成");
    }
}

我們可以用它來做服務校驗,當所有我們依賴的服務都正常啟動後,我們在啟動我們的主執行緒。這裡我們使用一個模板模式來模擬這個流程。

首先我們定義一個模板方法類,在這個類中使用執行緒呼叫各個子類的驗證服務方法。

@Data
public abstract  class  BaseHealthChecker implements Runnable {
    String serviceName;
    CountDownLatch countDownLatch;

    public BaseHealthChecker(String serviceName, CountDownLatch countDownLatch) {
        this.serviceName = serviceName;
        this.countDownLatch=countDownLatch;
    }

    abstract void verifyService() throws InterruptedException;

    //非同步驗證
    @Override
    public void run() {
        try {
        //呼叫子類的方法 verifyService();
        //對計數器進行減一操作 countDownLatch.countDown(); }
catch (InterruptedException e) { e.printStackTrace(); } } }

這裡是各個子類的執行邏輯(執行緒休眠這裡只是模擬他去發包和收包的過程)

public class CacheServiceChecker extends BaseHealthChecker {


    public CacheServiceChecker(CountDownLatch countDownLatch) {
        super("CacheServiceChecker",countDownLatch);
    }

    @Override
    void verifyService() throws InterruptedException {
        System.out.println("checking..."+this.getServiceName());
        Thread.sleep(3000);
        System.out.println(this.getServiceName()+"all things are ok");
    }
}
public class DataBaseServiceChecker extends BaseHealthChecker {
    public DataBaseServiceChecker(CountDownLatch countDownLatch) {
        super("DataBaseServiceChecker",countDownLatch);
    }

    @Override
    void verifyService() throws InterruptedException {
        System.out.println("checking..."+this.getServiceName());
        Thread.sleep(3000);
        System.out.println(this.getServiceName()+"all things are ok");
    }
}

這裡去組裝以及啟動各個模板子類的執行緒

public class ApplicationStartUp {
    static List<BaseHealthChecker> checkers;
    private static CountDownLatch countDownLatch=new CountDownLatch(2);
    static {
        checkers = new ArrayList<>();
        checkers.add(new CacheServiceChecker(countDownLatch));
        checkers.add(new DataBaseServiceChecker(countDownLatch));
    }
    static ApplicationStartUp INSTANCE = new ApplicationStartUp();
    ApplicationStartUp getInstance() {
        return INSTANCE;
    }

    static boolean CheckDependentServices() throws InterruptedException {
        for (BaseHealthChecker checker : checkers) {
            //每一個服務都採用執行緒去執行
            new Thread(checker).start();
        }
    //這裡對主執行緒進行阻塞 countDownLatch.await();
return true; } }

這裡對外暴露一個服務,我們的主執行緒去呼叫他進行校驗各個服務的可用性

public class StartUpMain {
    public static void main(String[] args) {
        try {
            ApplicationStartUp.CheckDependentServices();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    //可以列印這句話就證明主線前面的各個校驗執行緒已經執行完成 System.err.println(
"all dependent services are checked and are all available"); } }

阻塞多個執行緒的意思就是我們在每個執行緒執行完成後都呼叫【await】在main中呼叫【countDown】,然後給countdownlatch初始為0,這樣main就相當於一個發令槍,當mian執行了countdown,所有被阻塞的執行緒也就活了。

CountDownLatch總結:

這就是一種共享鎖,可以允許多個執行緒同時搶到鎖,然後等到計數器歸零,則同時喚醒。我們來看一下他類的關係圖,我們看到他的底層實際上還是用AQS實現的,只不過他走的是一個共享鎖

大概流程就是

  • 他維護了一個state的數字,每個執行緒執行完成後,他的state就--
  • 直到state為0,他則喚醒佇列中的所有執行緒,
  • 這點和我們之前講到的不同,之前講到的是隻喚醒頭節點後的下一個節點

await(原始碼解析)

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果他小於零(計數器不為零),那當前執行緒就應該被阻塞
    if (tryAcquireShared(arg) < 0)
        //這裡進行共享鎖的搶佔
        doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //把當前執行緒加入到一個節點中,在這裡面構建一個雙向鏈
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //如果當前節點是頭結點那就進行搶佔
            if (p == head) {
                //共享鎖的方式進行搶佔,如果大於零則說明搶佔鎖成功
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //判斷自己是否應該掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

countDown(原始碼解析)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        //每次對原來的數字減一
        int nextc = c-1;
        //如果數字為零,則喚醒所有節點
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
}
//對節點進行喚醒
private void doReleaseShared() {
    //自旋喚醒
    for (;;) {
        //不斷的取下一個節點
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //只要節點狀態是signal則進行喚醒
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; 
                //這裡進行喚醒,喚醒後就回到await中上次執行緒被阻塞的地方,在await的自旋中佇列中的執行緒逐個被喚醒
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

Semaphore】:我們可以把它叫做新訊號燈,實際上我們可以把它理解成一個限流器,它可以限制資源的訪問,就是限流,例如【sentinel】,搶佔一個令牌,搶佔到就通訊,沒有搶佔到就阻塞,它裡面主要有兩個方法

  • acquire(搶佔令牌數):這裡是搶佔一個令牌,一次可以搶佔多個,沒有傳遞引數則只搶佔一個
  • realise:釋放令牌

如何使用semaphore(這裡模擬一個停車場,一共有10個車位,也就是一次只能執行10個執行緒,當10箇中的一個釋放,其他才能進行資源的獲取)

public class SemaphoreExample {
    public static void main(String[] args) {
        // 限制資源訪問的併發數量
        Semaphore semaphore=new Semaphore(10);
        for (int i = 0; i <20 ; i++) {
            new Thread(new car(i,semaphore)).start();
        }
    }
    static  class  car extends Thread{
        private int num;
        private Semaphore semaphore;

        public car(int num, Semaphore semaphore) {
            this.num = num;
            this.semaphore = semaphore;
        }
        @Override
        public void run() {
            try {
                //獲得一個令牌
                semaphore.acquire();
                System.out.println(""+num+"車搶到一個車位");
                TimeUnit.SECONDS.sleep(2);
                System.out.println(""+num+"釋放車位!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                // 釋放一個令牌
                semaphore.release();
            }
        }
    }
}

結果:

Semaphore】實現原理:

acquire(總數-1):

  • 當總數為0的時候則阻塞,就類似咱們上面的場景(車庫滿了)
  • 可能同阻塞n個執行緒(別的車就無法進入車庫)

realist(總數+1):

  • 有令牌就阻塞的執行緒中喚醒(那肯定有一個佇列來儲存這些個被阻塞的佇列)

原始碼解析(我們發現它這裡還用的是aqs,,所以這裡就不贅述了,他底層用的是共享鎖):

問題:為什麼他要使用共享鎖呢?這裡好像用同步鎖更好點,其實想一下,用共享鎖就意味著他可以一次喚醒多個執行緒,那就意味著多個執行緒可以同時執行,那就提升了效能。

CyclicBarrier:他是一個可以重複的柵欄,簡而言之就是他有類似與一個閥門,當到達一個極值或者頂點的時候,允許多個執行緒同時執行,這就有點像【countdownlaunch】多個執行緒等於呼叫了【await】方法,然後一個執行緒使用【countdown】方法去喚醒,就等於說是一個投票,當所有人投完票,才能公佈結果。一個例子來看一下如何使用。

使用方法:

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int n=3;
        CyclicBarrier cyclicBarrier=new CyclicBarrier(n,()->{
            System.out.println("所有執行緒執行完成");
        });
        for (int i = 0; i <n ; i++) {
            new ballot(cyclicBarrier).start();
        }
    }
    static  class  ballot extends Thread{
        CyclicBarrier cyclicBarrier;
        public ballot(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName()+"投票完成,等待其他人進行投票。。");
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

實現原理(基於【ReentrantLock】和【Condition】實現):

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        //參與的執行緒數
        this.parties = parties;
        //用於記錄當前已經執行的數
        this.count = parties;
        //執行完成後的毀掉方法
        this.barrierCommand = barrierAction;
    }
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    //首先加鎖,保證執行緒安全性
    lock.lock();
    try {
        //這裡就是柵欄,比如當你的計數器成為0,它有可以回到你的初始數值
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        //如果計數器為零
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                //這裡執行咱們傳遞過去的action
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
            try {
                if (!timed)
                    //trip是一個condition
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
              
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
          //這裡是signal breakBarrier();
throw new TimeoutException(); } } } finally { lock.unlock(); } }