ExecutorService-10個要訣和技巧
原文連結 作者:Tomasz Nurkiewicz 譯者:simonwang
ExecutorService抽象概念自Java5就已經提出來了,現在是2014年。順便提醒一下:Java5和Java6都已不被支援,Java7在半年內也將會這樣。我提出這個的原因是許多Java程式設計師仍然不能完全明白ExecutorService到底是怎樣工作的。還有很多地方要去學習,今天我會分享一些很少人知道的特性和實踐。然而這篇文章仍然是面向中等程式設計師的,沒什麼特別高階的地方。
1. Name pool threads
我想強調一點的是,當在執行JVM或除錯期間建立執行緒時,預設的執行緒池命名規則是pool-N-thread-M,這裡N代表執行緒池的序列數(每一次你建立一個執行緒池的時候,全域性計數N就加1),而M則是某一個執行緒池的執行緒序列數。例如,pool-2-thread-3就意味著JVM生命週期中第2執行緒池的第3執行緒。具體可以檢視:ThreadFactory內部。幸運地是Guava有一個很有用的類:
import com.google.common.util.concurrent.ThreadFactoryBuilder; final ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("Orders-%d") .setDaemon(true) .build(); final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
執行緒池預設創造的是非守護執行緒,由你來決定是否合適。
2. Switch names according to context
有一個我從 Supercharged jstack: How to Debug Your Servers at 100mph學到的小技巧。一旦我們記住了執行緒的名字,那麼在任何時刻我們都能夠改變它們!這是有道理的,因為執行緒轉儲顯示了類名和方法名,沒有引數和區域性變數。通過調整執行緒名保留一些必要的事務識別符號,我們可以很容易追蹤某一條執行緩慢或者造成死鎖的資訊/記錄/查詢等。例如:
private void process(String messageId) { executorService.submit(() -> { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + messageId); try { //real logic here... } finally { currentThread.setName(oldName); } }); }
在try-finally塊內部,當前執行緒被命名為Processing-WHATEVER-MESSAGE-ID-IS,當通過系統追蹤資訊流時這可能會派上用場。
3. Explicit and safe shutdown
在客戶端執行緒和執行緒池之間有一個任務佇列,當你的應用關閉時,你必須關心兩件事:任務佇列會發生什麼;正在執行的任務會怎樣(這個時候將詳細介紹)。令人感到吃驚的是許多程式設計師並不會適當地或有意識地關閉執行緒池。這有兩個方法:要麼讓所有的任務佇列全都執行完(shutdown()
),要麼捨棄它們(shutdownNow()
),這依賴你使用的具體情況。例如如果我們提交一連串的任務並且想要它們在完成後儘可能快的返回,可以使用shutdown():
private void sendAllEmails(List<String> emails) throws InterruptedException { emails.forEach(email -> executorService.submit(() -> sendEmail(email))); executorService.shutdown(); final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES); log.debug("All e-mails were sent so far? {}", done); }
在這個例子中我們傳送了一堆e-mail,每一個都作為一個獨立的任務交給執行緒池。在提交了所有的任務之後我們執行shutdown使執行緒池不再接收新的任務。然後最多等待1minute直到所有的任務都完成。然而如果有些任務仍然處於掛起狀態,awaitTermination()將返回false,而那些在等待的任務會繼續執行。我知道一些人會使用新潮的用法:
emails.parallelStream().forEach(this::sendEmail);
你可能會覺得我太保守,但我喜歡去控制並行執行緒的數量。不用介意,還有一種優雅的shutdown()方法shutdownNow():
final List<Runnable> rejected = executorService.shutdownNow(); log.debug("Rejected tasks: {}", rejected.size());
這樣一來佇列中還在等待的任務將會被捨棄並被返回,但已經在執行的任務將會繼續。
4. Handle interruption with care
很少人知道Future介面的cancel,這裡我不想重複說明,你可以去看我以前的文章:
5. Monitor queue length and keep it bounded
不合適的執行緒池大小可能會造成執行緩慢、不穩定以及記憶體洩漏。如果你配置太少的執行緒,那麼任務佇列就會變大,消耗太多記憶體。另一方面太多的執行緒又會由於過度頻繁的上下文切換而造成整個系統執行緩慢。所以觀察佇列的長度並將其限定在一定範圍內是很重要的,這樣的話過載的執行緒池會短暫拒絕新任務的提交:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100); executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
上面的程式碼和Executors.newFixedThreadPool(n)是等價的,然而不同的是預設情況下固定執行緒池使用的是無限制的LinkedBlockingQueue ,我們使用的是固定容量100的ArrayBlockingQueue。這就意味著如果已經有100個任務在排隊(其中有n個任務正在執行),那麼新的任務就將被駁回並丟擲RejectedExecutionException。一旦在外部可以訪問queue ,那麼我們就可以週期性地呼叫size(),並把它提交到logs/JMX或其他任何你使用的監視器中。
6. Remember about exception handling
下面程式碼段的結果是什麼?
executorService.submit(() -> { System.out.println(1 / 0); });
我深受其苦:它不會列印任何東西。不會丟擲java.lang.ArithmeticException: / by zero,什麼也沒有。執行緒池將忽略這個異常,就像它從來沒發生過。如果上面的程式碼是用java.lang.Thread偶然創造的,那麼UncaughtExceptionHandler可能會起作用。但線上程池裡你就要多加小心了。如果你正在提交Runnable (沒有返回結果,就像上面),那麼你必須將整個程式碼塊用try-catch包起來,至少要log一下。如果你提交的是Callable,確保你總是使用阻塞的get()方法來重拋異常:
final Future<Integer> division = executorService.submit(() -> 1 / 0); //below will throw ExecutionException caused by ArithmeticException division.get();
有趣的是就算是Spring框架在處理這個bug的時候會使用@Async,詳細: SPR-8995和SPR-12090。
7. Monitor waiting time in a queue
監控工作佇列深度又是一個層面,在排除單個事務或任務的故障時,有必要了解從任務的提交到實際執行耗時多長。這種等待時間最好趨近於零(當執行緒池中有空閒的執行緒時),但任務又不得不在佇列中排隊導致等待時間變長。而且如果池內沒有一定數量的執行緒,在執行新任務時可能需要創造新的執行緒,而這個過程也是要消耗少量時間的。為了能夠清楚地監測這個時間,我們使用類似下面的程式碼包裝原始的ExecutorService :
public class WaitTimeMonitoringExecutorService implements ExecutorService { private final ExecutorService target; public WaitTimeMonitoringExecutorService(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { final long startTime = System.currentTimeMillis(); return target.submit(() -> { final long queueDuration = System.currentTimeMillis() - startTime; log.debug("Task {} spent {}ms in queue", task, queueDuration); return task.call(); } ); } @Override public <T> Future<T> submit(Runnable task, T result) { return submit(() -> { task.run(); return result; }); } @Override public Future<?> submit(Runnable task) { return submit(new Callable<Void>() { @Override public Void call() throws Exception { task.run(); return null; } }); } //... }
這並不是完整的實現,但你得知道這個基本概念。當我們向執行緒池提交任務的那一刻,就立馬開始測量時間,而任務一開始被執行就停止測量。不要被上面原始碼中很接近的startTime 和queueDuration 所迷惑了,事實上這兩行是在不同的執行緒中執行的,可能有數毫秒甚至數秒的差別,例如:
Task [email protected] spent 9883ms in queue
8. Preserve client stack trace
java.lang.NullPointerException: null at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na] at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0] at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
我們很容易就發現MyTask在76行丟擲了空指標異常,但我們並不知道是誰提交了這個任務,因為堆疊跟蹤僅僅只是告訴你Thread和 ThreadPoolExecutor的資訊。我們能通過原始碼從技術上定位MyTask被創造的位置,不需要執行緒(更不必說事件驅動、響應式程式設計)我們就能夠馬上看到全面資訊。如果我們保留客戶端程式碼(提交任務的程式碼)的堆疊跟蹤並在出現故障的時候將其打印出來會怎麼樣?這不是什麼新想法,例如Hazelcast會將當前點發生的異常傳送回客戶端程式碼,下面就看看保持客戶端堆疊跟蹤是怎樣實現的:
public class ExecutorServiceWithClientTrace implements ExecutorService { protected final ExecutorService target; public ExecutorServiceWithClientTrace(ExecutorService target) { this.target = target; } @Override public <T> Future<T> submit(Callable<T> task) { return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName())); } private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) { return () -> { try { return task.call(); } catch (Exception e) { log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack); throw e; } }; } private Exception clientTrace() { return new Exception("Client stack trace"); } @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return tasks.stream().map(this::submit).collect(toList()); } //... }
這次一旦出現異常我們將檢索任務被提交地方的所有堆疊跟蹤和執行緒名,和標準異常相比下面的異常資訊更有價值:
Exception java.lang.NullPointerException in task submitted from thrad main here: java.lang.Exception: Client stack trace at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na] at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na] at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0] at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0] at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
9. Prefer CompletableFuture
Java 8提出了強大的CompletableFuture,請儘可能的使用它。ExecutorService並沒有擴充套件支援這個強大的抽象,所以你要小心使用它。用:
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);
代替:
final Future<BigDecimal> future = executorService.submit(this::calculate);
CompletableFuture繼承了Future及其所有功能,而且CompletableFuture所提供的擴充套件功能極大地豐富了我們的API。
10. Synchronous queue
SynchronousQueue是一種有趣的BlockingQueue但真正意義上並不是queue,事實上它連資料結構都算不上。要解釋的話它算是0容量的佇列,引用JavaDoc:
each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. […]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.
這和執行緒池有什麼關係呢?試著在ThreadPoolExecutor中使用SynchronousQueue:
BlockingQueue<Runnable> queue = new SynchronousQueue<>(); ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, queue);
我們創造了有兩個執行緒的執行緒池和一個SynchronousQueue,因為SynchronousQueue本質上是零容量的佇列,因此如果有空閒執行緒,ExecutorService只會執行新的任務。如果所有的執行緒都被佔用,新任務會被立刻拒絕不會等待。當程序背景要求立刻啟動或者被丟棄時,這種機制是可取的。
以上,希望你們能夠找到至少一個有用的!