1. 程式人生 > 其它 >java多執行緒之自定義執行緒池

java多執行緒之自定義執行緒池

1.背景

執行緒池.....大家常用....

自己搞一個,順便練習一下多執行緒程式設計

2.自定義執行緒程式碼

2.1.拒絕策略介面

@FunctionalInterface
public interface MyRejectPolicy<T> {
    void reject(MyBlockingQueue<T> queue, T task);
}

2.2.自定義阻塞佇列

@Slf4j
public class MyBlockingQueue<T> {
    // 1.任務佇列
    private Deque<T> queue = new
ArrayDeque<>(); // 2.容量 private int capacity; // 3.鎖 private ReentrantLock lock = new ReentrantLock(); // 4.生產者條件變數 private Condition producerCondition = lock.newCondition(); // 5.消費者條件變數 private Condition consumerCondition = lock.newCondition(); // 構造方法 public MyBlockingQueue(int
capacity) { this.capacity = capacity; } // 阻塞獲取 public T take() { lock.lock(); try { // 判斷是否為空 while (queue.isEmpty()) { try { // 無任務消費者等待 log.info("無任務,消費者等待...."); consumerCondition.await(); }
catch (InterruptedException e) { e.printStackTrace(); } } // 不為空,消費者獲取一個任務 T t = queue.removeFirst(); log.info("消費者已獲取到任務t={}", t); // 通知生產者放入任務 producerCondition.signal(); return t; } finally { lock.unlock(); } } // 阻塞新增 public void put(T t) { lock.lock(); try { while (queue.size() >= capacity) { try { // 佇列已滿,生產者等待 log.info("......佇列已滿,生產者等待"); producerCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 放入佇列 queue.addLast(t); log.info("......生產者,以將任務放入佇列"); // 通知消費者獲取任務 consumerCondition.signal(); } finally { lock.unlock(); } } }

2.3.自定義工作執行緒

@Slf4j
public class MyWorkerThread extends Thread {
    // 執行緒集合
    private HashSet<MyWorkerThread> workerThreadList = new HashSet<>();
    // 任務佇列
    private MyBlockingQueue<Runnable> taskQueue;
    // 任務物件
    private Runnable task;

    /**
     * 構造方法
     *
     * @param task
     */
    public MyWorkerThread(HashSet<MyWorkerThread> workerThreadList, MyBlockingQueue<Runnable> taskQueue, Runnable task) {
        this.workerThreadList = workerThreadList;
        this.taskQueue = taskQueue;
        this.task = task;
    }

    /**
     * 執行任務
     */
    @Override
    public void run() {
        // 當【task不為空】時執行當前任務
        // 當【task為空】時,從佇列中獲取任務再執行
        while (task != null || ((task = taskQueue.take()) != null)) {
            try {
                // 執行任務
                log.info("執行當前任務:{}", task);
                task.run();
            } finally {
                // 將任務設定為空
                task = null;
            }
        }
        // 無任務,釋放執行緒
        synchronized (workerThreadList) {
            log.info("無任務,刪除當前執行緒:{}", this);
            workerThreadList.remove(this);
        }
    }
}

2.4.自定義執行緒池

@Slf4j
public class MyThreadPool {
    // 核心執行緒數
    private int coreSize;
    // 阻塞佇列
    private MyBlockingQueue<Runnable> taskQueue;
    // 執行緒集合
    private HashSet<MyWorkerThread> workerThreadList = new HashSet<>();
    // 拒絕策略
    private MyRejectPolicy<Runnable> rejectPolicy;

    /**
     * 執行緒池構造器
     *
     * @param coreSize
     * @param rejectPolicy
     */
    public MyThreadPool(int coreSize, MyRejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new MyBlockingQueue<>(coreSize);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     *
     */
    public void execute(Runnable task) {
        // 當任務數【沒有超過】核心執行緒數coreSize時,直接給workerThreadList 執行
        // 當任務數【超過】核心執行緒數coreSize時,放入佇列
        synchronized (workerThreadList) {
            int size = workerThreadList.size();
            if (size < coreSize) {
                MyWorkerThread workerThread = new MyWorkerThread(workerThreadList, taskQueue, task);
                workerThread.setName("工作執行緒-" + (size + 1));
                log.info("建立執行緒物件:{},執行任務:{}", workerThread, task);
                workerThreadList.add(workerThread);
                workerThread.start();
            } else {
                taskQueue.put(task);
            }
        }
    }
}

3.測試

@Slf4j
public class Test01 {
    /**
     * 測試自定義的執行緒池
     * @param args
     */
    public static void main(String[] args) {
        // 建立執行緒池
        MyThreadPool pool = new MyThreadPool(2, (queue, task) -> {
            queue.put(task);
        });
        // 執行任務
        pool.execute(()->{
            log.info("執行任務1...");
        });
        pool.execute(()->{
            log.info("執行任務2...");
        });
        pool.execute(()->{
            log.info("執行任務3...");
        });
    }
}

完美!