Semaphore訊號使用
阿新 • • 發佈:2018-12-04
Semaphore(計數訊號量):允許 n 個任務同時訪問同一項資源;而一般的鎖(Lock 或 synchronized)在任何時候都只允許一個任務訪問一項資源。
訊號的使用場景有:物件池,任務限流等。
下面有兩個簡單模擬:
1.任務限流:
/**
 * Demonstrates task throttling with a counting {@link Semaphore}.
 *
 * <p>Twenty tasks are handed to a cached thread pool, but each task must hold one of
 * five permits while it works, so at most five tasks are inside the guarded section
 * at any moment.
 */
public class TestSemaphore {

    /** Number of permits, i.e. the maximum number of tasks working at the same time. */
    private static final int PERMITS = 5;

    /** Total number of tasks submitted to the thread pool. */
    private static final int TASKS = 20;

    public static void main(String[] args) {
        // Thread pool that runs the tasks.
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semp = new Semaphore(PERMITS);

        for (int i = 0; i < TASKS; i++) {
            final int NO = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // Blocks until a permit is available.
                        semp.acquire();
                        try {
                            System.out.println("獲取任務許可 :" + NO);
                            Thread.sleep((long) (Math.random() * 10000));
                        } finally {
                            // Always hand the permit back, even when the sleep is
                            // interrupted; otherwise the permit would be lost for good.
                            semp.release();
                        }
                        System.out.println("當前可用的訊號量:" + semp.availablePermits());
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the code that owns this thread.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                }
            };
            exec.execute(runnable);
        }

        // Stop accepting new tasks; already submitted tasks still run to completion.
        exec.shutdown();
    }
}
2.物件池
物件池建立
/**
 * A fixed-size object pool guarded by a fair counting {@link Semaphore}.
 *
 * <p>The pool creates {@code size} instances of {@code T} up front. {@link #checkOut()}
 * blocks while every instance is in use; {@link #checkIn(Object)} returns an instance
 * and frees one permit. The bookkeeping of which slot is in use is done under the
 * pool's monitor.
 *
 * @param <T> type of the pooled objects; must have a no-argument constructor
 */
public class Pool<T> {
    private final int size;
    private final List<T> items = new ArrayList<>();
    // Only read and written inside synchronized methods, so no volatile is needed
    // (volatile on an array reference would not cover its elements anyway).
    private final boolean[] checkedOut;
    private final Semaphore available;

    /**
     * Creates the pool and eagerly instantiates all pooled objects.
     *
     * @param classObject class of the pooled objects, instantiated through its
     *                    no-argument constructor
     * @param size        number of objects (and semaphore permits) in the pool
     * @throws IllegalArgumentException if {@code classObject} cannot be instantiated;
     *         failing here avoids a half-filled pool whose {@link #checkOut()} would
     *         break later
     */
    public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size, true);
        for (int i = 0; i < size; i++) {
            try {
                items.add(classObject.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                        "Cannot instantiate pooled type " + classObject.getName(), e);
            }
        }
    }

    /**
     * Takes an object out of the pool, waiting until one is available.
     *
     * @return an object that is not currently checked out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public T checkOut() throws InterruptedException {
        available.acquire();
        return getItem();
    }

    /**
     * Returns an object to the pool. Objects that do not belong to this pool, or that
     * are not currently checked out, are ignored and release no permit.
     *
     * @param x the object to return
     */
    public void checkIn(T x) {
        if (releaseItem(x)) {
            available.release();
        }
    }

    /** Marks the first free slot as in use and returns its object, or null if none is free. */
    private synchronized T getItem() {
        for (int i = 0; i < size; i++) {
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        return null;
    }

    /**
     * Frees the slot that holds exactly this instance.
     *
     * @return true if the instance belonged to the pool and was checked out
     */
    private synchronized boolean releaseItem(T item) {
        for (int i = 0; i < size; i++) {
            // Compare by identity: with equals(), two equal pooled objects would be
            // confused and the wrong slot would be freed.
            if (items.get(i) == item) {
                if (checkedOut[i]) {
                    checkedOut[i] = false;
                    return true;
                }
                return false;
            }
        }
        return false;
    }
}
物件建立
/**
 * A sample object for the pool demo. Each instance gets a sequential id and runs a
 * summation loop in its constructor (the result is not used elsewhere in this class;
 * presumably the loop stands in for an expensive construction).
 */
public class Fat {
    // volatile so that writes to the field are visible across threads
    private volatile double d;
    private static int counter = 0;
    private final int id = counter++;

    /** Creates the object and fills {@code d} with the result of the summation loop. */
    public Fat() {
        // Start at 1: a first term with i == 0 would divide by zero and turn the
        // whole sum into Infinity.
        for (int i = 1; i < 10000; i++) {
            d += (Math.PI + Math.E) / (double) i;
        }
    }

    /** Prints this object's textual form to standard output. */
    public void operation() {
        System.out.println(this);
    }

    /** @return the text {@code "Fat id:"} followed by this instance's id */
    @Override
    public String toString() {
        return "Fat id:" + id;
    }
}
建立一個任務定時遷入遷出物件
/**
 * A task that checks one object out of a {@link Pool}, holds it for one second and
 * then checks it back in.
 *
 * @param <T> type of the pooled objects
 */
public class CheckOutTask<T> implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final Pool<T> pool;

    /**
     * @param pool the pool this task takes its object from and returns it to
     */
    public CheckOutTask(Pool<T> pool) {
        this.pool = pool;
    }

    @Override
    public void run() {
        try {
            T item = pool.checkOut();
            try {
                System.out.println(this + "checked out " + item);
                TimeUnit.SECONDS.sleep(1);
                System.out.println(this + "checked in " + item);
            } finally {
                // Return the object even when the sleep is interrupted; otherwise the
                // object and its permit would stay checked out forever.
                pool.checkIn(item);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the code that owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "CheckOutTask{" + "id=" + id + '}';
    }
}
遷入遷出測試
public class SemaphoreDemo { final static int SIZE = 25; public static void main(String[] args) throws Exception{ //1.建立物件池 final Pool<Fat> pool = new Pool<>(Fat.class, SIZE); //2.先物件池所有物件遷出,過一分中後遷入 ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < SIZE; i++) { exec.execute(new CheckOutTask<Fat>(pool)); } //3.物件池中物件全部遷出,已遷出的不再遷出 System.out.println("All Checkout Tasks Created"); List<Fat> list = new ArrayList<>(); for (int i = 0; i < SIZE; i++) { Fat f = pool.checkOut(); System.out.println(i + ": main() thread checked out "); f.operation(); list.add(f); } //4.Future阻塞遷出 Future<?> blocked = exec.submit(new Runnable() { @Override public void run() { try { pool.checkOut(); } catch (InterruptedException e) { System.out.println("checkOut Interrupted"); e.printStackTrace(); } } }); //5.中斷阻塞 TimeUnit.SECONDS.sleep(2); blocked.cancel(true); System.out.println("Checking in objects in" + list); for (Fat f: list) { pool.checkIn(f); } //6.第二次不會遷入 for (Fat f: list) { pool.checkIn(f); } exec.shutdown(); } }