1. 程式人生 > 程式設計 >JMX視覺化監控執行緒池

JMX視覺化監控執行緒池

前兩天閱讀公司程式碼看到了用JMX監控定時任務資訊和狀態,JMX這個單詞感覺很熟於是便去查閱了一下,並寫了監控執行緒池的Demo

通過閱讀本篇文章你將瞭解到:

  • JMX介紹
  • 執行緒池介紹
  • JMX監控執行緒池應用

什麼是JMX

JMX簡介

JMX(Java Management Extensions),監控管理框架,通過使用JMX可以監控和管理應用程式。JMX最常見的場景是監控Java程式的基本資訊和執行情況,任何Java程式都可以開啟JMX,然後使用JConsoleVisual VM進行預覽

JMX架構

總共分為三層,分發層、代理層、裝置層
分發層:根據不同的協議定義了對代理層進行各種操作的管理介面,簡單的來說是監控指標的檢視方式,可以是HTTP
連線、RMI連線、SNMP連線
代理層:管理MBean,通過將MBean註冊到代理層實現MBean的管理,除了註冊MBean,還可以註冊Adapter,代理層在應用中一般都是MBeanService
裝置層:監控指標抽象出的類,可以分為以下幾種:

  • Standard MBean
  • Dynamic MBean
  • Open MBean
  • Model MBean
  • MXBean

應用中一般使用Standard MBean比較多,所以這裡只介紹Standard MBean,使用Standard MBean需要滿足一定的規則,規則如下:

  • 定義一個介面,介面名必須為XXXXMBean的格式,必須MBean結尾
  • 如果介面為XXXXMBean
    ,則介面實現類必須為MBean,否則程式將報錯
  • 介面中通過getset方法表示監控指標是否可讀、可寫。比如getXXX()抽象方法,則XXX就是監控的指標,getXXX()表示XXX效能指標可讀,setXXX()方法表示該監控指標可寫
  • 引數和返回型別只能是簡單的引用型別(如String)和基本資料型別,不可以是自定義型別,如果返回值為自定義型別可以選擇MXBean

執行緒池簡單介紹

執行緒池是執行緒的管理工具,通過使用執行緒池可以複用執行緒降低資源消耗、提高響應速度、提高執行緒的可管理性。如果在系統中大量使用執行緒池,就必須對執行緒池進行監控方便出錯時定位問題。可以通過執行緒池提供的引數進行監控,執行緒池提供的引數如下:

方法 含義
getActiveCount 執行緒池中正在執行任務的執行緒數量
getCompletedTaskCount 執行緒池已完成的任務數量
getCorePoolSize 執行緒池的核心執行緒數量
getLargestPoolSize 執行緒池曾經建立過的最大執行緒數量
getMaximumPoolSize 執行緒池的最大執行緒數量
getPoolSize 執行緒池當前的執行緒數量
getTaskCount 執行緒池需要執行的任務數量

應用

介紹完JMX及執行緒池以後,寫一個JMX監控執行緒池的Demo,總不能紙上談兵吧

  • 定義執行緒池監控類:ThreadPoolMonitor.java

    public class ThreadPoolMonitor extends ThreadPoolExecutor {
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * ActiveCount
         * */
        int ac = 0;
    
        /**
         * 當前所有執行緒消耗的時間
         * */
        private AtomicLong totalCostTime = new AtomicLong();
    
        /**
         * 當前執行的執行緒總數
         * */
        private AtomicLong totalTasks = new AtomicLong();
    
        /**
         * 執行緒池名稱
         */
        private String poolName;
    
        /**
         * 最短 執行時間
         * */
        private long minCostTime;
    
        /**
         * 最長執行時間
         * */
        private long maxCostTime;
    
    
        /**
         * 儲存任務開始執行的時間
         */
        private ThreadLocal<Long> startTime = new ThreadLocal<>();
    
        public ThreadPoolMonitor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,String poolName) {
            this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,Executors.defaultThreadFactory(),poolName);
        }
    
        public ThreadPoolMonitor(int corePoolSize,ThreadFactory threadFactory,String poolName) {
            super(corePoolSize,threadFactory);
            this.poolName = poolName;
        }
    
        public static ExecutorService newFixedThreadPool(int nThreads,String poolName) {
            return new ThreadPoolMonitor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),poolName);
        }
    
        public static ExecutorService newCachedThreadPool(String poolName) {
            return new ThreadPoolMonitor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<>(),poolName);
        }
    
        public static ExecutorService newSingleThreadExecutor(String poolName) {
            return new ThreadPoolMonitor(1,1,poolName);
        }
    
        /**
         * 執行緒池延遲關閉時(等待執行緒池裡的任務都執行完畢),統計執行緒池情況
         */
        @Override
        public void shutdown() {
            // 統計已執行任務、正在執行任務、未執行任務數量
            logger.info("{} Going to shutdown. Executed tasks: {},Running tasks: {},Pending tasks: {}",this.poolName,this.getCompletedTaskCount(),this.getActiveCount(),this.getQueue().size());
            super.shutdown();
        }
    
        @Override
        public List<Runnable> shutdownNow() {
            // 統計已執行任務、正在執行任務、未執行任務數量
            logger.info("{} Going to immediately shutdown. Executed tasks: {},this.getQueue().size());
            return super.shutdownNow();
        }
    
        /**
         * 任務執行之前,記錄任務開始時間
         */
        @Override
        protected void beforeExecute(Thread t,Runnable r) {
            startTime.set(System.currentTimeMillis());
        }
    
        /**
         * 任務執行之後,計算任務結束時間
         */
        @Override
        protected void afterExecute(Runnable r,Throwable t) {
            long costTime = System.currentTimeMillis() - startTime.get();
            startTime.remove();  //刪除,避免佔用太多記憶體
            //設定最大最小執行時間
            maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
            if (totalTasks.get() == 0) {
                minCostTime = costTime;
            }
            minCostTime = minCostTime < costTime ? minCostTime : costTime;
            totalCostTime.addAndGet(costTime);
            totalTasks.incrementAndGet();
    
            logger.info("{}-pool-monitor: " +
                          "Duration: {} ms,PoolSize: {},CorePoolSize: {},ActiveCount: {}," +
                          "Completed: {},Task: {},Queue: {},LargestPoolSize: {}," +
                          "MaximumPoolSize: {},KeepAliveTime: {},isShutdown: {},isTerminated: {}",costTime,this.getPoolSize(),this.getCorePoolSize(),super.getActiveCount(),this.getTaskCount(),this.getQueue().size(),this.getLargestPoolSize(),this.getMaximumPoolSize(),this.getKeepAliveTime(TimeUnit.MILLISECONDS),this.isShutdown(),this.isTerminated());
        }
    
        public int getAc() {
            return ac;
        }
    
        /**
         * 執行緒平均耗時
         *
         * @return
         * */
        public float getAverageCostTime() {
            return totalCostTime.get() / totalTasks.get();
        }
    
        /**
         * 執行緒最大耗時
         * */
        public long getMaxCostTime() {
            return maxCostTime;
        }
    
        /**
         * 執行緒最小耗時
         * */
        public long getMinCostTime() {
            return minCostTime;
        }
    
        /**
         * 生成執行緒池所用的執行緒,改寫了執行緒池預設的執行緒工廠
         */
        static class EventThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            /**
             * 初始化執行緒工廠
             *
             * @param poolName 執行緒池名稱
             */
            EventThreadFactory(String poolName) {
                SecurityManager s = System.getSecurityManager();
                group = Objects.nonNull(s) ? s.getThreadGroup() :   Thread.currentThread().getThreadGroup();
                namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
            }
    
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group,r,namePrefix + threadNumber.getAndIncrement(),0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    }
    複製程式碼

    通過繼承執行緒池來自定義執行緒池,並在建構函式中加入了poolName標明是哪一個執行緒池,同時重寫了beforeExecuteafterExecuteterminated等方法,在beforeExecute方法中記錄執行緒池執行的時間,在afterExecute方法中計算執行緒執行的耗時、最大耗時、最小耗時、平均耗時。重寫執行緒池生成執行緒的方法,指定了生成的執行緒名

  • 定義一個MBeanThreadPoolParamMBean.java
    MBean其實並不是一個實體類而是一個介面,裡面定義了監控的指標

    public interface ThreadPoolParamMBean {
        /**
         * 執行緒池中正在執行任務的執行緒數量
         *
         * @return
         */
        int getActiveCount();
    
        /**
         * 執行緒池已完成的任務數量
         *
         * @return
         */
        long getCompletedTaskCount();
    
        /**
         * 執行緒池的核心執行緒數量
         *
         * @return
         */
        int getCorePoolSize();
    
        /**
         * 執行緒池曾經建立過的最大執行緒數量
         *
         * @return
         */
        int getLargestPoolSize();
    
        /**
         * 執行緒池的最大執行緒數量
         *
         * @return
         */
        int getMaximumPoolSize();
    
        /**
         * 執行緒池當前的執行緒數量
         *
         * @return
         */
        int getPoolSize();
    
        /**
         * 執行緒池需要執行的任務數量
         *
         * @return
         */
        long getTaskCount();
    
        /**
         * 執行緒最大耗時
         *
         * @return
         * */
        long getMaxCostTime();
    
        /**
         * 執行緒最小耗時
         *
         * @return
         * */
        long getMinCostTime();
    
        /**
         * 執行緒平均耗時
         *
         * @return
         * */
        float getAverageCostTime();
    }
    複製程式碼
  • 定義一個MBean實現類:ThreadPoolParam.java
    定義的是靜態MBean,所以介面實現類必須滿足規定,即xxxMBean,實現類為xxx

    public class ThreadPoolParam implements ThreadPoolParamMBean  {
        private ThreadPoolMonitor threadPoolMonitor;
    
        public ThreadPoolParam(ExecutorService es) {
            this.threadPoolMonitor = (ThreadPoolMonitor) es;
        }
    
        /**
         * 執行緒池中正在執行任務的執行緒數量
         *
         * @return
         */
        @Override
        public int getActiveCount() {
            return threadPoolMonitor.getAc();
        }
    
        /**
         * 執行緒池已完成的任務數量
         *
         * @return
         */
        @Override
        public long getCompletedTaskCount() {
            return threadPoolMonitor.getCompletedTaskCount();
        }
    
        /**
         * 執行緒池的核心執行緒數量
         *
         * @return
         */
        @Override
        public int getCorePoolSize() {
            return threadPoolMonitor.getCorePoolSize();
        }
    
        /**
         * 執行緒池曾經建立過的最大執行緒數量
         *
         * @return
         */
        @Override
        public int getLargestPoolSize() {
            return threadPoolMonitor.getLargestPoolSize();
        }
    
        /**
         * 執行緒池的最大執行緒數量
         *
         * @return
         */
        @Override
        public int getMaximumPoolSize() {
            return threadPoolMonitor.getMaximumPoolSize();
        }
    
        /**
         * 執行緒池當前的執行緒數量
         *
         * @return
         */
        @Override
        public int getPoolSize() {
            return threadPoolMonitor.getPoolSize();
        }
    
        /**
         * 執行緒池需要執行的任務數量
         *
         * @return
         */
        @Override
        public long getTaskCount() {
            return threadPoolMonitor.getTaskCount();
        }
    
        /**
         * 執行緒最大耗時
         *
         * @return
         * */
        @Override
        public long getMaxCostTime() {
            return threadPoolMonitor.getMaxCostTime();
        }
    
        /**
         * 執行緒最小耗時
         *
         * @return
         * */
        @Override
        public long getMinCostTime() {
            return threadPoolMonitor.getMinCostTime();
        }
    
        /**
         * 執行緒平均耗時
         *
         * @return
         * */
        @Override
        public float getAverageCostTime() {
            return threadPoolMonitor.getAverageCostTime();
        }
    }
    複製程式碼

    監控的引數指標通過執行緒池得到

  • 測試類:Test.java

    public class Test {
        private static Random random = new Random();
        public static void main(String[] args) throws MalformedObjectNameException,InterruptedException {
            ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
            ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
    
            ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
            ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
    
            MBeanServerUtil.registerMBean(threadPoolParam1,new ObjectName("test-pool-1:type=threadPoolParam"));
            MBeanServerUtil.registerMBean(threadPoolParam2,new ObjectName("test-pool-2:type=threadPoolParam"));
    
            //http連線的方式檢視監控任務
            HtmlAdaptor.start();
    
            executeTask(es1);
            executeTask(es2);
            Thread.sleep(1000 * 60 * 60);
        }
    
        private static void executeTask(ExecutorService es) {
            new Thread(() -> {
                for (int i = 0; i < 10; i++) {
                    int temp = i;
                    es.submit(() -> {
                        //隨機睡眠時間
                        try {
                            Thread.sleep(random.nextInt(60) * 1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(temp);
                    });
                }
            }).start();
        }
    }
    複製程式碼

    說明:

    • MBeanServerUtil.registerMBean()註冊監控的類
    • HtmlAdaptor.start()開啟HTTP連線的方式檢視監控任務

    啟動程式後開啟http://localhost:8082/如下圖:

點選test-pool-1下的type=threadPoolParam

通過重新整理獲取執行緒池最新的監控指標 test-pool-1type=threadPoolParam這些屬性是在ObjectName中定義的屬性值

總結

使用JMX監控執行緒池只是JMX一個功能,本篇文章只是學以致用,更多有關JMX以及執行緒池的內容可以查閱其他資料。文章若有錯誤歡迎指正

最後附:專案程式碼,歡迎forkstar,【我都劃重點了就star一下】