1. 程式人生 > >執行緒池的學習和使用

執行緒池的學習和使用

[toc] ## 什麼是執行緒池 執行緒池的作用是初始化一些執行緒,當有任務的時候,就從中啟動一個來執行相關任務,執行完後,執行緒資源重新回收到執行緒池中,達到複用的效果,從而減少資源的開銷 ## 建立執行緒池 在JDK中,`Executors`類已經幫我們封裝了建立執行緒池的方法。 ```java Executors.newFixedThreadPool(); Executors.newCachedThreadPool(); Executors.newScheduledThreadPool(); ``` 但是點進去看的話, ```java public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } ``` 它的內部實現還是基於`ThreadPoolExecutor`來實現的。通過阿里程式碼規範外掛掃描會提示我們用`ThreadPoolExecutor`去實現執行緒池。通過檢視`ThreadPoolExecutor`的構造方法 ```java public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... do something ... } ``` 我覺得有以下幾方面的原因。 1. 可以靈活設定`keepAliveTime`(當執行緒池中執行緒數大於`corePoolSize`的數m, 為這m個執行緒設定的最長等待時間 ),節約系統資源。 2. `workQueue`:執行緒等待佇列,在`Executors`中預設的是`LinkedBlockingQueue`。可以理解是一種無界的陣列,當有不斷有執行緒來的時候,可能會撐爆機器記憶體。 3. 可以設執行緒工廠,裡面新增自己想要的一些元素,只需要實現JDK的`ThreadFactory`類。 4. 按照自己的業務設定合適的拒絕策略。策略有以下幾種 1. AbortPolicy:直接丟擲拒絕異常(繼承自RuntimeException),會中斷呼叫者的處理過程,所以除非有明確需求,一般不推薦 2. DiscardPolicy:默默丟棄無法載入的任務。 3. DiscardOldestPolicy:丟棄佇列中最老的,然後再次嘗試提交新任務。 4. CallerRunsPolicy:在呼叫者執行緒中(也就是說誰把 r 這個任務甩來的),運行當前被丟棄的任務。只會用呼叫者所線上程來執行任務,也就是說任務不會進入執行緒池。如果執行緒池已經被關閉,則直接丟棄該任務。 ## 使用執行緒池 ### 宣告`ThreadFactory` ```java public class NacosSyncThreadFactory implements ThreadFactory { private final AtomicInteger threadNum = new AtomicInteger(1); private String threadPrefix = null; private ThreadGroup threadGroup; public NacosSyncThreadFactory(String prefix) { this.threadPrefix = "thread" + "-" + prefix + "-" ; threadGroup = Thread.currentThread().getThreadGroup(); } public NacosSyncThreadFactory() { this("pool"); } @Override public Thread newThread(Runnable r) { String name = threadPrefix + threadNum.incrementAndGet(); Thread thread = new Thread(threadGroup, r, name); return thread; } } ``` ### 建立執行緒池類 ```java public class MyThreadPool { private ThreadFactory threadFactory; private int threadNum; private BlockingQueue blockingQueue; private RejectedExecutionHandler handler; public MyThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler ) { this.threadFactory = threadFactory; this.threadNum = threadNum; this.blockingQueue = blockingQueue; this.handler = handler; } public MyThreadPool() { this(Executors.defaultThreadFactory(), 10, new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy()); } public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) { if (handler == null) { handler = new ThreadPoolExecutor.AbortPolicy(); } return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler); } } ``` ### 呼叫執行緒池 1. 初始化執行緒池類 ```java MyThreadPool myThreadPool = new MyThreadPool(); threadPoolExecutor = myThreadPool.initThreadPool( new NacosSyncThreadFactory("nacos-sync"), threadNum, new ArrayBlockingQueue(10), new ThreadPoolExecutor.DiscardPolicy() ); } ``` 2. 建立Callable(FutureTask) ```java /** * 分頁獲取task資訊 * @return */ private List getTask(int pageNum) { IPage page = new Page(pageNum, 25); IPage taskIPage = this.taskService.page(page); if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) { return null; } return taskIPage.getRecords(); } // 執行任務 private FutureTask assembleTaskFuture(Task task) { FutureTask futureTask = new FutureTask(() -> { // 執行任務 this.doSyncWork(task); return "success"; }); return futureTask; } ``` 3. 執行任務(FutureTask) ```java public void zkSync() { // 獲取資料總數,得到執行緒數 int count = this.taskService.count(); int pageSize = 25; int num = count / pageSize; int pageTotal = count % pageSize == 0 ? num : num + 1; log.info("========總記錄數:{}=====總頁數:{}", count, pageTotal); for (int i = 1; i <= pageTotal; i++) { List taskList = this.getTask(i); if (CollectionUtils.isEmpty(taskList)) { break; } List collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList()); taskList.forEach(task -> { FutureTask futureTask = this.assembleTaskFuture(task); threadPoolExecutor.execute(futureTask); }); } threadPoolExecutor.shutdown(); }