execute taskaction$gradle怎麼解決_如何解決 Java 執行緒池佇列過飽問題?
技術標籤:execute taskaction$gradle怎麼解決
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}
Java的Executors框架提供的定長執行緒池內部預設使用LinkedBlockingQueue作為任務的容器,這個佇列是沒有限定大小的,可以無限向裡面submit任務。
當執行緒池處理的太慢的時候,佇列裡的內容會積累,積累到一定程度就會記憶體溢位。即使沒有記憶體溢位,佇列的延遲勢必會變大,而且如果程序突然遇到退出訊號,佇列裡的訊息還沒有被處理就被丟棄了,那必然會對系統的訊息可靠性造成重大影響。
那如何解決執行緒池的過飽問題呢?從佇列入手,無外乎兩種方法
- 增加消費者,增加消費者處理效率
- 限制生產者生產速度
增加消費者就是增加執行緒池大小,增加消費者處理效率就是優化邏輯處理。但是如果遇到了IO瓶頸,消費者處理的效率完全取決於IO效率,在消費能力上已經優化到了極限還是處理不過來怎麼辦?或者系統突然遇到使用者高峰,我們所配置的執行緒池大小不夠用怎麼辦?
這時候我們就只能從生產者入手,限制生產者的生產速度。那如何限制呢?
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }
LinkedBlockingQueue提供了capacity引數可以限制佇列的大小,當佇列元素達到上線的時候,生產者執行緒會阻塞住,直到佇列被消費者消費到有空槽的時候才會繼續下去。這裡似乎只要給佇列設定一個大小就ok了。
但是實際情況並不是我們所想的那樣。
public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { # here int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); # here }
翻開原始碼可以發現生產者向佇列裡塞任務用的方法是workQueue.offer(),這個方法在遇到佇列滿時是不會阻塞的,而是直接返回一個false,表示拋棄了這個任務。然後生產者呼叫reject方法,進入拒絕處理邏輯。
接下來我們看看這個reject方法到底幹了什麼
final void reject(Runnable command) { handler.rejectedExecution(command, this);}
我們看到JDK預設提供了4中拒絕策略的實現。
- AbortPolicy 預設策略,丟擲RejectedExecutionException異常
- CallerRunsPolicy 讓任務在生產者執行緒裡執行,這樣可以降低生產者的生產速度也不會將生產者的執行緒堵住
- DiscardPolicy 直接拋棄任務,不拋異常
- DiscardOldestPolicy 直接拋棄舊任務,不拋異常
一般比較常用的是CallerRunPolicy,比較優雅的解決了過飽問題。如果你覺得這種方式不那麼優雅的話,還可以使用下面的幾種方式。這幾種方式都是通過處理RejectExecution來實現生產者的阻塞的目的。
public class BlockWhenQueueFullHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { pool.getQueue().put(new FutureTask(r)); }}
這種方案是使用put方法會阻塞生產者執行緒的原理達到了目的。
public class BlockWhenQueueFull implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { long waitMs = 10; Thread.sleep(waitMs); } catch (InterruptedException e) {} executor.execute(r); }}
這種方案顯而易見,用sleep達到了阻塞的目的。
public class BoundedExecutor { private final Executor exec; private final Semaphore semaphore; public BoundedExecutor(Executor exec, int bound) { this.exec = exec; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException { semaphore.acquire(); try { exec.execute(new Runnable() { public void run() { try { command.run(); } finally { semaphore.release(); } } }); } catch (RejectedExecutionException e) { semaphore.release(); throw e; } }}
這種方案是通過訊號量的大小都限制佇列的大小,也不需要特別限定executor佇列大小了
同樣的原理還可以使用wait/notifyAll機制來達到一樣的目的。
END