1. 程式人生 > 其它 >execute taskaction$gradle怎麼解決_如何解決 Java 執行緒池佇列過飽問題?

execute taskaction$gradle怎麼解決_如何解決 Java 執行緒池佇列過飽問題?

技術標籤:execute taskaction$gradle怎麼解決

3e3e182414180c0c9e701f87d26eaf4c.png
public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                   0L, TimeUnit.MILLISECONDS,                   new LinkedBlockingQueue());}

Java的Executors框架提供的定長執行緒池內部預設使用LinkedBlockingQueue作為任務的容器,這個佇列是沒有限定大小的,可以無限向裡面submit任務。

當執行緒池處理的太慢的時候,佇列裡的內容會積累,積累到一定程度就會記憶體溢位。即使沒有記憶體溢位,佇列的延遲勢必會變大,而且如果程序突然遇到退出訊號,佇列裡的訊息還沒有被處理就被丟棄了,那必然會對系統的訊息可靠性造成重大影響。

那如何解決執行緒池的過飽問題呢?從佇列入手,無外乎兩種方法

  1. 增加消費者,增加消費者處理效率
  2. 限制生產者生產速度

增加消費者就是增加執行緒池大小,增加消費者處理效率就是優化邏輯處理。但是如果遇到了IO瓶頸,消費者處理的效率完全取決於IO效率,在消費能力上已經優化到了極限還是處理不過來怎麼辦?或者系統突然遇到使用者高峰,我們所配置的執行緒池大小不夠用怎麼辦?

這時候我們就只能從生產者入手,限制生產者的生產速度。那如何限制呢?

public LinkedBlockingQueue(int capacity) {    if (capacity <= 0) throw new IllegalArgumentException();    this.capacity = capacity;    last = head = new Node(null); }

LinkedBlockingQueue提供了capacity引數可以限制佇列的大小,當佇列元素達到上線的時候,生產者執行緒會阻塞住,直到佇列被消費者消費到有空槽的時候才會繼續下去。這裡似乎只要給佇列設定一個大小就ok了。

但是實際情況並不是我們所想的那樣。

public void execute(Runnable command) {    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {      if (addWorker(command, true))        return;      c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) { # here      int recheck = ctl.get();      if (! isRunning(recheck) && remove(command))        reject(command);      else if (workerCountOf(recheck) == 0)        addWorker(null, false);    }    else if (!addWorker(command, false))      reject(command); # here  }

翻開原始碼可以發現生產者向佇列裡塞任務用的方法是workQueue.offer(),這個方法在遇到佇列滿時是不會阻塞的,而是直接返回一個false,表示拋棄了這個任務。然後生產者呼叫reject方法,進入拒絕處理邏輯。

接下來我們看看這個reject方法到底幹了什麼

final void reject(Runnable command) {    handler.rejectedExecution(command, this);}
998e56ab9b6e7a56738bb8c21d7d3622.png

我們看到JDK預設提供了4中拒絕策略的實現。

  1. AbortPolicy 預設策略,丟擲RejectedExecutionException異常
  2. CallerRunsPolicy 讓任務在生產者執行緒裡執行,這樣可以降低生產者的生產速度也不會將生產者的執行緒堵住
  3. DiscardPolicy 直接拋棄任務,不拋異常
  4. DiscardOldestPolicy 直接拋棄舊任務,不拋異常

一般比較常用的是CallerRunPolicy,比較優雅的解決了過飽問題。如果你覺得這種方式不那麼優雅的話,還可以使用下面的幾種方式。這幾種方式都是通過處理RejectExecution來實現生產者的阻塞的目的。

public class BlockWhenQueueFullHandler implements RejectedExecutionHandler {  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {    pool.getQueue().put(new FutureTask(r));  }}

這種方案是使用put方法會阻塞生產者執行緒的原理達到了目的。

public class BlockWhenQueueFull implements RejectedExecutionHandler {  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {    try {      long waitMs = 10;      Thread.sleep(waitMs);    } catch (InterruptedException e) {}    executor.execute(r);  }}

這種方案顯而易見,用sleep達到了阻塞的目的。

public class BoundedExecutor {  private final Executor exec;  private final Semaphore semaphore;  public BoundedExecutor(Executor exec, int bound) {    this.exec = exec;    this.semaphore = new Semaphore(bound);  }  public void submitTask(final Runnable command)      throws InterruptedException, RejectedExecutionException {    semaphore.acquire();    try {      exec.execute(new Runnable() {        public void run() {          try {            command.run();          } finally {            semaphore.release();          }        }      });    } catch (RejectedExecutionException e) {      semaphore.release();      throw e;    }  }}

這種方案是通過訊號量的大小都限制佇列的大小,也不需要特別限定executor佇列大小了

同樣的原理還可以使用wait/notifyAll機制來達到一樣的目的。

END