手寫簡單的執行緒池
阿新 • • 發佈:2020-09-02
關於執行緒池的詳細分析,具體請看這篇帖子:https://www.cnblogs.com/reecelin/p/12334107.html
在開始之前,我們還是再來複習一下執行緒池的工作流程吧,如下所示:
現在,我們可以根據上面的流程簡單寫個執行緒池,如下:
public interface ThreadPool { /** * 新增任務 * @param command * @return */ boolean submit(Runnable command); /** * 關閉執行緒池,若有任務,則等待 任務執行完畢 */ void shutDown(); /** * 立即關閉執行緒池 */ void shutDownNow(); } public class MyThreadPool implements ThreadPool { //當前正在工作的執行緒數 private int currentNum; //核心執行緒數 private int corePoolSize; //最大核心執行緒數 private static final int MAX_CORE_POOL_SIZE = 3; //執行緒池能容納的最大執行緒數 private int maxPoolSize; //最大執行緒數 private static final int MAX_POOL_SIZE = 6; //執行緒池是否啟動 private volatile boolean isRunning = true; //存放新增進執行緒池的任務佇列 private BlockingQueue<Runnable> queue; private int queueSize; //任務佇列最大長度 這裡設定的比較小是為了後面可以使用拒絕策略 private static final int MAX_QUEUE_SIZE = 20; //儲存用於執行新增進執行緒池中任務的工作執行緒佇列 private List<Worker> workers; private ReentrantLock lock=new ReentrantLock(); public MyThreadPool() { this(MAX_CORE_POOL_SIZE, MAX_POOL_SIZE, MAX_QUEUE_SIZE); } public MyThreadPool(int corePoolSize, int maxPoolSize, int queueSize) { this.corePoolSize = corePoolSize > MAX_CORE_POOL_SIZE ? MAX_CORE_POOL_SIZE : corePoolSize; this.maxPoolSize = maxPoolSize > MAX_POOL_SIZE ? MAX_POOL_SIZE : maxPoolSize; this.queueSize = queueSize > MAX_QUEUE_SIZE ? MAX_QUEUE_SIZE : queueSize; queue = new LinkedBlockingQueue<>(this.queueSize); workers = Collections.synchronizedList(new ArrayList<>(this.maxPoolSize)); intialThreadPool(); } private void intialThreadPool() { for (int i = 1; i <= this.corePoolSize; i++) { Worker worker = new Worker("核心執行緒-" + i); workers.add(worker); worker.start(); currentNum++; System.out.println(DateUtil.getFormat().format(new Date())+" 核心執行緒-" + i + " 啟動,等待執行任務"); } } @Override public boolean submit(Runnable command) { if (isRunning) { //若是核心執行緒數還沒達到最大,則新建核心執行緒執行任務 if (currentNum < MAX_CORE_POOL_SIZE) { String threadName = "新建核心執行緒-" + ++this.corePoolSize; Worker worker = new Worker(threadName, command); workers.add(worker); worker.start(); currentNum++; return true; } else if (currentNum >= MAX_CORE_POOL_SIZE && currentNum < MAX_POOL_SIZE) { //若是佇列未滿,則直接新增進佇列 if (queue.offer(command)) { return true; //若是佇列已滿,則建立非核心執行緒去執行任務 } else { String threadName = "非核心執行緒-" + (currentNum - MAX_CORE_POOL_SIZE); Worker worker = new Worker(threadName, command); workers.add(worker); worker.start(); currentNum++; return true; } //若是執行緒數已到最大,且佇列也滿了,則直接執行拒絕策略 } else if (currentNum >= MAX_POOL_SIZE && !queue.offer(command)) { System.out.println(DateUtil.getFormat().format(new Date())+" 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:"+queue.size()); return false; } } return false; } @Override public void shutDown() { while (!queue.isEmpty()) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } isRunning = false; for (Worker worker : workers) { worker.interrupt(); // help gc worker = null; } queue.clear(); } @Override public void shutDownNow() { isRunning = false; for (Worker worker : workers) { worker.interrupt(); // help gc worker = null; } queue.clear(); } private class Worker extends Thread { private Runnable command; public Worker(String name) { super(name); } public Worker(String name, Runnable command) { super(name); this.command = command; } @Override public void run() { while (isRunning || !queue.isEmpty()) { if (command != null) { command.run(); // help gc command = null; } else { command = queue.poll(); if (command != null) { command.run(); // help gc command = null; } } } } } } public class MyTask implements Runnable { private int id; public MyTask(int id) { this.id = id; } @Override public void run() { System.out.println(Thread.currentThread().getName()+"執行任務:"+id+" 完成"); } } public class DateUtil { private static final String pattern="yyyy-MM-dd HH:mm:ss"; private static volatile SimpleDateFormat format; private DateUtil(){} public static SimpleDateFormat getFormat(){ if (format==null){ synchronized (DateUtil.class){ if (format==null){ format= new SimpleDateFormat(pattern); } } } return format; } } public class Test { public static void main(String[] args) { MyThreadPool pool=new MyThreadPool(2, 6, 20); for (int i = 1; i <=40 ; i++) { pool.submit(new MyTask(i)); } pool.shutDown(); } }
執行測試類,控制檯輸出:
2020-02-22 20:29:56 核心執行緒-1 啟動,等待執行任務 2020-02-22 20:29:56 核心執行緒-2 啟動,等待執行任務 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 執行緒池已滿負載執行,拒絕了該任務,當前任務佇列大小為:20 2020-02-22 20:29:56 核心執行緒-1執行任務:2 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:3 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:4 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:5 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:6 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:7 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:8 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:9 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:10 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:11 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:12 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:13 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:14 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:15 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:16 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:17 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:18 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:19 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:20 完成 2020-02-22 20:29:56 核心執行緒-1執行任務:21 完成 2020-02-22 20:29:56 新建核心執行緒-3執行任務:1 完成 2020-02-22 20:29:56 非核心執行緒-1執行任務:23 完成 2020-02-22 20:29:56 非核心執行緒-0執行任務:22 完成 2020-02-22 20:29:56 非核心執行緒-2執行任務:24 完成
這個只是簡單的執行緒池,許多功能都沒有完善,但對於瞭解執行緒池的執行流程有一定幫助。後面會繼續完善這個demo,使其更加完整。