使用Semaphore控制執行緒池任務提交的速率
阿新 • • 發佈:2018-12-01
使用Semaphore控制執行緒池任務提交的速率
歡迎關注作者部落格
簡書傳送門
介紹
當執行緒池的工作佇列被填滿後,如果沒有預定義的飽和策略來阻塞任務的執行,則可以通過訊號量Semaphore來限制任務的到達率。Semaphore是一個同步工具類,用來控制同時訪問某個特定資源的運算元量。它的acquire方法返回一個虛擬的許可,如果沒有可用的許可,則阻塞該方法的呼叫執行緒直到有可用許可為止。如果執行緒池使用無界佇列緩衝任務時,如果任務在某一時間增長數量過快,容易導致記憶體耗盡。
無界佇列和Semaphore搭配使用,通過設定訊號量的上界,來控制任務的提交速率。
四種飽和策略
- static class ThreadPoolExecutor.AbortPolicy
用於被拒絕任務的處理程式,它將丟擲 RejectedExecutionException. - static class ThreadPoolExecutor.CallerRunsPolicy
用於被拒絕任務的處理程式,它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務。 - static class ThreadPoolExecutor.DiscardOldestPolicy
用於被拒絕任務的處理程式,它放棄最舊的未處理請求,然後重試 execute;如果執行程式已關閉,則會丟棄該任務。 - static class ThreadPoolExecutor.DiscardPolicy
用於被拒絕任務的處理程式,預設情況下它將丟棄被拒絕的任務。
原始碼
下面使用Semaphore來控制執行緒池任務提交的速率:
/**
* @program:
* @description: 使用Semaphore控制執行緒池任務提交速率
* @author: zhouzhixiang
* @create: 2018-11-13 20:48
*/
@ThreadSafe
public class BoundedExecutor {
private final ExecutorService executor;
private final Semaphore semaphore;
public BoundedExecutor(ExecutorService executor, int bound) {
this.executor = executor;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) {
try {
semaphore.acquire();
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
}finally {
semaphore.release();
}
}
});
} catch (InterruptedException e) {
semaphore.release();
}
}
public void stop(){
this.executor.shutdown();
}
static class MyThread extends Thread {
public String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("Thread-"+name+" is running....");
try {
Thread.sleep(new Random().nextInt(10000));
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(2,2,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(5));
BoundedExecutor executor = new BoundedExecutor(executorService, 5);
for (int i = 0; i < 100; i++) {
executor.submitTask(new MyThread(""+i));
}
executor.stop();
}
}
歡迎加入Java猿社群