1. 程式人生 > >動態執行緒池管理器

動態執行緒池管理器


程式碼


import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.log4j.Log4j2;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 動態執行緒池管理器
 * - 根據名稱建立執行緒池並快取
 * - 執行緒池為快取執行緒池,每個執行緒存活一定時間
 * - 執行緒池達到最大執行緒數,改為阻塞式,由呼叫執行緒執行
 * - 定時輸出所有快取執行緒池的狀態
 * @author ZhangShuzheng
 * @date 2018/8/23
 */
@Log4j2
public class ThreadPoolManager {

    /**
     * 執行緒池map
     */
    private static ConcurrentHashMap<String, ThreadPoolExecutor> CACHE = new ConcurrentHashMap<>();

    /**
     * 併發操作鎖
     */
    private static ReentrantLock reentrantLock = new ReentrantLock();

    /**
     * 核心執行緒數大小
     */
    private static int corePoolSize = 0;

    /**
     * 最大執行緒數大小
     */
    private static int maximumPoolSize = 50;

    /**
     * 回收等待時間
     */
    private static long keepAliveTime = 60L;

    /**
     * 快取佇列大小
     */
    private static int queueSize = 1000;

    /**
     * 是否開啟監控執行緒
     */
    private static ThreadPoolMonitorThread threadPoolMonitorThread = null;

    /**
     * 監控執行緒列印日誌間隔時間
     */
    private static long monitorIntervalTime = 60 * 1000L;

    /**
     * 根據名稱獲取執行緒池
     * @param poolName
     * @return
     */
    public static ThreadPoolExecutor get(String poolName) {
        reentrantLock.lock();
        ThreadPoolExecutor threadPoolExecutor = null;
        try {
            // 根據名稱獲取快取執行緒池,沒有則新建並快取
            threadPoolExecutor = CACHE.get(poolName);
            if (null == threadPoolExecutor) {
                ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(poolName + "-%d").build();
                threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(queueSize), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
                CACHE.put(poolName, threadPoolExecutor);
            }
            // 開啟監控執行緒
            if (null == threadPoolMonitorThread) {
                threadPoolMonitorThread = new ThreadPoolMonitorThread(CACHE, monitorIntervalTime);
                threadPoolMonitorThread.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
        return threadPoolExecutor;
    }

}

@Log4j2
class ThreadPoolMonitorThread extends Thread {

    private ConcurrentHashMap<String, ThreadPoolExecutor> cache;

    private long monitorIntervalTime;

    public ThreadPoolMonitorThread(ConcurrentHashMap<String, ThreadPoolExecutor> cache, long monitorIntervalTime) {
        this.cache = cache;
        this.monitorIntervalTime = monitorIntervalTime;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(monitorIntervalTime);
                Iterator<Map.Entry<String, ThreadPoolExecutor>> iterator = cache.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<String, ThreadPoolExecutor> entry = iterator.next();
                    String poolName = entry.getKey();
                    ThreadPoolExecutor threadPoolExecutor = entry.getValue();
                    int poolSize = threadPoolExecutor.getPoolSize();
                    int corePoolSize = threadPoolExecutor.getCorePoolSize();
                    int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
                    int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
                    int activeCount = threadPoolExecutor.getActiveCount();
                    long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
                    long taskCount = threadPoolExecutor.getTaskCount();
                    log.info("[ThreadPoolMonitorThread][{}]: " +
                            "poolSize={}, " +
                            "corePoolSize={}, " +
                            "maximumPoolSize={}, " +
                            "largestPoolSize={}, " +
                            "activeCount={}, " +
                            "completedTaskCount={}, " +
                            "taskCount={}",
                            poolName, poolSize, corePoolSize, maximumPoolSize,
                            largestPoolSize,activeCount, completedTaskCount, taskCount);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}