1. 程式人生 > 其它 >Java 四種執行緒池的使用

Java 四種執行緒池的使用

https://juejin.im/post/59df0c1af265da432f301c8d

https://www.cnblogs.com/java-cxh/p/13431789.html

1,執行緒池的作用
執行緒池作用就是限制系統中執行執行緒的數量。
根據系統的環境情況,可以自動或手動設定執行緒數量,達到執行的最佳效果。
少了浪費了系統資源,多了造成系統擁擠效率不高。
用執行緒池控制執行緒數量,其他執行緒排 隊等候。
一個任務執行完畢,再從佇列的中取最前面的任務開始執行。
若佇列中沒有等待程序,執行緒池的這一資源處於等待。
當一個新任務需要執行時,如果執行緒池 中有等待的工作執行緒,就可以開始運行了;否則進入等待佇列。
2,為什麼要用執行緒池?
1.減少了建立和銷燬執行緒的次數,每個工作執行緒都可以被重複利用,可執行多個任務。
2.可以根據系統的承受能力,調整執行緒池中工作線執行緒的數目,防止因為消耗過多的記憶體,而把伺服器累趴下(每個執行緒需要大約1MB記憶體,執行緒開的越多,消耗的記憶體也就越大,最後宕機)。
Java裡面執行緒池的頂級介面是Executor,但是嚴格意義上講Executor並不是一個執行緒池,而只是一個執行執行緒的工具。真正的執行緒池介面是ExecutorService。
3,比較重要的幾個類


描述

ExecutorService
真正的執行緒池介面。

ScheduledExecutorService
能和Timer/TimerTask類似,解決那些需要任務重複執行的問題。

ThreadPoolExecutor
ExecutorService的預設實現。

ScheduledThreadPoolExecutor
繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,週期性任務排程的類實現。

4,new Thread的弊端

  1. public class TestNewThread {
  2. public static void main(String[] args) {
  3. new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. System.out.println("start");
  7. }
  8. }).start();
  9. }
  10. }

執行一個非同步任務你還只是如下new Thread嗎?
那你就out太多了,new Thread的弊端如下:
1.每次new Thread新建物件效能差。
2.執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。
3.缺乏更多功能,如定時執行、定期執行、執行緒中斷。
相比new Thread,Java提供的四種執行緒池的好處在於:
1.重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。
2.可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
3.提供定時執行、定期執行、單執行緒、併發數控制等功能。

四種執行緒池
Java通過Executors提供四種執行緒池,分別為:
1,newCachedThreadPoo
建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
2,newFixedThreadPool
建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
3,newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。
4,newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

原始碼分析newCachedThreadPool

這是一個可快取執行緒池,可以靈活的回收空閒執行緒,無可回收執行緒時,新建執行緒
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}碼可以看出底層呼叫的是ThreadPoolExecutor方法,傳入一個同步的阻塞佇列實現

  1. ThreadPoolExecupublic ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue) {
  6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  7. Executors.defaultThreadFactory(), defaultHandler);
  8. }

通過原始碼可以看出,我們可以傳入執行緒池的核心執行緒數(最小執行緒數),最大執行緒數量,保持時間,時間單位,阻塞佇列這些引數,最大執行緒數設定為jvm可用的cpu數量為最佳實踐

newWorkStealingPool
建立持有足夠執行緒的執行緒池來並行,通過使用多個佇列減少競爭,不傳引數,則預設設定為cpu的數量
原始碼:

  1. public static ExecutorService newWorkStealingPool() {
  2. return new ForkJoinPool
  3. (Runtime.getRuntime().availableProcessors(),
  4. ForkJoinPool.defaultForkJoinWorkerThreadFactory,
  5. null, true);
  6. }

通過原始碼可以看出底層呼叫的是ForkJoinPool執行緒池

下面說一下ForkJoinPool

  1. public ForkJoinPool(int parallelism,
  2. ForkJoinWorkerThreadFactory factory,
  3. UncaughtExceptionHandler handler,
  4. boolean asyncMode) {
  5. this(checkParallelism(parallelism),
  6. checkFactory(factory),
  7. handler,
  8. asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
  9. "ForkJoinPool-" + nextPoolId() + "-worker-");
  10. checkPermission();
  11. }

使用一個無限佇列來儲存需要執行的任務,可以傳入執行緒的數量,不傳入,則預設使用當前計算機中可用的cpu數量,使用分治法來解決問題,使用fork()和join()來進行呼叫

newSingleThreadExecutor
建立一個單執行緒化的執行緒池,保證所有任務按照指定的順序執行(FIFO,LIFO,優先順序),當要求程序限制時,可以進行使用

原始碼:

  1. public static ExecutorService newSingleThreadExecutor() {
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L, TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }

newFixedThreadPool
建立一個固定執行緒數量,可重用的執行緒池

原始碼:

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

newScheduledThreadPool
建立一個可定期或者延時執行任務的執行緒池

原始碼:

return new ScheduledThreadPoolExecutor(corePoolSize);
1
通過原始碼可以看出底層呼叫的是一個ScheduledThreadPoolExecutor,然後傳入執行緒數量

下面來介紹一下ScheduledThreadPoolExecutor

  1. public ScheduledThreadPoolExecutor(int corePoolSize) {
  2. super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  3. new DelayedWorkQueue());
  4. }

通過原始碼可以看出底層呼叫了ThreadPoolExecutor,維護了一個延遲佇列,可以傳入執行緒數量,傳入延時的時間等引數,下面給出一個demo

  1. public static void main(String[] args) {
  2. ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
  3. for (int i = 0; i < 15; i = i + 5) {
  4. pool.schedule(() -> System.out.println("我被執行了,當前時間" + new Date()), i, TimeUnit.SECONDS);
  5. }
  6. pool.shutdown();
  7. }

執行結果

我被執行了,當前時間Fri Jan 12 11:20:41 CST 2018
我被執行了,當前時間Fri Jan 12 11:20:46 CST 2018
我被執行了,當前時間Fri Jan 12 11:20:51 CST 2018

有的小夥伴可能會用疑問,為什麼使用schedule()而不使用submit()或者execute()呢,下面通過原始碼來分析

  1. public void execute(Runnable command) {
  2. schedule(command, 0, NANOSECONDS);
  3. }
  4. public Future<?> submit(Runnable task) {
  5. return schedule(task, 0, NANOSECONDS);
  6. }

通過原始碼可以發現這兩個方法都是呼叫的schedule(),而且將延時時間設定為了0,所以想要實現延時操作,需要直接呼叫schedule()

下面我們再來分析一下submit()和execute()的以及shutdown()和shutdownNow()的區別

submit(),提交一個執行緒任務,可以接受回撥函式的返回值嗎,適用於需要處理返回著或者異常的業務場景
execute(),執行一個任務,沒有返回值
shutdown(),表示不再接受新任務,但不會強行終止已經提交或者正在執行中的任務
shutdownNow(),對於尚未執行的任務全部取消,正在執行的任務全部發出interrupt(),停止執行
五種執行緒池的適應場景
newCachedThreadPool:用來建立一個可以無限擴大的執行緒池,適用於伺服器負載較輕,執行很多短期非同步任務。
newFixedThreadPool:建立一個固定大小的執行緒池,因為採用無界的阻塞佇列,所以實際執行緒數量永遠不會變化,適用於可以預測執行緒數量的業務中,或者伺服器負載較重,對當前執行緒數量進行限制。
newSingleThreadExecutor:建立一個單執行緒的執行緒池,適用於需要保證順序執行各個任務,並且在任意時間點,不會有多個執行緒是活動的場景。
newScheduledThreadPool:可以延時啟動,定時啟動的執行緒池,適用於需要多個後臺執行緒執行週期任務的場景。
newWorkStealingPool:建立一個擁有多個任務佇列的執行緒池,可以減少連線數,建立當前可用cpu數量的執行緒來並行執行,適用於大耗時的操作,可以並行來執行

RejectedExecutionHandler 執行緒池四種拒絕任務策略

《Java執行緒池》:任務拒絕策略
在沒有分析執行緒池原理之前先來分析下為什麼有任務拒絕的情況發生。

這裡先假設一個前提:執行緒池有一個任務佇列,用於快取所有待處理的任務,正在處理的任務將從任務佇列中移除。因此在任務佇列長度有限的情況下就會出現新任務的拒絕處理問題,需要有一種策略來處理應該加入任務佇列卻因為佇列已滿無法加入的情況。另外線上程池關閉的時候也需要對任務加入佇列操作進行額外的協調處理。

RejectedExecutionHandler提供了四種方式來處理任務拒絕策略

1、直接丟棄(DiscardPolicy)

2、丟棄佇列中最老的任務(DiscardOldestPolicy)。

3、拋異常(AbortPolicy)

4、將任務分給呼叫執行緒來執行(CallerRunsPolicy)。

這四種策略是獨立無關的,是對任務拒絕處理的四中表現形式。最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入佇列的任務本身(DiscardPolicy)或者丟棄任務佇列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接丟擲一個異常(RejectedExecutionException),這是比較簡單的方式。丟擲異常的方式(AbortPolicy)儘管實現方式比較簡單,但是由於丟擲一個RuntimeException,因此會中斷呼叫者的處理過程。除了丟擲異常以外還可以不進入執行緒池執行,在這種方式(CallerRunsPolicy)中任務將有呼叫者執行緒去執行。

示例
1,newCachedThreadPool
建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒, 那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

  1. package io.ymq.thread.demo1;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. /**
  5. * 描述: 建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。
  6. * 此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。
  7. *
  8. * @author yanpenglei
  9. * @create 2017-10-12 11:13
  10. **/
  11. public class TestNewCachedThreadPool {
  12. public static void main(String[] args) {
  13. ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  14. for (int i = 1; i <= 10; i++) {
  15. final int index = i;
  16. try {
  17. Thread.sleep(index * 1000);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. cachedThreadPool.execute(new Runnable() {
  22. @Override
  23. public void run() {
  24. String threadName = Thread.currentThread().getName();
  25. System.out.println("執行:" + index + ",執行緒名稱:" + threadName);
  26. }
  27. });
  28. }
  29. }
  30. }

響應:
執行:1,執行緒名稱:pool-1-thread-1
執行:2,執行緒名稱:pool-1-thread-1
執行:3,執行緒名稱:pool-1-thread-1
執行:4,執行緒名稱:pool-1-thread-1
執行:5,執行緒名稱:pool-1-thread-1
執行:6,執行緒名稱:pool-1-thread-1
執行:7,執行緒名稱:pool-1-thread-1
執行:8,執行緒名稱:pool-1-thread-1
執行:9,執行緒名稱:pool-1-thread-1
執行:10,執行緒名稱:pool-1-thread-1
2,newFixedThreadPool
描述:建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。
執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

  1. package io.ymq.thread.demo2;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. /**
  5. * 描述:建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。
  6. * 執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。
  7. *
  8. * @author yanpenglei
  9. * @create 2017-10-12 11:30
  10. **/
  11. public class TestNewFixedThreadPool {
  12. public static void main(String[] args) {
  13. ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
  14. for (int i = 1; i <= 10; i++) {
  15. final int index = i;
  16. fixedThreadPool.execute(new Runnable() {
  17. @Override
  18. public void run() {
  19. try {
  20. String threadName = Thread.currentThread().getName();
  21. System.out.println("執行:" + index + ",執行緒名稱:" + threadName);
  22. Thread.sleep(2000);
  23. } catch (InterruptedException e) {
  24. e.printStackTrace();
  25. }
  26. }
  27. });
  28. }
  29. }
  30. }

因為執行緒池大小為3,每個任務輸出index後sleep 2秒,所以每兩秒列印3個數字,和執行緒名稱。
響應:
執行:2,執行緒名稱:pool-1-thread-2
執行:3,執行緒名稱:pool-1-thread-3
執行:1,執行緒名稱:pool-1-thread-1

執行:4,執行緒名稱:pool-1-thread-1
執行:6,執行緒名稱:pool-1-thread-2
執行:5,執行緒名稱:pool-1-thread-3

執行:7,執行緒名稱:pool-1-thread-1
執行:9,執行緒名稱:pool-1-thread-3
執行:8,執行緒名稱:pool-1-thread-2

執行:10,執行緒名稱:pool-1-thread-1

3,newScheduledThreadPool
建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行

  1. package io.ymq.thread.demo3;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.ScheduledExecutorService;
  4. import java.util.concurrent.TimeUnit;
  5. /**
  6. * 描述:建立一個定長執行緒池,支援定時及週期性任務執行。延遲執行
  7. *
  8. * @author yanpenglei
  9. * @create 2017-10-12 11:53
  10. **/
  11. public class TestNewScheduledThreadPool {
  12. public static void main(String[] args) {
  13. ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
  14. scheduledThreadPool.schedule(new Runnable() {
  15. @Override
  16. public void run() {
  17. System.out.println("表示延遲3秒執行。");
  18. }
  19. }, 3, TimeUnit.SECONDS);
  20. scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
  21. @Override
  22. public void run() {
  23. System.out.println("表示延遲1秒後每3秒執行一次。");
  24. }
  25. }, 1, 3, TimeUnit.SECONDS);
  26. }
  27. }

表示延遲1秒後每3秒執行一次。
表示延遲3秒執行。
表示延遲1秒後每3秒執行一次。
表示延遲1秒後每3秒執行一次。
表示延遲1秒後每3秒執行一次。
表示延遲1秒後每3秒執行一次。
4,newSingleThreadExecutor
建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

  1. package io.ymq.thread.demo4;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. /**
  5. * 描述:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
  6. *
  7. * @author yanpenglei
  8. * @create 2017-10-12 12:05
  9. **/
  10. public class TestNewSingleThreadExecutor {
  11. public static void main(String[] args) {
  12. ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
  13. for (int i = 1; i <= 10; i++) {
  14. final int index = i;
  15. singleThreadExecutor.execute(new Runnable() {
  16. @Override
  17. public void run() {
  18. try {
  19. String threadName = Thread.currentThread().getName();
  20. System.out.println("執行:" + index + ",執行緒名稱:" + threadName);
  21. Thread.sleep(2000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. });
  27. }
  28. }
  29. }

結果依次輸出,相當於順序執行各個任務。
響應:
執行:1,執行緒名稱:pool-1-thread-1
執行:2,執行緒名稱:pool-1-thread-1
執行:3,執行緒名稱:pool-1-thread-1
執行:4,執行緒名稱:pool-1-thread-1
執行:5,執行緒名稱:pool-1-thread-1
執行:6,執行緒名稱:pool-1-thread-1
執行:7,執行緒名稱:pool-1-thread-1
執行:8,執行緒名稱:pool-1-thread-1
執行:9,執行緒名稱:pool-1-thread-1
執行:10,執行緒名稱:pool-1-thread-1

Future和FutureTask的使用

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

但是Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了FutureTask。

 1.使用Callable+Future獲取執行結果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

publicclassTest {

publicstaticvoidmain(String[] args) {

ExecutorService executor = Executors.newCachedThreadPool();

Task task =newTask();

Future<Integer> result = executor.submit(task);

executor.shutdown();

try{

Thread.sleep(1000);

}catch(InterruptedException e1) {

e1.printStackTrace();

}

System.out.println("主執行緒在執行任務");

try{

System.out.println("task執行結果"+result.get());

}catch(InterruptedException e) {

e.printStackTrace();

}catch(ExecutionException e) {

e.printStackTrace();

}

System.out.println("所有任務執行完畢");

}

}

classTaskimplementsCallable<Integer>{

@Override

publicInteger call()throwsException {

System.out.println("子執行緒在進行計算");

Thread.sleep(3000);

intsum =0;

for(inti=0;i<100;i++)

sum += i;

returnsum;

}

}

  執行結果:

  1. 子執行緒在進行計算
  2. 主執行緒在執行任務
  3. task執行結果4950
  4. 所有任務執行完畢

  2.使用Callable+FutureTask獲取執行結果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

publicclassTest {

publicstaticvoidmain(String[] args) {

//第一種方式

ExecutorService executor = Executors.newCachedThreadPool();

Task task =newTask();

FutureTask<Integer> futureTask =newFutureTask<Integer>(task);

executor.submit(futureTask);

executor.shutdown();

//第二種方式,注意這種方式和第一種方式效果是類似的,只不過一個使用的是ExecutorService,一個使用的是Thread

/*Task task = new Task();

FutureTask<Integer> futureTask = new FutureTask<Integer>(task);

Thread thread = new Thread(futureTask);

thread.start();*/

try{

Thread.sleep(1000);

}catch(InterruptedException e1) {

e1.printStackTrace();

}

System.out.println("主執行緒在執行任務");

try{

System.out.println("task執行結果"+futureTask.get());

}catch(InterruptedException e) {

e.printStackTrace();

}catch(ExecutionException e) {

e.printStackTrace();

}

System.out.println("所有任務執行完畢");

}

}

classTaskimplementsCallable<Integer>{

@Override

publicInteger call()throwsException {

System.out.println("子執行緒在進行計算");

Thread.sleep(3000);

intsum =0;

for(inti=0;i<100;i++)

sum += i;

returnsum;

}

}