JMX視覺化監控執行緒池
前兩天閱讀公司程式碼看到了用JMX監控定時任務資訊和狀態,JMX這個單詞感覺很熟於是便去查閱了一下,並寫了監控執行緒池的Demo
通過閱讀本篇文章你將瞭解到:
- JMX介紹
- 執行緒池介紹
- JMX監控執行緒池應用
什麼是JMX
JMX簡介
JMX(Java Management Extensions)
,監控管理框架,通過使用JMX
可以監控和管理應用程式。JMX
最常見的場景是監控Java
程式的基本資訊和執行情況,任何Java
程式都可以開啟JMX
,然後使用JConsole
或Visual VM
進行預覽
JMX架構
總共分為三層,分發層、代理層、裝置層分發層:根據不同的協議定義了對代理層進行各種操作的管理介面,簡單的來說是監控指標的檢視方式,可以是
HTTP
RMI
連線、SNMP
連線代理層:管理
MBean
,通過將MBean
註冊到代理層實現MBean
的管理,除了註冊MBean
,還可以註冊Adapter
,代理層在應用中一般都是MBeanService
裝置層:監控指標抽象出的類,可以分為以下幾種:
- Standard MBean
- Dynamic MBean
- Open MBean
- Model MBean
- MXBean
應用中一般使用Standard MBean
比較多,所以這裡只介紹Standard MBean
,使用Standard MBean
需要滿足一定的規則,規則如下:
- 定義一個介面,介面名必須為
XXXXMBean
的格式,必須以MBean
結尾 - 如果介面為
XXXXMBean
MBean
,否則程式將報錯 - 介面中通過
get
和set
方法表示監控指標是否可讀、可寫。比如getXXX()
抽象方法,則XXX
就是監控的指標,getXXX()
表示XXX
效能指標可讀,setXXX()
方法表示該監控指標可寫 - 引數和返回型別只能是簡單的引用型別(如
String
)和基本資料型別,不可以是自定義型別,如果返回值為自定義型別可以選擇MXBean
執行緒池簡單介紹
執行緒池是執行緒的管理工具,通過使用執行緒池可以複用執行緒降低資源消耗、提高響應速度、提高執行緒的可管理性。如果在系統中大量使用執行緒池,就必須對執行緒池進行監控方便出錯時定位問題。可以通過執行緒池提供的引數進行監控,執行緒池提供的引數如下:
方法 | 含義 |
---|---|
getActiveCount | 執行緒池中正在執行任務的執行緒數量 |
getCompletedTaskCount | 執行緒池已完成的任務數量 |
getCorePoolSize | 執行緒池的核心執行緒數量 |
getLargestPoolSize | 執行緒池曾經建立過的最大執行緒數量 |
getMaximumPoolSize | 執行緒池的最大執行緒數量 |
getPoolSize | 執行緒池當前的執行緒數量 |
getTaskCount | 執行緒池需要執行的任務數量 |
應用
介紹完JMX
及執行緒池以後,寫一個JMX
監控執行緒池的Demo
,總不能紙上談兵吧
-
定義執行緒池監控類:
ThreadPoolMonitor.java
public class ThreadPoolMonitor extends ThreadPoolExecutor { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * ActiveCount * */ int ac = 0; /** * 當前所有執行緒消耗的時間 * */ private AtomicLong totalCostTime = new AtomicLong(); /** * 當前執行的執行緒總數 * */ private AtomicLong totalTasks = new AtomicLong(); /** * 執行緒池名稱 */ private String poolName; /** * 最短 執行時間 * */ private long minCostTime; /** * 最長執行時間 * */ private long maxCostTime; /** * 儲存任務開始執行的時間 */ private ThreadLocal<Long> startTime = new ThreadLocal<>(); public ThreadPoolMonitor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,String poolName) { this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,Executors.defaultThreadFactory(),poolName); } public ThreadPoolMonitor(int corePoolSize,ThreadFactory threadFactory,String poolName) { super(corePoolSize,threadFactory); this.poolName = poolName; } public static ExecutorService newFixedThreadPool(int nThreads,String poolName) { return new ThreadPoolMonitor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),poolName); } public static ExecutorService newCachedThreadPool(String poolName) { return new ThreadPoolMonitor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<>(),poolName); } public static ExecutorService newSingleThreadExecutor(String poolName) { return new ThreadPoolMonitor(1,1,poolName); } /** * 執行緒池延遲關閉時(等待執行緒池裡的任務都執行完畢),統計執行緒池情況 */ @Override public void shutdown() { // 統計已執行任務、正在執行任務、未執行任務數量 logger.info("{} Going to shutdown. Executed tasks: {},Running tasks: {},Pending tasks: {}",this.poolName,this.getCompletedTaskCount(),this.getActiveCount(),this.getQueue().size()); super.shutdown(); } @Override public List<Runnable> shutdownNow() { // 統計已執行任務、正在執行任務、未執行任務數量 logger.info("{} Going to immediately shutdown. Executed tasks: {},this.getQueue().size()); return super.shutdownNow(); } /** * 任務執行之前,記錄任務開始時間 */ @Override protected void beforeExecute(Thread t,Runnable r) { startTime.set(System.currentTimeMillis()); } /** * 任務執行之後,計算任務結束時間 */ @Override protected void afterExecute(Runnable r,Throwable t) { long costTime = System.currentTimeMillis() - startTime.get(); startTime.remove(); //刪除,避免佔用太多記憶體 //設定最大最小執行時間 maxCostTime = maxCostTime > costTime ? maxCostTime : costTime; if (totalTasks.get() == 0) { minCostTime = costTime; } minCostTime = minCostTime < costTime ? minCostTime : costTime; totalCostTime.addAndGet(costTime); totalTasks.incrementAndGet(); logger.info("{}-pool-monitor: " + "Duration: {} ms,PoolSize: {},CorePoolSize: {},ActiveCount: {}," + "Completed: {},Task: {},Queue: {},LargestPoolSize: {}," + "MaximumPoolSize: {},KeepAliveTime: {},isShutdown: {},isTerminated: {}",costTime,this.getPoolSize(),this.getCorePoolSize(),super.getActiveCount(),this.getTaskCount(),this.getQueue().size(),this.getLargestPoolSize(),this.getMaximumPoolSize(),this.getKeepAliveTime(TimeUnit.MILLISECONDS),this.isShutdown(),this.isTerminated()); } public int getAc() { return ac; } /** * 執行緒平均耗時 * * @return * */ public float getAverageCostTime() { return totalCostTime.get() / totalTasks.get(); } /** * 執行緒最大耗時 * */ public long getMaxCostTime() { return maxCostTime; } /** * 執行緒最小耗時 * */ public long getMinCostTime() { return minCostTime; } /** * 生成執行緒池所用的執行緒,改寫了執行緒池預設的執行緒工廠 */ static class EventThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; /** * 初始化執行緒工廠 * * @param poolName 執行緒池名稱 */ EventThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group,r,namePrefix + threadNumber.getAndIncrement(),0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } } 複製程式碼
通過繼承執行緒池來自定義執行緒池,並在建構函式中加入了
poolName
標明是哪一個執行緒池,同時重寫了beforeExecute
、afterExecute
、terminated
等方法,在beforeExecute
方法中記錄執行緒池執行的時間,在afterExecute
方法中計算執行緒執行的耗時、最大耗時、最小耗時、平均耗時。重寫執行緒池生成執行緒的方法,指定了生成的執行緒名 -
定義一個
MBean
:ThreadPoolParamMBean.java
MBean
其實並不是一個實體類而是一個介面,裡面定義了監控的指標public interface ThreadPoolParamMBean { /** * 執行緒池中正在執行任務的執行緒數量 * * @return */ int getActiveCount(); /** * 執行緒池已完成的任務數量 * * @return */ long getCompletedTaskCount(); /** * 執行緒池的核心執行緒數量 * * @return */ int getCorePoolSize(); /** * 執行緒池曾經建立過的最大執行緒數量 * * @return */ int getLargestPoolSize(); /** * 執行緒池的最大執行緒數量 * * @return */ int getMaximumPoolSize(); /** * 執行緒池當前的執行緒數量 * * @return */ int getPoolSize(); /** * 執行緒池需要執行的任務數量 * * @return */ long getTaskCount(); /** * 執行緒最大耗時 * * @return * */ long getMaxCostTime(); /** * 執行緒最小耗時 * * @return * */ long getMinCostTime(); /** * 執行緒平均耗時 * * @return * */ float getAverageCostTime(); } 複製程式碼
-
定義一個
MBean
實現類:ThreadPoolParam.java
定義的是靜態MBean,所以介面實現類必須滿足規定,即xxxMBean
,實現類為xxx
public class ThreadPoolParam implements ThreadPoolParamMBean { private ThreadPoolMonitor threadPoolMonitor; public ThreadPoolParam(ExecutorService es) { this.threadPoolMonitor = (ThreadPoolMonitor) es; } /** * 執行緒池中正在執行任務的執行緒數量 * * @return */ @Override public int getActiveCount() { return threadPoolMonitor.getAc(); } /** * 執行緒池已完成的任務數量 * * @return */ @Override public long getCompletedTaskCount() { return threadPoolMonitor.getCompletedTaskCount(); } /** * 執行緒池的核心執行緒數量 * * @return */ @Override public int getCorePoolSize() { return threadPoolMonitor.getCorePoolSize(); } /** * 執行緒池曾經建立過的最大執行緒數量 * * @return */ @Override public int getLargestPoolSize() { return threadPoolMonitor.getLargestPoolSize(); } /** * 執行緒池的最大執行緒數量 * * @return */ @Override public int getMaximumPoolSize() { return threadPoolMonitor.getMaximumPoolSize(); } /** * 執行緒池當前的執行緒數量 * * @return */ @Override public int getPoolSize() { return threadPoolMonitor.getPoolSize(); } /** * 執行緒池需要執行的任務數量 * * @return */ @Override public long getTaskCount() { return threadPoolMonitor.getTaskCount(); } /** * 執行緒最大耗時 * * @return * */ @Override public long getMaxCostTime() { return threadPoolMonitor.getMaxCostTime(); } /** * 執行緒最小耗時 * * @return * */ @Override public long getMinCostTime() { return threadPoolMonitor.getMinCostTime(); } /** * 執行緒平均耗時 * * @return * */ @Override public float getAverageCostTime() { return threadPoolMonitor.getAverageCostTime(); } } 複製程式碼
監控的引數指標通過執行緒池得到
-
測試類:
Test.java
public class Test { private static Random random = new Random(); public static void main(String[] args) throws MalformedObjectNameException,InterruptedException { ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1"); ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1); ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2"); ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2); MBeanServerUtil.registerMBean(threadPoolParam1,new ObjectName("test-pool-1:type=threadPoolParam")); MBeanServerUtil.registerMBean(threadPoolParam2,new ObjectName("test-pool-2:type=threadPoolParam")); //http連線的方式檢視監控任務 HtmlAdaptor.start(); executeTask(es1); executeTask(es2); Thread.sleep(1000 * 60 * 60); } private static void executeTask(ExecutorService es) { new Thread(() -> { for (int i = 0; i < 10; i++) { int temp = i; es.submit(() -> { //隨機睡眠時間 try { Thread.sleep(random.nextInt(60) * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(temp); }); } }).start(); } } 複製程式碼
說明:
-
MBeanServerUtil.registerMBean()
註冊監控的類 -
HtmlAdaptor.start()
開啟HTTP
連線的方式檢視監控任務
啟動程式後開啟
http://localhost:8082/
如下圖: -
點選test-pool-1
下的type=threadPoolParam
通過重新整理獲取執行緒池最新的監控指標
test-pool-1
和type=threadPoolParam
這些屬性是在ObjectName
中定義的屬性值
總結
使用JMX
監控執行緒池只是JMX
一個功能,本篇文章只是學以致用,更多有關JMX以及執行緒池的內容可以查閱其他資料。文章若有錯誤歡迎指正
最後附:專案程式碼,歡迎fork與star,【我都劃重點了就star一下】