1. 程式人生 > >執行緒池--拒絕策略RejectedExecutionHandler 原理和實驗

執行緒池--拒絕策略RejectedExecutionHandler 原理和實驗

 

當執行緒池的任務快取佇列已滿並且執行緒池中的執行緒數目達到maximumPoolSize,如果還有任務到來就會採取任務拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 預設策略

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 

ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程) 

ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

下面來看幾個例子:

1. AbortPolicy策略:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.RejectedExecutionException;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

 

public class AbortPolicyDemo { 

    private static final int THREADS_SIZE = 1

    private static final int CAPACITY = 1

    public static

 void main(String[] args) throws Exception {

        // 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"執行緒池"的阻塞佇列容量為1(CAPACITY)。 

        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY)); 

        // 設定執行緒池的拒絕策略為"丟擲異常" 

        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 

        try {

            // 新建10個任務,並將它們新增到執行緒池中。 

            for (int i = 0; i < 10; i++) {

                Runnable myrun = new MyRunnable("task-"+i);

                pool.execute(myrun);

            

        }catch (RejectedExecutionException e) {

            e.printStackTrace(); 

           // 關閉執行緒池

            pool.shutdown();

        

        }

}

輸出:

task-0 is running.

java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)

at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)

at AbortPolicyDemo.main(AbortPolicyDemo.java:18)

task-1 is running.

結果分析:

先是task0佔有了唯一的執行緒執行額度並開始執行,隨後task1佔有阻塞佇列的唯一空間,隨後task2想執行,發現task0還沒跑完,執行額度又滿了,於是嘗試進入阻塞佇列,結果發現阻塞佇列被task1給佔了,無路可走的task2於是丟擲異常,關閉執行緒池。

 

2.DiscardPolicy策略:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

 

class MyRunnable implements Runnable {

    private String name;

    public MyRunnable(String name) {

        this.name = name;

    }

    @Override

    public void run() {

        try {

            System.out.println(this.name + " is running.");

            Thread.sleep(100);

        catch (Exception e) {

            e.printStackTrace();

        }

    }

}

 

public class DiscardPolicyDemo

{

    private static final int THREADS_SIZE = 1;

    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"執行緒池"的阻塞佇列容量為1(CAPACITY)。

        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(CAPACITY));

        // 設定執行緒池的拒絕策略為"丟棄"

        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        // 新建10個任務,並將它們新增到執行緒池中。

        for (int i = 0; i < 10; i++) {

            Runnable myrun = new MyRunnable("task-"+i);

            pool.execute(myrun);

        }

        // 關閉執行緒池

        pool.shutdown();

    }

}

輸出結果:

task-0 is running.

task-1 is running.

結果分析:

執行緒池的第一個引數決定可以併發幾個執行緒,這裡設為1,那麼就表示執行緒只能一個一個來,另一個引數是最後這個ArrayBlockingQueue所代表的,意思是如果新提交到執行緒池的執行緒如果當前沒有執行額度(例如上面這種一次只能執行1個執行緒,我這裡就暫稱為執行額度為1),那麼只能放到這個阻塞佇列中等待,而阻塞佇列本身也是有大小的,所以也會滿,滿了怎麼辦,就對應上面說的四種策略,所以上面的程式碼如果在33行後加一行Thread.sleep(200);則10個執行緒都能得到執行,因為上面設定了每個執行緒的執行時間為100毫秒,那麼只要等超過100毫秒的時間,執行緒就會執行完畢並釋放資源,則後序提交的執行緒就來得及進入執行緒池,否則像上面的程式碼那樣太急著提交,直接就會導致阻塞的發生,從而觸發拒絕策略。

 

3. DiscardOldestPolicy策略:

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

 

 

 

public class DiscardOldestPolicyDemo

{

    private static final int THREADS_SIZE = 1;

    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"執行緒池"的阻塞佇列容量為1(CAPACITY)。

        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));

        // 設定執行緒池的拒絕策略為"DiscardOldestPolicy"

        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

        // 新建10個任務,並將它們新增到執行緒池中。

        for (int i = 0; i < 10; i++) {

            Runnable myrun = new MyRunnable("task-"+i);

            pool.execute(myrun);

        }

        // 關閉執行緒池

        pool.shutdown(); }

}

輸出結果為:

task-0 is running.

 

task-9 is running.

結果分析:

可見1到8都被擠掉了,推演如下:先task0進入開始執行200ms,在這200ms內,發生瞭如下後序事件:task1進入阻塞佇列等待,task2想進入阻塞佇列,但是發現task1佔著茅坑,於是把它擠出去,取而代之,task3想進入阻塞佇列,又發現task2佔著茅坑,於是把它擠出去,取而代之,以此類推,最終task-9佔據了阻塞佇列的位置,並等task0完成後task9開始執行。於是乎有了上面的輸出。

 

4. CallerRunsPolicy

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

 

public class CallerRunsPolicyDemo {

    private static final int THREADS_SIZE = 1;

    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1(THREADS_SIZE),"執行緒池"的阻塞佇列容量為1(CAPACITY)。

        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(CAPACITY));

        // 設定執行緒池的拒絕策略為"CallerRunsPolicy"

        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 新建10個任務,並將它們新增到執行緒池中。

        for (int i = 0; i < 10; i++) {

            Runnable myrun = new MyRunnable("task-"+i); pool.execute(myrun);

        }

        // 關閉執行緒池

        pool.shutdown();

    }

}

結果:

task-2 is running.

task-0 is running.

task-3 is running.

task-1 is running.

task-5 is running.

task-4 is running.

task-7 is running.

task-6 is running.

task-9 is running.

 

task-8 is running.

結果分析:

首先,所有的人物都得到了執行,沒有一個被漏掉或者丟擲異常什麼的,其次,注意到順序看起來亂七八糟,但是仔細看還是亂中有序,可以這樣理解:相當於現在有三個槽,一個是執行執行緒空間,一個是阻塞佇列,一個是主執行緒,前兩個槽視為一個整體,這個整體跟主執行緒的槽對比,誰有空,執行緒就進入誰,最終可以決定執行的順序。