1. 程式人生 > 實用技巧 >java併發工具類

java併發工具類

一、CountDownLatch

字面意思:倒計時鎖閂,該類可以實現一個執行緒在等其他多個執行緒執行完之後,繼續執行。

入參是一個計數器的值,當一個執行緒執行完畢時呼叫countDown()方法,計數器值會減1,當計數器值為0時,被await()阻塞的執行緒將被喚醒。

CountDownLatch latch = new CountDownLatch(10);

大家都玩過王者榮耀的5V5排位吧,當己方5個人準備就緒,對方5人也準備就緒時,才可以進入B/P環節,也就是王者榮耀這個執行緒需要等待10位玩家的執行緒都準備完畢,然後才出發進入遊戲的操作。

package com.duchong.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 模擬5V5排位,10個玩家都準備就緒,才開始進入遊戲
 * CountDownLatch :阻塞主執行緒,等子執行緒完成
 * @author DUCHONG
 * @since 2020-09-03 17:43:13
 */
public class CountDownLatchDemo {


    public static final int playerNum=10;

    public static void main(String[] args) {

        final CountDownLatch latch = new CountDownLatch(10);

        GameThread subThread = new GameThread(latch);
        try {

            //模擬5V5排位
            for (int i = 1; i <= playerNum; i++) {
                new Thread(subThread,"player-"+i).start();
                TimeUnit.SECONDS.sleep(1L);
            }
            //阻塞主執行緒
            latch.await();
        }
        catch (InterruptedException e) {
        }

        System.out.println("王者榮耀:玩家全部準備就緒,開始進入遊戲");
    }


    /**
     * 遊戲子執行緒
     */
    static class GameThread implements Runnable {

        private CountDownLatch latch;

        public GameThread(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {

            try {
                System.out.println(Thread.currentThread().getName()+"---準備就緒");
            }
            finally {
                latch.countDown();
            }
        }
    }
}

結果

二、CyclicBarrier

字面意思: 迴圈屏障,多個執行緒到達一個屏障點時被阻塞,直到最後一個執行緒到達屏障點時,屏障才會解除,所有被屏障攔截的執行緒繼續執行。

第一個入參代表屏障接觸時阻塞執行緒的數量。第二個入參代表屏障解除時要進行的操作

CyclicBarrier c = new CyclicBarrier(10, ()->System.out.println("屏障解除"));

await()方法其實是呼叫Conditionawait()

public class CyclicBarrier {
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

    //....省略
    public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }

    //wait方法
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
    TimeoutException {
        //重入鎖
        final ReentrantLock lock = this.lock;
        //加鎖
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 迴圈
            for (;;) {
                try {
                    if (!timed)
                        // 呼叫condition的await()方法
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //解鎖
            lock.unlock();
        }
    }
    //....省略
}

該類同樣可以實現與CountDownLatch相同的效果

package com.duchong.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * 模擬5V5排位,10個玩家都準備就緒,才開始進入遊戲
 * CyclicBarrier :阻塞子執行緒,當等待中的子執行緒數到達一定數量時,跳閘。
 * @author DUCHONG
 * @since 2020-09-03 17:41:35
 */
public class CyclicBarrierDemo {

    public static final int playerNum=10;
    /**
     * 屏障,初始10 當await()的執行緒數量達到10時,跳閘。
     */
    static CyclicBarrier c = new CyclicBarrier(playerNum, ()->System.out.println("王者榮耀:玩家全部準備就緒,開始進入遊戲"));


    public static void main(String[] args) {

        try {
            //
            for (int i = 1; i <= playerNum; i++) {
                new Thread(new GameThread(),"player-"+i).start();
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 遊戲子執行緒
     */
    static class GameThread implements Runnable {

        @Override
        public void run() {

            try {
                System.out.println(Thread.currentThread().getName()+"---準備就緒");
                //阻塞子執行緒
                c.await();

                System.out.println(Thread.currentThread().getName()+"---進入遊戲");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

}

結果

三、Semaphore

字面意思:訊號量,多個執行緒訪問一個共享資源時,如果想控制對該資源訪問的執行緒的數量,可以用這個工具類。

入參物件是一個許可的數量,如果數量大於1,則可以作為共享鎖來使用,如果數量等於1,則可以作為排他鎖來使用

acquire()方法表示得到一個許可,可以對共享資源進行操作, 如果許可數量分配完了,其他執行緒將阻塞, 直到已得到許可的執行緒釋放許可後,才有機會再獲取許可。

release()方法表示釋放一個許可。

Semaphore semaphore=new Semaphore(5);

接著王者榮耀排位5V5的例子來講,當玩家進入B/P環節,5個人,但是地圖只有上中下三路,也就是說最多2個人去打野位置

package com.duchong.concurrent;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * 模擬進入5V5排位遊戲後的B/P環節,五個人,上中下三路,最多2個人去打野位置
 * Semaphore,對資源的併發數控制
 * @author DUCHONG
 * @since 2020-09-03 15:46
 **/
public class SemaphoreDemo {

    //打野人數
    public static final int wildNum=2;
    //總共人數
    public static final int totalNum=5;

    static Semaphore semaphore=new Semaphore(wildNum);

    public static void main(String[] args) {

        try {
            for (int i = 1; i <=totalNum; i++) {

                new Thread(new GameThread(semaphore),"player-"+i).start();
                TimeUnit.SECONDS.sleep(1L);
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    /**
     * 遊戲子執行緒
     */
    static class GameThread implements Runnable {

        private Semaphore semaphore;

        public GameThread(Semaphore semaphore){
            this.semaphore=semaphore;
        }
        @Override
        public void run() {

            try {
                //搶到打野位置,最多兩個人,其他人想選打野位時阻塞,除非搶到的人選擇其他路
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"---搶到打野位");

                TimeUnit.SECONDS.sleep(3L);
                System.out.println(Thread.currentThread().getName()+"---猶豫了一下,選擇了其他路");
            }
            catch (Exception e){

            }
            finally {
                //新增一個許可
                semaphore.release();
            }

        }
    }
}

結果

同一時刻只有兩個player獲取到打野位置(共享資源)符合預期。