1. 程式人生 > 實用技巧 >併發工具類簡介

併發工具類簡介

併發工具類

  1. CountDownLatch:閉鎖,也叫執行緒遞減鎖。對執行緒進行計數,在計數歸零之前執行緒會陷入阻塞;直到計數歸零為止,才會放開阻塞。

    用給定的計數初始化 CountDownLatch。由於呼叫了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之後,會釋放所有等待的執行緒,await 的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

    • 內部採用共享鎖來實現
    import java.util.concurrent.CountDownLatch;
    /**
     * 模擬:考試
     * 需求:考官(執行緒A1,A2)和考生(執行緒B1、B2、B3、B4)
     *      都到考場後才能開始考試
     */
    public class CountDownLatchDemo {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch cdl = new CountDownLatch(6);
            new Thread(new Teacher(cdl)).start();
            new Thread(new Teacher(cdl)).start();
            new Thread(new Student(cdl) ).start();
            new Thread(new Student(cdl)).start();
            new Thread(new Student(cdl)).start();
            new Thread(new Student(cdl)).start();
    
            //計數減為1時自然喚醒
            cdl.await();
            System.out.println("開始考試");
        }
    }
    
    class Teacher implements Runnable{
        private CountDownLatch cdl;
        public Teacher(CountDownLatch cdl) {
            this.cdl = cdl;
        }
    
        @Override
        public void run() {
            try{
                //放慢速度
                Thread.sleep((long)(Math.random()*10000));
                cdl.countDown();
                System.out.println("考官到了");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
    class Student implements Runnable{
        private CountDownLatch cdl;
        public Student(CountDownLatch cdl) {
            this.cdl = cdl;
        }
    
        @Override
        public void run() {
            try{
                Thread.sleep((long)(Math.random()*10000));
                cdl.countDown();
                System.out.println("考生到了");
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
  2. CyclicBarrier:柵欄。對執行緒進行計數,在計數歸零之前執行緒會陷入阻塞;直到計數歸零為止,才會放開阻塞。一組執行緒到達同一個點後再分別繼續執行。(與閉鎖相比,並沒有結束執行緒)

    它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。通俗講:讓一組執行緒到達一個屏障時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。一組執行緒結束後開啟另一組執行緒。

    • 底層採用ReentrantLock + Condition實現
    • 應用場景:多執行緒結果合併的操作,用於多執行緒計算資料,最後合併計算結果的應用場景
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * 所有運動員跑到起跑線之後聽到命令才能跑出去
     */
    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            CyclicBarrier cb = new CyclicBarrier(5);
    
            new Thread(new Runner(cb),"1號").start();
            new Thread(new Runner(cb),"2號").start();
            new Thread(new Runner(cb),"3號").start();
            new Thread(new Runner(cb),"4號").start();
            new Thread(new Runner(cb),"5號").start();
    
        }
    }
    
    class Runner implements Runnable{
        private CyclicBarrier cb;
        public Runner(CyclicBarrier cb) {
            this.cb = cb;
        }
    
        @Override
        public void run() {
            try{
                //模擬運動員走到起跑線的時間
                Thread.sleep((long)(Math.random()*10000));
                String name = Thread.currentThread().getName();
                System.out.println(name+"運動員到了起跑線");
                /**
                 * 先到的運動員應該阻塞,
                 * 直到所有的運動員都到了起跑線才能往外跑。
                 * 當計數歸零的時候,自然甦醒
                 */
                cb.await();
                System.out.println(name+"運動員跑了出去");
    
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
  3. Exchanger:交換機。用於交換兩個執行緒之間的資訊。

    具體來說,Exchanger類允許在兩個執行緒之間定義同步點。當兩個執行緒都到達同步點時,他們交換資料結構,因此第一個執行緒的資料結構進入到第二個執行緒中,第二個執行緒的資料結構進入到第一個執行緒中

    import java.util.concurrent.Exchanger;
    
    public class ExchangerDemo {
        public static void main(String[] args) {
            Exchanger<String> stringExchanger = new Exchanger<>();
            new Thread(new Producer(stringExchanger)).start();
            new Thread(new Consumer(stringExchanger)).start();
        }
    }
    
    class Producer implements  Runnable{
        private Exchanger<String> ex;
        public Producer(Exchanger<String> ex) {
            this.ex = ex;
        }
    
        @Override
        public void run() {
            String info="商品";
            try {
                String msg = ex.exchange(info);
                System.out.println("生產者收到消費者的:"+msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    class Consumer implements Runnable{
        private Exchanger<String> ex;
        public Consumer(Exchanger<String> ex) {
            this.ex = ex;
        }
    
        @Override
        public void run() {
            String info="錢";
            try {
                // 消費者將錢給生產者,應該收到生產者換過來的商品
                String exchange = ex.exchange(info);
                System.out.println("消費者收到生產者的:"+exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
  4. Semaphore:訊號量。執行緒在獲取訊號之後執行程式碼,而在訊號被全部佔用之後,後來的執行緒需要阻塞,直到前面的執行緒釋放訊號,阻塞的執行緒才能獲取訊號執行邏輯。實際開發中,常用於限流。

    【作用】限制某段程式碼塊的併發數。Semaphore有一個建構函式,可以傳入一個int型整數n,表示某段程式碼最多隻有n個執行緒可以訪問,如果超出了n,那麼請等待,等到某個執行緒執行完畢這段程式碼塊,下一個執行緒再進入。由此可以看出如果Semaphore建構函式中傳入的int型整數n=1,相當於變成了一個synchronized了。

    從概念上講,訊號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。

    訊號量Semaphore是一個非負整數(>=1)。當一個執行緒想要訪問某個共享資源時,它必須要先獲取Semaphore,當Semaphore >0時,獲取該資源並使Semaphore – 1。如果Semaphore值 = 0,則表示全部的共享資源已經被其他執行緒全部佔用,執行緒必須要等待其他執行緒釋放資源。當執行緒釋放資源時,Semaphore則+1

    • 內部採用共享鎖實現
    • 應用場景:通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreDemo {
        public static void main(String[] args) {
            Semaphore s = new Semaphore(5);
            for(int i=0;i<7;i++){
                new Thread(new Easter(s)).start();
            }
    
        }
    }
    
    //用餐的人
    class Easter implements Runnable{
        private Semaphore s;
        public Easter(Semaphore s) {
            this.s = s;
        }
    
        /**
         * 桌子的數量是有限的
         * 如果桌子被全部佔用,後來的客人就需要等待
         * 桌子相當於訊號,只要有訊號,就可以使用
         */
        @Override
        public void run() {
            try{
                s.acquire();
                System.out.println("來了一波客人,佔用了一張桌子~~~");
                //模擬吃飯的時間
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("客人買單離開,空出一張桌子~~~");
                // 釋放1個訊號,被阻塞的執行緒就可以獲取訊號執行程式碼
                s.release();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

CountDownLatch與CyclicBarrier區別

  1. CountDownLatch的作用是允許1或N個執行緒等待其他執行緒完成執行;而CyclicBarrier則是允許N個執行緒相互等待。
  2. CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,因此它被稱為是迴圈的barrier。
  3. CyclicBarrier只能喚起一個任務,CountDownLatch可以喚起多個任務。