HashMap原理認識
1.執行緒池的作用
1. 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
2. 提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。
3. 提高執行緒的可管理性。
2.執行緒池原理
分析:我們如何理解這幅圖呢,以及我們可以簡單對它們進行簡單的模仿?
1. 核心執行緒池就像相當於我們網際網路公司的核心成員。
2. 當任務來的時候,核心執行緒池進行消費,當任務比較多的時候,由產品經理把任務統計起來(就是任務佇列),如果產品經理髮現任務多的已經統計不過來,說明任務佇列已經滿了。
3. 在任務佇列裡面任務太多的時候,公司就會進行招聘一些外包人員,這時的執行緒就是核心執行緒以外的執行緒,並且會把它們放到一個執行緒池中,讓這些外包也來完成產生的任務,但是這時候外包人員也不能處理產生的任務
4.就會產生一種策略,其實這些策略就是異常處理,在jdk提供了四種處理異常的策略
異常策略:
- CallerRunsPolicy:只要執行緒池沒關閉,就直接用呼叫者所線上程來執行任務
- AbortPolicy:直接丟擲 RejectedExecutionException 異常
- DiscardPolicy:悄悄把任務放生,不做了
- DiscardOldestPolicy:把佇列裡待最久的那個任務扔了,然後再呼叫 execute() 試試看能行不
3. jdk提供四種執行緒池
ThreadPoolExecutor:
分析:我們可以看到jdk裡面提供的四種執行緒池裡面最後都是呼叫threadpoolexecutor這個類,但是這個類裡面有7個引數,分別
corePoolSize:核心執行緒大小(及網際網路公司的核心成員)
maximumPoolSize:最大執行緒池的大小(外包人員)
keepAliveTime:除過核心執行緒池外,其他執行緒的存活時間。(及外包人員的存活時間)
unit:建立執行緒的存活時間
workQueue:任務佇列,在jdk裡面提供四種任務佇列
- ArrayBlockingQueue:基於陣列、有界,按 FIFO(先進先出)原則對元素進行排序
- LinkedBlockingQueue:基於連結串列,按FIFO (先進先出) 排序元素
吞吐量通常要高於 ArrayBlockingQueue
Executors.newFixedThreadPool() 使用了這個佇列
- SynchronousQueue:不儲存元素的阻塞佇列
每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態
吞吐量通常要高於 LinkedBlockingQueue
Executors.newCachedThreadPool使用了這個佇列
- PriorityBlockingQueue:具有優先順序的、無限阻塞佇列
threadFactory:執行緒工廠,對執行緒進行管理
策略模式:出現異常時,使用的策略。
1. newFixedThreadPool:
分析:keepAliveTime為0,說明只有核心執行緒池,產生的核心執行緒池以外的執行緒,它們的存活時間將變為0,那麼在極端情況下,如果任務比較多的話,會很快使任務佇列已滿,可以用於負載比較重的伺服器。
2. newSingleThreadExecutor:
分析:是上面newFixThreadPool的一個特例,裡面核心執行緒只有一個,沒有其他多餘的執行緒, 用於序列執行任務的場景,每個任務必須按順序執行,不需要併發執行。
3. newCachedThreadPool:
分析:可以看出corePoolSize的大小為0,可以說全是外包,極端情況下會建立過多的執行緒,耗盡 CPU 和記憶體資源。
CachedThreadPool
用於併發執行大量短期的小任務,或者是負載較輕的伺服器。
4. newScheduledThreadPool:
分析:ScheduledThreadPoolExecutor
用於需要多個後臺執行緒執行週期任務,同時需要限制執行緒數量的場景。
4.簡單模仿執行緒池
1. 首先一定這個執行緒池的行為規範有哪些
- 執行執行緒的介面
- 得到當前任務佇列(這個任務佇列就是池子,讓生產者生產任務,讓消費者任務)裡面的任務數量
- 得到當前任務佇列裡面已經消費的任務數量
- 得到當前的工作執行緒數量
2. 定義ThreadManagePool這個類
- 首先需要實現ThreadPool的行為規範
- 需要定義消費者類,這個類主要是消費任務,但是必須把它和生產者結合起來,它必須只要什麼時候需要消費,在這裡使用一個連結串列,把兩者結合起來,消費者必須每一次檢查連結串列是不是空,如果是空的,就進行等待,直到連結串列不為空,進行消費。
- 銷燬執行緒,首先要檢查任務佇列裡面的任務是否消費完畢,如果沒有完畢進行等待處理,如果消費完畢,關閉每一個消費者,在關閉執行緒池。
public class ThreadManagePool implements ThreadPool {
//預設工作執行緒個數
private static int workerNum = 5;
//工作執行緒數量
public WorkThread[] workThreads;
//執行任務數量
private static volatile int executeTaskNumber = 0;
private static ThreadManagePool threadManagePool;
private List<Runnable> taskQueue = new LinkedList<Runnable>();
private ThreadManagePool(){
this(workerNum);
}
private ThreadManagePool(int workerNum){
if(workerNum > 0) {
ThreadManagePool.workerNum = workerNum;
}
//工作執行緒的初始化
workThreads = new WorkThread[workerNum];
for(int i = 0;i<ThreadManagePool.workerNum;i++) {
workThreads[i] = new WorkThread();
Thread thread = new Thread(workThreads[i]);
System.out.println("初始化執行緒總數" + (i + 1) + "------當前執行緒名稱是:" + thread.getName());
thread.start();
}
}
public static ThreadPool getThreadPool(){
return getTheadPool(ThreadManagePool.workerNum);
}
public static ThreadPool getTheadPool(int workNum){
if(workNum <= 0) {
workNum = ThreadManagePool.workerNum;
}
if(threadManagePool == null) {
threadManagePool = new ThreadManagePool(workNum);
}
return threadManagePool;
}
/**
* 執行緒池銷燬
*/
@Override
public void destory() {
while (!taskQueue.isEmpty()) {
try {
//不斷檢測任務佇列是否全部執行
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < workerNum; i++) {
workThreads[i].stopWorker();
workThreads[i] = null;
}
threadManagePool = null;
taskQueue.clear();
}
@Override
public void execute(Runnable runnable) {
synchronized (taskQueue) {
taskQueue.add(runnable);
//通知 啟動的執行緒
taskQueue.notifyAll();
}
}
@Override
public void execute(List<Runnable> runnables) {
synchronized (taskQueue) {
for (Runnable task : runnables) {
taskQueue.add(task);
}
taskQueue.notifyAll();
}
}
@Override
public void execute(Runnable[] runnables) {
synchronized (taskQueue) {
for (Runnable task : runnables) {
taskQueue.add(task);
}
taskQueue.notifyAll();
}
}
@Override
public int getExecuteTaskCount() {
return executeTaskNumber;
}
@Override
public int getWorkTaskCount() {
return taskQueue.size();
}
@Override
public int getWorkTheadCount() {
return workerNum;
}
@Override
public String toString() {
return "當前工作執行緒數:" + workerNum +
", 已完成任務數:" + executeTaskNumber +
", 等待任務數量:" + getWorkTaskCount();
}
private class WorkThread extends Thread{
//該工作執行緒是否有效 用於結束該工作執行緒
private boolean isRunning = true;
Runnable r = null;
@Override
public void run() {
super.run();
while(isRunning){
synchronized (taskQueue) {
while(isRunning&&taskQueue.isEmpty()){
try {
taskQueue.wait(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(!taskQueue.isEmpty()) {
r = taskQueue.remove(0);
}
if(r!=null) {
r.run();
}
executeTaskNumber++;
r = null;
}
}
}
//停止工作 讓該執行緒自然執行完run方法 自然結束
public void stopWorker() {
isRunning = false;
}
}
}
public class ThreadTest {
public static void main(String[] args) {
ThreadPool t = ThreadManagePool.getTheadPool(6);
List<Runnable> taskList = new ArrayList<Runnable>();
for (int i = 0; i < 10; i++) {
taskList.add(new task());
}
t.execute(taskList);
System.out.println(t);
t.destory();//所有執行緒執行完才destory
System.out.println(t);
}
//任務類
public static class task implements Runnable{
private static volatile int i = 1;
@Override
public void run() {
System.out.println("當前處理的執行緒是:" + Thread.currentThread().getName() + " 執行任務" + (i++) + " 完成");
}
}
}
分析:發現雖然我們初始化許多執行緒,但是隻有一個執行緒在消費佇列裡面的任務,主要是因為使用的同一把鎖,一次只能讓一個執行緒進入進行消費,是一個序列的過程,有點像jdk裡面的singleNewThreadPool這個執行緒池,還有問題就是我們解耦,通過連結串列把消費者和生成者聯絡起來,所以我們應該用一個工具來管理執行緒,這個類ThreadFactory。