動態執行緒池管理器
阿新 • • 發佈:2019-01-04
import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.log4j.Log4j2; import java.util.Iterator; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; /** * 動態執行緒池管理器 * - 根據名稱建立執行緒池並快取 * - 執行緒池為快取執行緒池,每個執行緒存活一定時間 * - 執行緒池達到最大執行緒數,改為阻塞式,由呼叫執行緒執行 * - 定時輸出所有快取執行緒池的狀態 * @author ZhangShuzheng * @date 2018/8/23 */ @Log4j2 public class ThreadPoolManager { /** * 執行緒池map */ private static ConcurrentHashMap<String, ThreadPoolExecutor> CACHE = new ConcurrentHashMap<>(); /** * 併發操作鎖 */ private static ReentrantLock reentrantLock = new ReentrantLock(); /** * 核心執行緒數大小 */ private static int corePoolSize = 0; /** * 最大執行緒數大小 */ private static int maximumPoolSize = 50; /** * 回收等待時間 */ private static long keepAliveTime = 60L; /** * 快取佇列大小 */ private static int queueSize = 1000; /** * 是否開啟監控執行緒 */ private static ThreadPoolMonitorThread threadPoolMonitorThread = null; /** * 監控執行緒列印日誌間隔時間 */ private static long monitorIntervalTime = 60 * 1000L; /** * 根據名稱獲取執行緒池 * @param poolName * @return */ public static ThreadPoolExecutor get(String poolName) { reentrantLock.lock(); ThreadPoolExecutor threadPoolExecutor = null; try { // 根據名稱獲取快取執行緒池,沒有則新建並快取 threadPoolExecutor = CACHE.get(poolName); if (null == threadPoolExecutor) { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build(); threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); CACHE.put(poolName, threadPoolExecutor); } // 開啟監控執行緒 if (null == threadPoolMonitorThread) { threadPoolMonitorThread = new ThreadPoolMonitorThread(CACHE, monitorIntervalTime); threadPoolMonitorThread.start(); } } catch (Exception e) { e.printStackTrace(); } finally { reentrantLock.unlock(); } return threadPoolExecutor; } } @Log4j2 class ThreadPoolMonitorThread extends Thread { private ConcurrentHashMap<String, ThreadPoolExecutor> cache; private long monitorIntervalTime; public ThreadPoolMonitorThread(ConcurrentHashMap<String, ThreadPoolExecutor> cache, long monitorIntervalTime) { this.cache = cache; this.monitorIntervalTime = monitorIntervalTime; } @Override public void run() { while (true) { try { Thread.sleep(monitorIntervalTime); Iterator<Map.Entry<String, ThreadPoolExecutor>> iterator = cache.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<String, ThreadPoolExecutor> entry = iterator.next(); String poolName = entry.getKey(); ThreadPoolExecutor threadPoolExecutor = entry.getValue(); int poolSize = threadPoolExecutor.getPoolSize(); int corePoolSize = threadPoolExecutor.getCorePoolSize(); int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int largestPoolSize = threadPoolExecutor.getLargestPoolSize(); int activeCount = threadPoolExecutor.getActiveCount(); long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); long taskCount = threadPoolExecutor.getTaskCount(); log.info("[ThreadPoolMonitorThread][{}]: " + "poolSize={}, " + "corePoolSize={}, " + "maximumPoolSize={}, " + "largestPoolSize={}, " + "activeCount={}, " + "completedTaskCount={}, " + "taskCount={}", poolName, poolSize, corePoolSize, maximumPoolSize, largestPoolSize,activeCount, completedTaskCount, taskCount); } } catch (InterruptedException e) { e.printStackTrace(); } } } }