1. 程式人生 > >自己手動寫個執行緒池

自己手動寫個執行緒池

前言:

  要自定義一個執行緒池,首先要有存固定的執行緒數目的阻塞佇列,還要有個存放任務的阻塞佇列。

  如果任務放入時有執行緒為空閒直接交付於它執行否則就放入任務佇列中

  在內部有個執行緒會不斷的掃描任務佇列如果既有空閒任務又有空閒執行緒就執行。

實現如下:

package com.yinchong.threadpool.mypool;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Administrator 當任務數大於執行緒數時放入任務佇列中取 當任務數小於執行緒數時直接執行
 *
 *
 */
public class ThreadPool {
 private int coreCount;
 private int maxCount;
 private long waitTime;
 private TimeUnit unit;
 private boolean start = true;
 private boolean end;
 private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
 // 我的想法是從list移除來一個Thread物件執行完後再新增進去
 private LinkedBlockingQueue<MyThread> tl = new LinkedBlockingQueue<MyThread>();// 用來儲存建立自定義的Thread這裡最好使用阻塞式佇列因為你新增和移除是在不同執行緒中操作

 public ThreadPool(int coreCount, int maxCount, long waitTime, TimeUnit unit) {
  super();
  this.coreCount = coreCount;
  this.maxCount = maxCount;
  this.waitTime = waitTime;
  this.unit = unit;

  // 創造核心數目執行緒數量
  for (int i = 0; i < coreCount; i++) {
   tl.add(new MyThread());
  }
 }

 // 改進
 public void execute(Runnable runnable) {
  if (start) {
   runInQueueTask();
   start = false;
  }
  if (tl.size() > 0) {// 如果還有執行緒空閒就直接執行
   runTask(runnable);
  } else {
   queue.add(runnable);// 新增到任務佇列
  }
 }

 private void runInQueueTask() {//這個方法主要是用來開啟一個執行緒來不斷掃描任務佇列中是否有任務
  new Thread(new Runnable() {

   @Override
   public void run() {
    while (true) {//這裡讓它不斷掃描
     if (end)//如果結受到結束標記就結束
      break;
     if (queue.size() > 0 && tl.size() > 0) {//如果任務佇列不為空並且執行緒佇列也不為空
      MyThread thread = tl.remove();//從執行緒阻塞佇列中獲取一個執行緒來執行任務
      thread.setRunnable(queue.remove());
     } else {
      try {
       Thread.sleep(10);
      } catch (InterruptedException e) {
       // TODO 自動生成的 catch 塊
       e.printStackTrace();
      }
     }
    }
   }
  }).start();
 }

 private void runTask(Runnable runnable) {
  MyThread thread = tl.remove();// 從執行緒列中獲取空餘執行緒

  thread.setRunnable(runnable);// 設定任務到執行緒上
  if (thread.first)// 如果是第一次執行啟動start方法
   thread.start();
 }

 public void shutDown() {//關閉執行緒池中所有的資源來停止執行
  while (true) {
   if (tl.size() == coreCount&&queue.size()==0) {//只有當任務執行完才關閉它們
    end = true;// 修改標記使掃描任務佇列的執行緒停止
    for (MyThread thread : tl) {// 迴圈關閉每個執行緒及讓它的start方法執行完
     thread.closeThread();
    }
    break;
   }else{
    try {
     Thread.sleep(20);
    } catch (InterruptedException e) {
     // TODO 自動生成的 catch 塊
     e.printStackTrace();
    }
   }
  }
 }

 // 改造Thread方法由於呼叫start方法不能呼叫多次
 private class MyThread extends Thread {
  private Runnable runnable;
  private Lock lock = new ReentrantLock();
  private Condition condition = lock.newCondition();
  private boolean first = true;
  private boolean shutdown;

  public MyThread() {
  }

  public void setRunnable(Runnable runnable) {//設定任務,到執行緒中
   lock.lock();
   try {
    this.runnable = runnable;//修改任務
    condition.signal();//喚醒start方法
   } finally {
    lock.unlock();
   }
  }

  public void closeThread() {
   lock.lock();
   try {
    condition.signal();
    shutdown = true;
   } finally {
    lock.unlock();
   }
  }

  @Override
  public void run() {
   lock.lock();
   try {
    first = false;
    while (true) {
      runnable.run();//執行傳人的任務
     tl.add(this);//任務執行完把執行緒自己回新增執行緒阻塞佇列中方便其它任務用

     condition.await();//在沒有結受到任務時讓起阻塞住
     if (shutdown)
      break;
    }
   } catch (Exception e) {
    e.printStackTrace();
   } finally {
    lock.unlock();
   }

  }
 }

}