1. 程式人生 > >IEAD中加入阿里的程式碼檢查外掛後Executors提示需手動建立

IEAD中加入阿里的程式碼檢查外掛後Executors提示需手動建立

轉自:https://blog.csdn.net/w605283073/article/details/80259493

最近了解一下執行緒池,下載其中的程式碼並執行。

https://howtodoinjava.com/core-java/multi-threading/when-to-use-countdownlatch-java-concurrency-example-tutorial/

其中ApplicationStartupUtil這個類

 


  
  1. package com.chujianyun;
  2. import com.chujianyun.verifier.BaseHealthChecker;
  3. import com.chujianyun.verifier.CacheHealthChecker;
  4. import com.chujianyun.verifier.DatabaseHealthChecker;
  5. import com.chujianyun.verifier.NetworkHealthChecker;
  6. import java.util.concurrent.*;
  7. public class ApplicationStartupUtil
  8. {
  9. private static BlockingQueue <Runnable
    >
    services;
  10. private static CountDownLatch latch;
  11. private ApplicationStartupUtil()
  12. {
  13. }
  14. private final static ApplicationStartupUtil INSTANCE = new ApplicationStartupUtil();
  15. public static ApplicationStartupUtil getInstance()
  16. {
  17. return INSTANCE;
  18. }
  19. public static boolean checkExternalServices() throws Exception
  20. {
  21. latch = new CountDownLatch(3);
  22. services = new ArrayBlockingQueue <>(3);
  23. services.add(new NetworkHealthChecker(latch));
  24. services.add(new CacheHealthChecker(latch));
  25. services.add(new DatabaseHealthChecker(latch));
  26. ExecutorService executorService = Executors.newFixedThreadPool(services.size());
  27. for(final Runnable v : services)
  28. {
  29. executorService.execute(v);
  30. }
  31. latch.await();
  32. for(final Runnable v : services)
  33. {
  34. BaseHealthChecker baseHealthChecker = (BaseHealthChecker) v;
  35. if( ! baseHealthChecker.isServiceUp())
  36. {
  37. return false;
  38. }
  39. }
  40. return true;
  41. }
  42. }

其中有下面程式碼:

 

ExecutorService executorService = Executors.newFixedThreadPool(services.size());
  

由於IDEA安裝了阿里的Java程式設計規範檢查外掛,提示讓手動建立執行緒池

1、修改程式碼

檢視newFixedThreadPool函式原始碼:

 


  
  1. /**
  2. * Creates a thread pool that reuses a fixed number of threads
  3. * operating off a shared unbounded queue. At any point, at most
  4. * {@code nThreads} threads will be active processing tasks.
  5. * If additional tasks are submitted when all threads are active,
  6. * they will wait in the queue until a thread is available.
  7. * If any thread terminates due to a failure during execution
  8. * prior to shutdown, a new one will take its place if needed to
  9. * execute subsequent tasks. The threads in the pool will exist
  10. * until it is explicitly {@link ExecutorService#shutdown shutdown}.
  11. *
  12. * @param nThreads the number of threads in the pool
  13. * @return the newly created thread pool
  14. * @throws IllegalArgumentException if {@code nThreads <= 0}
  15. */
  16. public static ExecutorService newFixedThreadPool(int nThreads) {
  17. return new ThreadPoolExecutor(nThreads, nThreads,
  18. 0L, TimeUnit.MILLISECONDS,
  19. new LinkedBlockingQueue<Runnable>());
  20. }

得知該函式最終呼叫的還是ThreadPoolExecutor構造方法。

因此上面一句可以改成:

 


  
  1. int size = services.size();
  2. ExecutorService executorService = new ThreadPoolExecutor(size,size,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue <Runnable>());

但是又有提示,建議要為執行緒池中的執行緒設定名稱:

僅此在構造方法後加入TreadFactory,大功告成 

 


  
  1. ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();
  2. int size = services.size();
  3. ExecutorService executorService = new ThreadPoolExecutor(size,size,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue <Runnable>(),namedThreadFactory);

2、為什麼要這麼做呢?

我們參考阿里巴巴的Java開發手冊內容:

8.   【強制】執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。

說明:Executors各個方法的弊端:

1)    newFixedThreadPool和newSingleThreadExecutor:  主要問題是堆積的請求處理佇列可能會耗費非常大的記憶體,甚至OOM。

2)    newCachedThreadPool和newScheduledThreadPool:  主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。

9. 【強制】建立執行緒或執行緒池時請指定有意義的執行緒名稱,方便出錯時回溯。

我在此簡單進一步解讀一下:

[1] newFixedThreadPool和newSingleThreadExecutor 由於最後一個引數即工作佇列是:


  
  1. /**
  2. * Creates a thread pool that reuses a fixed number of threads
  3. * operating off a shared unbounded queue. At any point, at most
  4. * {@code nThreads} threads will be active processing tasks.
  5. * If additional tasks are submitted when all threads are active,
  6. * they will wait in the queue until a thread is available.
  7. * If any thread terminates due to a failure during execution
  8. * prior to shutdown, a new one will take its place if needed to
  9. * execute subsequent tasks. The threads in the pool will exist
  10. * until it is explicitly {@link ExecutorService#shutdown shutdown}.
  11. *
  12. * @param nThreads the number of threads in the pool
  13. * @return the newly created thread pool
  14. * @throws IllegalArgumentException if {@code nThreads <= 0}
  15. */
  16. public static ExecutorService newFixedThreadPool(int nThreads) {
  17. return new ThreadPoolExecutor(nThreads, nThreads,
  18. 0L, TimeUnit.MILLISECONDS,
  19. new LinkedBlockingQueue<Runnable>());
  20. }

連結串列型別的阻塞佇列,而我們看其建構函式發現,預設佇列大小是整數的最大值!!


  
  1. /**
  2. * Creates a {@code LinkedBlockingQueue} with a capacity of
  3. * {@link Integer#MAX_VALUE}.
  4. */
  5. public LinkedBlockingQueue() {
  6. this(Integer.MAX_VALUE);
  7. }
  8. /**
  9. * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
  10. *
  11. * @param capacity the capacity of this queue
  12. * @throws IllegalArgumentException if {@code capacity} is not greater
  13. * than zero
  14. */
  15. public LinkedBlockingQueue(int capacity) {
  16. if (capacity <= 0) throw new IllegalArgumentException();
  17. this.capacity = capacity;
  18. last = head = new Node<E>( null);
  19. }

所以如果請求太多,佇列很可能就耗費記憶體非常大導致OOM.

但是他們的執行緒數是固定的,而且一般不會太大,所以不會因為建立過多執行緒而導致OOM。

[2]newCachedThreadPool和newScheduledThreadPool:


  
  1. /**
  2. * Creates a thread pool that creates new threads as needed, but
  3. * will reuse previously constructed threads when they are
  4. * available. These pools will typically improve the performance
  5. * of programs that execute many short-lived asynchronous tasks.
  6. * Calls to {@code execute} will reuse previously constructed
  7. * threads if available. If no existing thread is available, a new
  8. * thread will be created and added to the pool. Threads that have
  9. * not been used for sixty seconds are terminated and removed from
  10. * the cache. Thus, a pool that remains idle for long enough will
  11. * not consume any resources. Note that pools with similar
  12. * properties but different details (for example, timeout parameters)
  13. * may be created using {@link ThreadPoolExecutor} constructors.
  14. *
  15. * @return the newly created thread pool
  16. */
  17. public static ExecutorService newCachedThreadPool() {
  18. return new ThreadPoolExecutor( 0, Integer.MAX_VALUE,
  19. 60L, TimeUnit.SECONDS,
  20. new SynchronousQueue<Runnable>());
  21. }

其中第最大執行緒池大小是整數的最大值,因此執行緒可能不斷建立,乃至到整數的最大值個執行緒,很容易導致OOM.

其中工作佇列使用的是 SynchronousQueue<E>

原始碼頭部的註釋中有說明


  
  1. * A { @linkplain BlockingQueue blocking queue} in which each insert
  2. * operation must wait for a corresponding remove operation by another
  3. * thread, and vice versa. A synchronous queue does not have any
  4. * internal capacity, not even a capacity of one. You cannot
  5. * { @code peek} at a synchronous queue because an element is only
  6. * present when you try to remove it; you cannot insert an element
  7. * (using any method) unless another thread is trying to remove it;
  8. * you cannot iterate as there is nothing to iterate. The
  9. * <em>head</em> of the queue is the element that the first queued
  10. * inserting thread is trying to add to the queue; if there is no such
  11. * queued thread then no element is available for removal and
  12. * { @code poll()} will return { @code null}. For purposes of other
  13. * { @code Collection} methods ( for example { @code contains}), a
  14. * { @code SynchronousQueue} acts as an empty collection. This queue
  15. * does not permit { @code null} elements.

可以看出

A {@linkplain BlockingQueue blocking queue} in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.

該型別的阻塞佇列每一個插入操作必須等待對應的元素被另一個執行緒所移除,反之亦然。

因此阻塞佇列不會無限拓展而導致OOM。

因此我們理解一些原則的時候,學習的時候多注重原始碼分析非常有必要,其他細節有待以後深入研究。

參考文章:http://www.crazyant.net/2124.html

最近了解一下執行緒池,下載其中的程式碼並執行。