1. 程式人生 > 程式設計 >JVM優先順序執行緒池做任務佇列的實現方法

JVM優先順序執行緒池做任務佇列的實現方法

前言

我們都知道 web 服務的工作大多是接受 http 請求,並返回處理後的結果。伺服器接受的每一個請求又可以看是一個任務。一般而言這些請求任務會根據請求的先後有序處理,如果請求任務的處理比較耗時,往往就需要排隊了。而同時不同的任務直接可能會存在一些優先順序的變化,這時候就需要引入任務佇列並進行管理了。可以做任務佇列的東西有很多,Java 自帶的執行緒池,以及其他的訊息中介軟體都可以。

同步與非同步

這個問題在之前已經提過很多次了,有些任務是需要請求後立即返回結果的,而有的則不需要。設想一下你下單購物的場景,付完錢後,系統只需要返回一個支付成功即可,後續的積分增加、優惠券發放、安排發貨等等業務都不需要實時返回給使用者的,這些就是非同步的任務。大量的非同步任務到達我們部署的服務上,由於處理效率的瓶頸,無法達到實時處理,因此與需要用佇列將他們暫時儲存起來,排隊處理。

執行緒池

在 Java 中提到佇列,我們除了想到基本的資料結構之外,應該還有執行緒池。執行緒池自帶一套機制可以實現任務的排隊和執行,可以滿足單點環境下絕大多數非同步化的場景。下面是典型的一個處理流程:

// 注入合適型別的執行緒池
@Autowired
private final ThreadPoolExecutor asyncPool;
@RequestMapping(value = "/async/someOperate",method = RequestMethod.POST)
public RestResult someOperate(HttpServletRequest request,String params,String callbackUrl {
  // 接受請求後 submit 到執行緒池排隊處理
  asyncPool.submit(new Task(params,callbackUrl);
  return new RestResult(ResultCode.SUCCESS.getCode(),null) {{
    setMsg("successful!" + prop.getShowMsg());
  }};
}

// 非同步任務處理
@Slf4j
public class Task extends Callable<RestResult> {
  private String params;
  private String callbackUrl;
  private final IAlgorithmService algorithmService = SpringUtil.getBean(IAlgorithmServiceImpl.class);
  private final ServiceUtils serviceUtils = SpringUtil.getBean(ServiceUtils.class);
  public ImageTask(String params,String callbackUrl) {
    this.params = params;
    this.callbackUrl = callbackUrl;
  }

  @Override
  public RestResult call() {
    try {
      // 業務處理
      CarDamageResult result = algorithmService.someOperate(this.params);
      // 回撥
      return serviceUtils.callback(this.callbackUrl,this.caseNum,ResultCode.SUCCESS.getCode(),result,this.isAsync);
    } catch (ServiceException e) {
      return serviceUtils.callback(this.callbackUrl,e.getCode(),null,this.isAsync);
    }
  }
}

對於執行緒池這裡就不具體展開講了,僅僅簡單理了下具體的流程:

  1. 收到請求後,引數校驗後傳入執行緒池排隊。
  2. 返回結果:“請求成功,正在處理”。
  3. 任務排到後由相應的執行緒處理,處理完後進行介面回撥。

上面的例子描述了一個生產速度遠遠大於消費速度的模型,普通面向資料庫開發的企業級應用,由於資料庫的連線池開發的連線數較大,一般不需要這樣通過執行緒池來處理,而一些 GPU 密集型的應用場景,由於視訊記憶體的瓶頸導致消費速度慢時,就需要佇列來作出調整了。

帶優先順序的執行緒池

更復雜的,例如考慮到任務的優先順序,還需要對執行緒池進行重寫,通過 PriorityBlockingQueue 來替換預設的阻塞佇列。直接上程式碼。

import lombok.Data;

import java.util.concurrent.Callable;

/**
 * @author Fururur
 * @create 2020-01-14-10:37
 */
@Data
public abstract class PriorityCallable<T> implements Callable<T> {
  private int priority;
}

import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 優先順序執行緒池的實現
 *
 * @author Fururur
 * @create 2019-07-23-10:19
 */
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

  private ThreadLocal<Integer> local = ThreadLocal.withInitial(() -> 0);

  public PriorityThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit) {
    super(corePoolSize,maximumPoolSize,keepAliveTime,unit,getWorkQueue());
  }

  public PriorityThreadPoolExecutor(int corePoolSize,TimeUnit unit,ThreadFactory threadFactory) {
    super(corePoolSize,getWorkQueue(),threadFactory);
  }

  public PriorityThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
    super(corePoolSize,handler);
  }

  public PriorityThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,threadFactory,handler);
  }

  private static PriorityBlockingQueue getWorkQueue() {
    return new PriorityBlockingQueue();
  }

  @Override
  public void execute(Runnable command) {
    int priority = local.get();
    try {
      this.execute(command,priority);
    } finally {
      local.set(0);
    }
  }

  public void execute(Runnable command,int priority) {
    super.execute(new PriorityRunnable(command,priority));
  }

  public <T> Future<T> submit(PriorityCallable<T> task) {
    local.set(task.getPriority());
    return super.submit(task);
  }

  public <T> Future<T> submit(Runnable task,T result,int priority) {
    local.set(priority);
    return super.submit(task,result);
  }

  public Future<?> submit(Runnable task,int priority) {
    local.set(priority);
    return super.submit(task);
  }

  @Getter
  @Setter
  protected static class PriorityRunnable implements Runnable,Comparable<PriorityRunnable> {
    private final static AtomicLong seq = new AtomicLong();
    private final long seqNum;
    private Runnable run;

    private int priority;

    PriorityRunnable(Runnable run,int priority) {
      seqNum = seq.getAndIncrement();
      this.run = run;
      this.priority = priority;
    }

    @Override
    public void run() {
      this.run.run();
    }

    @Override
    public int compareTo(PriorityRunnable other) {
      int res = 0;
      if (this.priority == other.priority) {
        if (other.run != this.run) {
          // ASC
          res = (seqNum < other.seqNum ? -1 : 1);
        }
      } else {
        // DESC
        res = this.priority > other.priority ? -1 : 1;
      }
      return res;
    }
  }
}

要點如下:

  1. 替換執行緒池預設的阻塞佇列為 PriorityBlockingQueue,響應的傳入的執行緒類需要實現 Comparable<T> 才能進行比較。
  2. PriorityBlockingQueue 的資料結構決定了,優先順序相同的任務無法保證 FIFO,需要自己控制順序。
  3. 需要重寫執行緒池的 execute() 方法。看過執行緒池原始碼的會發現,執行 submit(task) 方法後,都會轉化成 RunnableFuture<T> 再進一步執行,由於傳入的 task 雖然實現了 Comparable<T> 到,但是內部轉換成的 RunnableFuture<T> 並未實現,因此直接 submit 會丟擲 Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable 這樣一個異常,所以需要重寫 execute() 方法,構造一個 PriorityRunnable 作為中轉。

總結

JVM 執行緒池是實現非同步任務佇列最簡單最原生的一種方式,本文介紹了基本的使用流程和帶有優先佇列需求的用法。這種方法可有滿足到一些簡單的業務場景,但也存在一定的侷限性:

  1. JVM 執行緒池是單機的,橫向擴充套件多個服務下做負載均衡時,就會存在多個執行緒池了他們是分開工作的,無法很好的統一和管理,不太適合分散式場景。
  2. JVM 執行緒池是基於記憶體的,一旦服務掛了,會出現任務丟失的情況,可靠性低。
  3. 缺少作為任務佇列的 ack 機制,一旦任務失敗不會重新執行,且無法很好地對執行緒池佇列進行監控。

顯然簡單的 JVM 執行緒池是無法 handle 到負載的業務場景的,這就需要引入其他中介軟體了,在接下來的文章中我們會繼續探討。

參考文獻

ThreadPoolExecutor 優先順序的執行緒池

implementing PriorityQueue on ThreadPoolExecutor

ThreadPoolExecutor 的 PriorityBlockingQueue 型別轉化問題

大搜車非同步任務佇列中介軟體的建設實踐

到此這篇關於JVM優先順序執行緒池做任務佇列的實現方法的文章就介紹到這了,更多相關java執行緒池優先順序內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!