執行緒池的學習和使用
阿新 • • 發佈:2020-03-06
[toc]
## 什麼是執行緒池
執行緒池的作用是初始化一些執行緒,當有任務的時候,就從中啟動一個來執行相關任務,執行完後,執行緒資源重新回收到執行緒池中,達到複用的效果,從而減少資源的開銷
## 建立執行緒池
在JDK中,`Executors`類已經幫我們封裝了建立執行緒池的方法。
```java
Executors.newFixedThreadPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool();
```
但是點進去看的話,
```java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
```
它的內部實現還是基於`ThreadPoolExecutor`來實現的。通過阿里程式碼規範外掛掃描會提示我們用`ThreadPoolExecutor`去實現執行緒池。通過檢視`ThreadPoolExecutor`的構造方法
```java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
do something
...
}
```
我覺得有以下幾方面的原因。
1. 可以靈活設定`keepAliveTime`(當執行緒池中執行緒數大於`corePoolSize`的數m, 為這m個執行緒設定的最長等待時間 ),節約系統資源。
2. `workQueue`:執行緒等待佇列,在`Executors`中預設的是`LinkedBlockingQueue`。可以理解是一種無界的陣列,當有不斷有執行緒來的時候,可能會撐爆機器記憶體。
3. 可以設執行緒工廠,裡面新增自己想要的一些元素,只需要實現JDK的`ThreadFactory`類。
4. 按照自己的業務設定合適的拒絕策略。策略有以下幾種
1. AbortPolicy:直接丟擲拒絕異常(繼承自RuntimeException),會中斷呼叫者的處理過程,所以除非有明確需求,一般不推薦
2. DiscardPolicy:默默丟棄無法載入的任務。
3. DiscardOldestPolicy:丟棄佇列中最老的,然後再次嘗試提交新任務。
4. CallerRunsPolicy:在呼叫者執行緒中(也就是說誰把 r 這個任務甩來的),運行當前被丟棄的任務。只會用呼叫者所線上程來執行任務,也就是說任務不會進入執行緒池。如果執行緒池已經被關閉,則直接丟棄該任務。
## 使用執行緒池
### 宣告`ThreadFactory`
```java
public class NacosSyncThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger(1);
private String threadPrefix = null;
private ThreadGroup threadGroup;
public NacosSyncThreadFactory(String prefix) {
this.threadPrefix = "thread" + "-" + prefix + "-" ;
threadGroup = Thread.currentThread().getThreadGroup();
}
public NacosSyncThreadFactory() {
this("pool");
}
@Override
public Thread newThread(Runnable r) {
String name = threadPrefix + threadNum.incrementAndGet();
Thread thread = new Thread(threadGroup, r, name);
return thread;
}
}
```
### 建立執行緒池類
```java
public class MyThreadPool {
private ThreadFactory threadFactory;
private int threadNum;
private BlockingQueue blockingQueue;
private RejectedExecutionHandler handler;
public MyThreadPool(ThreadFactory threadFactory, int threadNum,
BlockingQueue blockingQueue,
RejectedExecutionHandler handler ) {
this.threadFactory = threadFactory;
this.threadNum = threadNum;
this.blockingQueue = blockingQueue;
this.handler = handler;
}
public MyThreadPool() {
this(Executors.defaultThreadFactory(), 10,
new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());
}
public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) {
if (handler == null) {
handler = new ThreadPoolExecutor.AbortPolicy();
}
return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler);
}
}
```
### 呼叫執行緒池
1. 初始化執行緒池類
```java
MyThreadPool myThreadPool = new MyThreadPool();
threadPoolExecutor = myThreadPool.initThreadPool(
new NacosSyncThreadFactory("nacos-sync"),
threadNum,
new ArrayBlockingQueue(10),
new ThreadPoolExecutor.DiscardPolicy()
);
}
```
2. 建立Callable(FutureTask)
```java
/**
* 分頁獲取task資訊
* @return
*/
private List getTask(int pageNum) {
IPage page = new Page(pageNum, 25);
IPage taskIPage = this.taskService.page(page);
if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) {
return null;
}
return taskIPage.getRecords();
}
// 執行任務
private FutureTask assembleTaskFuture(Task task) {
FutureTask futureTask = new FutureTask(() -> {
// 執行任務
this.doSyncWork(task);
return "success";
});
return futureTask;
}
```
3. 執行任務(FutureTask)
```java
public void zkSync() {
// 獲取資料總數,得到執行緒數
int count = this.taskService.count();
int pageSize = 25;
int num = count / pageSize;
int pageTotal = count % pageSize == 0 ? num : num + 1;
log.info("========總記錄數:{}=====總頁數:{}", count, pageTotal);
for (int i = 1; i <= pageTotal; i++) {
List taskList = this.getTask(i);
if (CollectionUtils.isEmpty(taskList)) {
break;
}
List collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList());
taskList.forEach(task -> {
FutureTask futureTask = this.assembleTaskFuture(task);
threadPoolExecutor.execute(futureTask);
});
}
threadPoolExecutor.shutdown();
}