1. 程式人生 > 程式設計 >Java常用執行緒池原理及使用方法解析

Java常用執行緒池原理及使用方法解析

一、簡介

什麼是執行緒池?

池的概念大家也許都有所聽聞,池就是相當於一個容器,裡面有許許多多的東西你可以即拿即用。java中有執行緒池、連線池等等。執行緒池就是在系統啟動或者例項化池時建立一些空閒的執行緒,等待工作排程,執行完任務後,執行緒並不會立即被銷燬,而是重新處於空閒狀態,等待下一次排程。

執行緒池的工作機制?

線上程池的程式設計模式中,任務提交併不是直接提交給執行緒,而是提交給池。執行緒池在拿到任務之後,就會尋找有沒有空閒的執行緒,有則分配給空閒執行緒執行,暫時沒有則會進入等待佇列,繼續等待空閒執行緒。如果超出最大接受的工作數量,則會觸發執行緒池的拒絕策略。

為什麼使用執行緒池?

執行緒的建立與銷燬需要消耗大量資源,重複的建立與銷燬明顯不必要。而且池的好處就是響應快,需要的時候自取,就不會存在等待建立的時間。執行緒池可以很好地管理系統內部的執行緒,如數量以及排程。

二、常用執行緒池介紹

Java類ExecutorService是執行緒池的父介面,並非頂層介面。以下四種常用執行緒池的型別都可以是ExecutorService。

單一執行緒池 Executors.newSingleThreadExecutor()
內部只有唯一一個執行緒進行工作排程,可以保證任務的執行順序(FIFO,LIFO)

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolTest {
	public static void main(String[] args) {
		// 建立單一執行緒池
		ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
		List<String> list = new ArrayList<String>();
		list.add("first");
		list.add("second");
		list.add("third");
		list.forEach(o -> {
			// 遍歷集合提交任務
			singleThreadExecutor.execute(new Runnable() {

				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + " : " + o);
					try {
						// 間隔1s
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		});
	}
}

執行結果:

pool-1-thread-1 : first

pool-1-thread-1 : second

pool-1-thread-1 : third

可快取執行緒池 Executors.newCachedThreadPool()

如果執行緒池中有可使用的執行緒,則使用,如果沒有,則在池中新建一個執行緒,可快取執行緒池中執行緒數量最大為Integer.MAX_VALUE。通常用它來執行一些執行時間短,且經常用到的任務。

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolTest {
	public static void main(String[] args) {
		// 建立可快取執行緒池
		ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
		List<String> list = new ArrayList<String>();
		list.add("first");
		list.add("second");
		list.add("third");
		list.forEach(o -> {

			try {
				// 間隔3s
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			// 遍歷集合提交任務
			cachedThreadPool.execute(new Runnable() {

				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + " : " + o);
					try {
						// 間隔1s
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		});
	}
}

執行結果:

pool-1-thread-1 : first

pool-1-thread-1 : second

pool-1-thread-1 : third

因為間隔時間長,下一個任務執行時,上一個任務已經完成,所以執行緒可以繼續複用,如果間隔時間調短,那麼部分執行緒將會使用新執行緒來執行。

把每個任務等待時間從3s調低至1s:

執行結果:

pool-1-thread-1 : first

pool-1-thread-2 : second

pool-1-thread-1 : third

定長執行緒池 Executors.newFixedThreadPool(int nThreads)
建立一個固定執行緒數量的執行緒池,引數手動傳入

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolTest {
	public static void main(String[] args) {
		// 建立可快取執行緒池
		ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
		List<String> list = new ArrayList<String>();
		list.add("first");
		list.add("second");
		list.add("third");
		list.add("fourth");
		list.forEach(o -> {

			try {
				// 間隔1s
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			// 遍歷集合提交任務
			fixedThreadPool.execute(new Runnable() {

				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + " : " + o);
					try {
						// 間隔1s
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		});
	}
}

執行結果:

pool-1-thread-1 : first

pool-1-thread-2 : second

pool-1-thread-3 : third

pool-1-thread-1 : fourth

定時執行緒池 Executors.newScheduledThreadPool(int corePoolSize)
建立一個定長執行緒池,支援定時及週期性任務執行

package com.test;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolTest {
	public static void main(String[] args) {
		// 建立定長執行緒池、支援定時、延遲、週期性執行任務
		ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
		scheduledThreadPool.scheduleAtFixedRate(new Runnable() {

			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName() + " : 1秒後每隔3秒執行一次");
			}
		},1,3,TimeUnit.SECONDS);
	}
}

執行結果:

pool-1-thread-1 : 1秒後每隔3秒執行一次

pool-1-thread-1 : 1秒後每隔3秒執行一次

pool-1-thread-2 : 1秒後每隔3秒執行一次

pool-1-thread-2 : 1秒後每隔3秒執行一次

pool-1-thread-2 : 1秒後每隔3秒執行一次

pool-1-thread-2 : 1秒後每隔3秒執行一次

pool-1-thread-2 : 1秒後每隔3秒執行一次

三、自定義執行緒池

常用建構函式:

ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)

引數說明:

1、corePoolSize 核心執行緒數大小,當執行緒數<corePoolSize ,會建立執行緒執行runnable

2、maximumPoolSize 最大執行緒數, 當執行緒數 >= corePoolSize的時候,會把runnable放入workQueue中

3、keepAliveTime 保持存活時間,當執行緒數大於corePoolSize的空閒執行緒能保持的最大時間。

4、unit 時間單位

5、workQueue 儲存任務的阻塞佇列

6、threadFactory 建立執行緒的工廠

7、handler 拒絕策略

任務執行順序:

1、當執行緒數小於corePoolSize時,建立執行緒執行任務。

2、當執行緒數大於等於corePoolSize並且workQueue沒有滿時,放入workQueue中

3、執行緒數大於等於corePoolSize並且當workQueue滿時,新任務新建執行緒執行,執行緒總數要小於maximumPoolSize

4、當執行緒總數等於maximumPoolSize並且workQueue滿了的時候執行handler的rejectedExecution。也就是拒絕策略。

ThreadPoolExecutor預設有四個拒絕策略:

1、new ThreadPoolExecutor.AbortPolicy() 直接丟擲異常RejectedExecutionException

2、new ThreadPoolExecutor.CallerRunsPolicy() 直接呼叫run方法並且阻塞執行

3、new ThreadPoolExecutor.DiscardPolicy() 直接丟棄後來的任務

4、new ThreadPoolExecutor.DiscardOldestPolicy() 丟棄在佇列中隊首的任務

緩衝佇列BlockingQueue:

BlockingQueue是雙緩衝佇列。BlockingQueue內部使用兩條佇列,允許兩個執行緒同時向佇列一個儲存,一個取出操作。在保證併發安全的同時,提高了佇列的存取效率。

常用的幾種BlockingQueue:

  • ArrayBlockingQueue(int i):規定大小的BlockingQueue,其構造必須指定大小。其所含的物件是FIFO順序排序的。
  • LinkedBlockingQueue()或者(int i):大小不固定的BlockingQueue,若其構造時指定大小,生成的BlockingQueue有大小限制,不指定大小,其大小有Integer.MAX_VALUE來決定。其所含的物件是FIFO順序排序的。
  • PriorityBlockingQueue()或者(int i):類似於LinkedBlockingQueue,但是其所含物件的排序不是FIFO,而是依據物件的自然順序或者建構函式的Comparator決定。
  • SynchronizedQueue():特殊的BlockingQueue,對其的操作必須是放和取交替完成。
package com.test;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolTest {
	public static void main(String[] args) {
		// 工作佇列
		LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<Runnable>();
		// 拒絕策略
		RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
		ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,10,20,TimeUnit.MILLISECONDS,workQueue,handler);
		threadPoolExecutor.execute(new Runnable() {

			@Override
			public void run() {
				System.out.println("自定義執行緒池");
			}
		});
	}
}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。