1. 程式人生 > >Semaphore訊號使用

Semaphore訊號使用

Semaphore(計數訊號量):准許n個任務同時訪問這個資源,正常的鎖(locks或synchronized)在任何時候都是隻允許一個任務訪問一項資源。

訊號的使用場景有:物件池,任務限流等。

下面有兩個簡單模擬:

1.任務限流:

public class TestSemaphore {
    public static void main(String[] args) {
        // 執行緒池
        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semp = new Semaphore(5);

        for (int i = 0; i < 20; i++) {
            final int NO = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        //獲取許可
                        semp.acquire();
                        System.out.println("獲取任務許可 :" +NO);
                        Thread.sleep((long)(Math.random()*10000));
                        //訪問完後,釋放
                        semp.release();
                        System.out.println("當前可用的訊號量:" +  semp.availablePermits());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            exec.execute(runnable);
        }
        //退出執行緒池
        exec.shutdown();
    }
}

2.物件池

物件池建立

public class Pool<T> {
    private int size;
    private List<T> items = new ArrayList<>();
    private volatile boolean[] checkedOut;
    private Semaphore available;

    public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size, true);
        for (int i = 0; i < size; i++) {
            try {
                items.add(classObject.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }

    public void checkIn(T x) {
        if (releaseItem(x)) {
            available.release();
        }
    }

    private synchronized T getItem() {
        for (int i = 0; i < size; i++) {
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null;
    }

    private synchronized boolean releaseItem(T item) {
        int index = items.indexOf(item);
        if (index == -1) return false;
        if (checkedOut[index]) {
            checkedOut[index] = false;
            return true;
        }
        return false;
    }
}

物件建立

public class Fat {
    //保證執行緒可見性
    private volatile  double d;
    private static int counter = 0;
    private final int id = counter++;
    public Fat() {
        for (int i = 0; i < 10000; i++) {
            d += (Math.PI + Math.E) / (double)i;
        }
    }

    public void operation() {
        System.out.println(this);
    }

    public String toString() {
        return "Fat id:" + id;
    }
}

建立一個任務定時遷入遷出物件

public class CheckOutTask<T> implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private Pool<T> pool;

    public CheckOutTask(Pool<T> pool) {
        this.pool = pool;
    }

    @Override
    public void run() {
        try {
            T item = pool.checkOut();
            System.out.println(this + "checked out " + item);
            TimeUnit.SECONDS.sleep(1);
            System.out.println(this + "checked in " + item);
            pool.checkIn(item);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "CheckOutTask{" +
                "id=" + id +
                '}';
    }
}

測試遷入遷出測試

public class SemaphoreDemo {
    final static int SIZE = 25;

    public static void main(String[] args) throws Exception{
        //1.建立物件池
        final Pool<Fat> pool = new Pool<>(Fat.class, SIZE);

        //2.先物件池所有物件遷出,過一分中後遷入
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < SIZE; i++) {
            exec.execute(new CheckOutTask<Fat>(pool));
        }

        //3.物件池中物件全部遷出,已遷出的不再遷出
        System.out.println("All Checkout Tasks Created");
        List<Fat> list = new ArrayList<>();
        for (int i = 0; i < SIZE; i++) {
            Fat f = pool.checkOut();
            System.out.println(i + ": main() thread checked out ");
            f.operation();
            list.add(f);
        }

        //4.Future阻塞遷出
        Future<?> blocked = exec.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    pool.checkOut();
                } catch (InterruptedException e) {
                    System.out.println("checkOut Interrupted");
                    e.printStackTrace();
                }
            }
        });

        //5.中斷阻塞
        TimeUnit.SECONDS.sleep(2);
        blocked.cancel(true);

        System.out.println("Checking in objects in" + list);
        for (Fat f: list) {
            pool.checkIn(f);
        }

        //6.第二次不會遷入
        for (Fat f: list) {
            pool.checkIn(f);
        }
        exec.shutdown();
    }
}