1. 程式人生 > 程式設計 >《Java7併發程式設計實戰手冊》學習筆記(八)——定製併發類

《Java7併發程式設計實戰手冊》學習筆記(八)——定製併發類

此篇部落格為個人學習筆記,如有錯誤歡迎大家指正

本次內容

  1. 定製ThreadPoolExecutor類
  2. 實現基於優先順序的Executor類
  3. 實現ThreadFactory介面生成定製執行緒
  4. 在Executor物件中使用ThreadFactory
  5. 定製執行在定時執行緒池中的任務
  6. 通過ForkJoinWorkerThreadFactory介面為Fork/Join框架生成定製執行緒
  7. 定製執行在Fork/Join框架中的任務
  8. 實現定製Lock類

Java的併發API已經提供了大量的介面和類來幫助我們編寫併發程式,但有時這些類仍無法滿足我們的需求。這時我們就可以定製屬於自己的併發類,通常來講我們可以通過繼承已有的併發類並對某些方法進行修改、拓展即可達到這一目的

1.定製ThreadPoolExecutor類

我們可以通過繼承ThreadPoolExecutor類並覆蓋父類的某些方法來定製我們自己的執行器

範例實現

MyExecutor(定製的執行器):

package day08.code_01;

import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class MyExecutor extends ThreadPoolExecutor {

    //儲存任務開始時間的map
    private ConcurrentHashMap<String,Date> startTime;

    //覆蓋構造方法
public MyExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) { super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue); startTime = new ConcurrentHashMap<>(); } @Override public void shutdown
()
{ //在shutdown方法中輸出執行緒池相關資訊 System.out.printf("MyExecutor: Going to shutdown\n"); //執行完畢的任務數量 System.out.printf("MyExecutor: Executed tasks: %d\n",getCompletedTaskCount()); //正在執行的任務數量 System.out.printf("MyExecutor: Running tasks: %d\n",getActiveCount()); //等待執行的任務數量 System.out.printf("MyExecutor: Pending tasks: %d\n",getQueue().size()); //呼叫父類的shutdown方法 super.shutdown(); } @Override public List<Runnable> shutdownNow() { //在shutdownNow方法中輸出執行緒池相關資訊 System.out.printf("MyExecutor: Going to immediately shutdown\n"); //執行完畢的任務數量 System.out.printf("MyExecutor: Executed tasks: %d\n",getQueue().size()); //呼叫父類的shutdownNow方法 return super.shutdownNow(); } @Override protected void beforeExecute(Thread t,Runnable r) { //在任務開始執行前列印執行緒名稱和任務的雜湊碼 System.out.printf("MyExecutor: A task is beginning: %s: %s\n",t.getName(),r.hashCode()); //以任務雜湊碼為鍵,日期為值,裝入map中 startTime.put(String.valueOf(r.hashCode()),new Date()); //呼叫父類的beforeExecute方法 super.beforeExecute(t,r); } @Override protected void afterExecute(Runnable r,Throwable t) { //對任務進行型別強轉 Future<?> result = (Future<?>) r; try { //列印任務結束提示語 System.out.println("*****************************"); System.out.println("MyExecutor: A task is finishing"); //列印結果 System.out.printf("MyExecutor: Result: %s\n",result.get()); //計算執行所花費的時間 Date startDate = startTime.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); //列印執行所花費的時間 System.out.printf("MyExecutor: Duration: %d\n",diff); System.out.println("*****************************"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } //呼叫父類方法 super.afterExecute(r,t); } } 複製程式碼

任務類:

package day08.code_01;

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class SleepTwoSecondsTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        //休眠2秒
        TimeUnit.SECONDS.sleep(2);
        //返回時間字串
        return new Date().toString();
    }
}
複製程式碼

main方法:

package day08.code_01;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立自定義的ThreadPoolExecutor物件
        MyExecutor myExecutor = new MyExecutor
                (2,4,1000,TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>());
        //建立裝載Future物件的集合
        ArrayList<Future<String>> results = new ArrayList<>();
        //傳送十個任務
        for (int i = 0; i < 10; i++) {
            //建立任務
            SleepTwoSecondsTask task = new SleepTwoSecondsTask();
            //將任務傳送給執行器
            Future<String> result = myExecutor.submit(task);
            //將得到的Future物件裝入集合
            results.add(result);
        }
        //嘗試獲取前5個任務的結果
        for (int i = 0; i < 5; i++) {
            try {
                //得到任務執行結束後返回的結果
                String result = results.get(i).get();
                //列印任務編號及結果
                System.out.printf("Main: Result for Task %d : %s\n",i,result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        //關閉執行器
        myExecutor.shutdown();
        //嘗試獲取後5個任務的結果
        for (int i = 5; i < 10; i++) {
            try {
                //得到任務執行結束後返回的結果
                String result = results.get(i).get();
                //列印任務編號及結果
                System.out.printf("Main: Result for Task %d : %s\n",result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        //等待執行器關閉
        try {
            myExecutor.awaitTermination(1,TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //列印程式結束提示語
        System.out.printf("Main: End of the program\n");
    }
}
複製程式碼

2.實現基於優先順序的Executor類

執行器內部使用一個阻塞佇列來裝載等待執行的任務,我們可以通過ThreadPoolExecutor類的建構函式傳入一個實現了BlockingQueue<E>介面的物件引用。Java為我們提供了具有不同特點的阻塞佇列實現類,例如我們之前使用過的優先順序佇列PriorityBlockingQueue類。將此類的物件作為執行器中裝載任務的阻塞佇列可以實現按照優先順序執行任務的效果,需要注意的是,在這種情況下我們的任務類不止要實現Runnable介面還需要實現Comparable介面,具體原因已經在day07中做過記錄,就不在此贅述了

範例實現

任務類:

package day08.code_02;

import java.util.concurrent.TimeUnit;

public class MyPriorityTask implements Runnable,Comparable<MyPriorityTask> {

    //優先順序
    private int priority;

    //任務名稱
    private String name;

    public MyPriorityTask(String name,int priority) {
        this.priority = priority;
        this.name = name;
    }

    public int getPriority() {
        return priority;
    }

    @Override
    public int compareTo(MyPriorityTask o) {
        //如果優先順序較高,排在佇列靠前位置
        if (this.getPriority() > o.getPriority()) {
            return -1;
            //優先順序較低排在靠後位置
        } else if (this.getPriority() < o.getPriority()) {
            return 1;
        }
        //優先順序相同則沒有明確順序
        return 0;
    }

    @Override
    public void run() {
        //列印任務名稱和優先順序
        System.out.printf("MyPriorityTask: %s Priority : %d\n",name,priority);
        //休眠兩秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製程式碼

main方法:

package day08.code_02;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立執行器,任務佇列使用優先順序佇列
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,2,1,TimeUnit.SECONDS,new PriorityBlockingQueue<Runnable>());
        //建立四個任務
        for (int i = 0; i < 4; i++) {
            //通過構造方法設定任務名稱和優先順序
            MyPriorityTask task = new MyPriorityTask("Task" + i,i);
            //將任務傳送到執行器
            executor.execute(task);
        }
        //休眠1秒
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //再次建立四個任務
        for (int i = 4; i < 8; i++) {
            //通過構造方法設定任務名稱和優先順序
            MyPriorityTask task = new MyPriorityTask("Task" + i,i);
            //將任務傳送到執行器
            executor.execute(task);
        }
        //關閉執行器
        executor.shutdown();
        //等待執行器將所有任務執行完畢
        try {
            executor.awaitTermination(1,TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //列印程式結束資訊
        System.out.printf("Main: End of the program\n");
    }
}
複製程式碼

3.實現ThreadFactory介面生成定製執行緒

我們可以通過實現ThreadFactory介面來定製特殊的執行緒工廠類,使用執行緒工廠建立執行緒物件較為簡單,並且可以執行緒建立執行緒的數量。當然,如果我們僅僅是把定製的執行緒工廠類作為一個獨立的類來使用,那麼完全可以不實現ThreadFactory介面;但是如果打算與其他併發API組合使用,例如將執行緒工廠的引用作為一個引數傳入其他方法中,那麼就必須實現這一介面。另外,我們也可以使用Executors類的defaultThreadFactory()方法獲取到一個最基本的執行緒工廠,這個工廠會生成同屬於一個執行緒組的基本執行緒物件

範例實現

在這個範例中,我們將使用定製執行緒工廠去建立定製執行緒
定製執行緒工廠類:

package day08.code_03;

import java.util.concurrent.ThreadFactory;

public class MyThreadFactory implements ThreadFactory {

    //計數器
    private int counter;

    //名稱字首
    private String prefix;

    public MyThreadFactory(String prefix) {
        this.prefix = prefix;
        counter = 1;
    }

    @Override
    public Thread newThread(Runnable r) {
        //建立執行緒,名字為字首加計數器數字
        MyThread myThread = new MyThread(r,prefix + "-" + counter);
        //計數器自增
        counter++;
        //返回建立好的執行緒
        return myThread;
    }
}
複製程式碼

定製執行緒類:

package day08.code_03;

import java.util.Date;

public class MyThread extends Thread {

    //執行緒建立時間
    private Date creationDate;
    //執行緒開始執行時間
    private Date startDate;
    //執行緒執行結束時間
    private Date finishDate;

    //重寫建構函式
    public MyThread(Runnable target,String name) {
        super(target,name);
        setCreationDate();
    }

    @Override
    public void run() {
        //設定開始時間
        setStartDate();
        //執行任務
        super.run();
        //設定結束時間
        setFinishDate();
    }

    //設定執行緒被建立的時間
    public void setCreationDate() {
        creationDate = new Date();
    }

    //設定執行緒開始執行的時間
    public void setStartDate() {
        startDate = new Date();
    }

    //設定執行緒執行結束的時間
    public void setFinishDate() {
        finishDate = new Date();
    }

    //獲取執行緒執行任務所消耗的時間
    public long getExecutionTime() {
        return finishDate.getTime() - startDate.getTime();
    }

    //重寫toString方法
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        //執行緒名稱
        builder.append(getName());
        builder.append(" : ");
        //建立時間
        builder.append("Creation Date: ");
        builder.append(creationDate);
        //執行時長
        builder.append(" Running time: ");
        builder.append(getExecutionTime());
        builder.append(" Milliseconds");
        return builder.toString();
    }
}
複製程式碼

任務類:

package day08.code_03;

import java.util.concurrent.TimeUnit;

public class MyTask implements Runnable {
    @Override
    public void run() {
        //休眠2秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製程式碼

main方法:

package day08.code_03;

public class Main {

    public static void main(String[] args) {
        //建立定製的執行緒工廠
        MyThreadFactory myTactory = new MyThreadFactory("MyThreadFactory");
        //建立任務
        MyTask myTask = new MyTask();
        //建立定製的執行緒物件
        Thread thread = myTactory.newThread(myTask);
        //開啟執行緒
        thread.start();
        //等待執行緒執行結束
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //列印執行緒執行結束資訊
        System.out.printf("Main: Thread information\n");
        System.out.printf("%s\n",thread);
        System.out.printf("Main: End of the example\n");
    }

}
複製程式碼

4.在Executor物件中使用ThreadFactory

在第三小節中,我們編寫了自己的執行緒工廠類和執行緒類。因為我們實現了ThreadFactory介面,因此在建立執行器時,可以將定製執行緒工廠物件作為引數傳入。這樣一來執行器在建立執行緒時便會使用我們自定義的執行緒工廠

範例實現

在這個範例中,使用了第三小節的MyThreadMyThreadFactoryMyTask類,程式碼是完全一樣的,這裡就只給出main方法。
main方法:

package day08.code_04;

import day08.code_03.MyTask;
import day08.code_03.MyThreadFactory;

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立定製工廠物件
        MyThreadFactory myTactory = new MyThreadFactory("MyThreadFactory");
        //建立執行器並將定製工廠物件作為引數傳入
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(myTactory);
        //建立任務
        MyTask myTask = new MyTask();
        //提交任務
        executor.submit(myTask);
        //關閉執行器
        executor.shutdown();
        //等待執行器將所有任務執行完畢
        executor.awaitTermination(1,TimeUnit.DAYS);
        //列印程式結束資訊
        System.out.printf("Main: End of the program\n");

    }
}
複製程式碼

5.定製執行在定時執行緒池中的任務

定時執行緒池(Scheduled Thread Pool)可以執行延遲任務和週期性任務。其中延遲任務可以執行實現了CallableRunnable介面的物件,但是週期性任務只能執行實現了Runnable介面的。另外,儘管我們向定時執行緒池傳送的任務物件均是實現了CallableRunnable介面的,但實際上任務想要在定時執行緒池中執行,就必須實現RunnableScheduledFuture介面,只不過這一工作由執行緒池內部的方法幫助我們完成了。在完成以下範例前,我們需要對定時執行緒池的機制有一定的瞭解,否則在定製類中重寫方法時會無從下手

我們先來大概分析一下ScheduledThreadPoolExecutor類的部分原始碼以及執行流程:
ScheduledThreadPoolExecutor類中有兩個內部類,其中一個是DelayedWorkQueue類,它是一個裝載任務的佇列,每當有任務準備裝入佇列時,任務的compareTo方法會被呼叫以此來決定任務在佇列中所處的位置;另一個是ScheduledFutureTask類,它實現了RunnableScheduledFuture介面並繼承了FutureTask類。此內部類有如下幾個變數和方法和此範例有較重要的關聯:

  • private final long period;:變數period用於儲存當前任務的執行週期
  • RunnableScheduledFuture<V> outerTask = this;:變數outerTask用於儲存需要在下一次放入任務佇列中的任務,預設指向當前物件
  • private long time;:變數time表示週期任務下一次執行的時間(納秒)
  • getDelay(TimeUnit unit):此方法會根據time變數和當前時間的納秒值來返回以給定時間引數為單位的距離任務下一次開始執行的時間,原始碼如下,其中now()方法返回了當前時間的納秒值
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(),NANOSECONDS);
        }
    複製程式碼
  • compareTo(Delayed other):此方法在任務被裝入任務佇列中會被呼叫。等待佇列實際上是使用陣列維護的最小堆,待進入佇列的元素會和佇列中的元素根據下一次任務開始執行的時間進行比較,時間較短的將排在佇列較前方。通過原始碼我們可以看出,內部類的compareTo方法會先判斷傳入的元素是否為當前物件自身,如果是則不進行排序;再判斷傳入元素是否為內部類物件,如果是則根據time變數進行比較。如果前兩個判斷均不生效則意味著定時執行緒池中的任務類是使用者的定製任務類,這種情況下呼叫傳入引數自身的compareTo方法
    public int compareTo(Delayed other) {
        //判斷是否是自身
        if (other == this) // compare zero if same object
            return 0;
        //判斷是否為ScheduledFutureTask類
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            //根據內部類的成員變數進行比較
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
    複製程式碼
  • run():在這個run方法中,值得注意的是在執行完FutureTask的run方法後,我們需要重新設定任務的執行時間,這一操作在這裡等價於更新ScheduledFutureTask類的成員變數time。另外,由於是週期任務,我們必須將任務重新放入佇列中
    public void run() {
        //判斷當前任務是否是週期任務
        boolean periodic = isPeriodic();
        //判斷當前狀態是否可以執行
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        //如果不是週期任務
        else if (!periodic)
            //呼叫FutureTask的run方法執行Runnable或Callable例項的run方法
            ScheduledFutureTask.super.run();
            //否則呼叫runAndReset方法執行並初始化狀態
        else if (ScheduledFutureTask.super.runAndReset()) {
            //設定週期任務下一次的執行時間
            setNextRunTime();
            //將當前任務重新放入佇列中並開啟執行緒執行
            reExecutePeriodic(outerTask);
        }
    }
    複製程式碼

接下來是ScheduledThreadPoolExecutor類,其父類是ThreadPoolExecutor。通過檢視原始碼我們可以發現ScheduledThreadPoolExecutor類的構造方法通過super呼叫ThreadPoolExecutor類的構造方法,傳入一個內部類DelayedWorkQueue物件作為執行緒池的任務佇列,原來定時執行緒池底層使用的仍然是ThreadPoolExecutor類。定時執行緒池主要有如下方法:

  • scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit):此方法在這個範例中需要重寫。通過原始碼我們可以看到,在進行了兩次校驗後,在方法中建立了一個ScheduledFutureTask物件,並將其作為decorateTask這一方法的引數。decorateTask方法是我們線上程池中使用定製任務類的關鍵,我們需要重寫此方法使其返回一個定製任務類例項。
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        //建立內部類物件
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay,unit),unit.toNanos(period));
        //此方法需要重寫,預設是返回sft
        RunnableScheduledFuture<Void> t = decorateTask(command,sft);
        //儲存任務,方便下一次向佇列中提交
        sft.outerTask = t;
        //向佇列中新增任務並執行
        delayedExecute(t);
        return t;
    }
    複製程式碼
  • decorateTask(Runnable runnable,RunnableScheduledFuture<V> task):此方法是留給使用者進行拓展的,在這裡只是返回了傳入的內部類物件,並沒有實現什麼功能,我們可以重寫這個方法,使其返回我們的定製任務類例項
    protected <V> RunnableScheduledFuture<V> decorateTask(
        Runnable runnable,RunnableScheduledFuture<V> task) {
        return task;
    }
    複製程式碼
  • delayedExecute(RunnableScheduledFuture<?> task):向佇列中延遲提交任務,super.getQueue()方法在這裡得到的是上面記錄過的內部佇列類
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        //校驗
        if (isShutdown())
            reject(task);
        else {
            //呼叫父類方法向佇列中新增任務
            super.getQueue().add(task);
            //校驗
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //啟動一個執行緒去等待任務
                ensurePrestart();
        }
    }
    複製程式碼

範例實現

在這個範例中,我們將定製一個類並使其在定時執行緒池中執行。我們需要做一系列的工作,例如實現介面、繼承現有類、重寫父類方法。

首先我們需要定義一個類並實現RunnableScheduledFuture介面來作為在定時執行緒池中執行的定製任務類,先檢視此介面的繼承關係。

通過上面這張圖我們可以發現RunnableScheduledFuture介面又繼承了多個介面,如果直接實現介面就需要重寫大量的方法。然而發現FutureTask類已經實現了RunnableFuture介面,我們只需要繼承這個類就可以降低工作量。在定製任務類的構造方法中,我們將呼叫FutureTask類的構造方法傳入實現了RunnableCallable介面的物件和對應的返回值型別,併為當前類的成員變數賦值。定製類中有一個實現了RunnableScheduledFuture介面的成員變數。它將儲存定時執行緒池所返回的內部類,這樣做將進一步減輕工作量因為可以在後續的方法中直接呼叫此物件的方法如isPeriodic()getDelay()。需要注意的是,原書在這裡還直接呼叫了此物件的compareTo()方法,這是不正確的。因為在定時執行緒池中執行任務、重新整理執行時間的是定製任務類而不是內部任務類,也就是說time變數在第一次被賦值後就不會再改變,然而內部類的compareTo()方法卻不可避免的使用了time這一變數,這顯然是錯誤的。當我們向定時執行緒池中新增一個以上的週期任務時就會出現難以預測的問題。

MyScheduledTask(自定義的執行在定時執行緒池中的任務類):

package day08.code_05;


import java.util.Date;
import java.util.concurrent.*;

public class MyScheduledTask<V> extends FutureTask<V>
        implements RunnableScheduledFuture<V> {

    //儲存ScheduledFutureTask物件
    private RunnableScheduledFuture<V> task;

    //定時執行器
    private ScheduledThreadPoolExecutor executor;

    //執行週期
    private long period;

    //開始時間
    private long startDate;

    public MyScheduledTask(Runnable runnable,V result,RunnableScheduledFuture<V> task,ScheduledThreadPoolExecutor executor) {
        //呼叫父類FutureTask的構造方法
        super(runnable,result);
        this.task = task;
        this.executor = executor;
    }

    public void setPeriod(long period) {
        this.period = period;
    }

    @Override
    public boolean isPeriodic() {
        return task.isPeriodic();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        //非週期任務直接呼叫ScheduledFutureTask物件的方法
        if (!isPeriodic()) {
            return task.getDelay(unit);
        } else {
            //週期性任務但是還未執行過,直接呼叫ScheduledFutureTask物件的方法
            if (startDate == 0) {
                return task.getDelay(unit);
            } else {
                //週期性任務並且之前執行過
                //根據自定義的屬性計算出距離下一次執行的時間
                Date now = new Date();
                long delay = startDate - now.getTime();
                return unit.convert(delay,TimeUnit.MILLISECONDS);
            }
        }
    }

    @Override
    public int compareTo(Delayed o) {
        //使用定製任務類自己的方法獲取時間差
        long diff = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        //較早執行的任務排在佇列前面
        if (diff < 0) {
            return -1;
        //較晚執行的任務排在佇列後面
        } else if (diff > 0) {
            return 1;
        }
        return 0;
    }

    @Override
    public void run() {
        //判斷任務是否是週期性執行且執行器未關閉
        if (isPeriodic() && (!executor.isShutdown())) {
            //獲取當前時間
            Date now = new Date();
            //計算出下一次任務的執行時間
            startDate = now.getTime() + period;
            //將任務再次加入
            executor.getQueue().add(this);
        }
        //列印任務開始執行時的日期
        System.out.printf("Pre-MyScheduledTask: %s\n",new Date());
        //列印任務執行的週期
        System.out.printf("MyScheduledTask: Is Periodic: %s\n",isPeriodic());
        //呼叫FutureTask的方法來執行傳入的任務並重置
        super.runAndReset();
        //列印任務結束的時間
        System.out.printf("Post-MyScheduledTask: %s\n",new Date());
    }

}
複製程式碼

想要使用定製任務類,我們需要與其配套的定製定時執行緒池,直接繼承ScheduledThreadPoolExecutor類並重寫其中的方法即可。這個範例中定製執行緒池只重寫了2個方法,實際運用中可根據不同需求進行改變:
MyScheduledThreadPoolExecutor(定製定時執行緒池類):

package day08.code_05;

import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyScheduledThreadPoolExecutor extends
        ScheduledThreadPoolExecutor {

    //指定核心執行緒數量的構造方法
    public MyScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate
            (Runnable command,TimeUnit unit) {
        //呼叫父類的方法執行傳入的任務
        ScheduledFuture<?> task = super.scheduleAtFixedRate
                (command,initialDelay,period,unit);
        //將返回值強轉為定製的任務
        MyScheduledTask myTask = (MyScheduledTask) task;
        //設定任務執行週期
        myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period,unit));
        //返回任務
        return myTask;
    }

    //裝飾任務方法,此方法會在父類的scheduleAtFixedRate方法中被呼叫
    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable,RunnableScheduledFuture<V> task) {
        //建立我們自己的定製任務類
        //第三個引數task在這裡傳入的是ScheduledThreadPoolExecutor的內部類ScheduledFutureTask
        MyScheduledTask<V> myTask =
                new MyScheduledTask<>(runnable,task,this);
        //返回定製任務類
        return myTask;

    }
}
複製程式碼

Task(向執行緒池提交的任務類,只進行簡單的休眠工作):

package day08.code_05;

import java.util.concurrent.TimeUnit;

public class Task implements Runnable {
    @Override
    public void run() {
        //列印任務開始提示語
        System.out.printf("Task: Begin\n");
        //休眠2秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //列印任務結束提示語
        System.out.printf("Task: End\n");
    }
}
複製程式碼

main方法:

package day08.code_05;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立我們自己的定時執行器
        MyScheduledThreadPoolExecutor executor = new MyScheduledThreadPoolExecutor(2);
        //建立一個任務
        Task task = new Task();
        //列印程式開始的時間
        System.out.printf("Main: %s\n",new Date());
        //按照指定時間執行任務
        executor.scheduleAtFixedRate(task,3,TimeUnit.SECONDS);
        //當前執行緒休眠10秒
        TimeUnit.SECONDS.sleep(10);
        //關閉執行器
        executor.shutdown();
        //等待執行器關閉
        executor.awaitTermination(1,TimeUnit.DAYS);
        //列印程式結束提示語
        System.out.printf("Main: End of the program\n");
    }
}
複製程式碼

6.通過ForkJoinWorkerThreadFactory介面為Fork/Join框架生成定製執行緒

之前我們通過實現ThreadFactory介面創造了執行緒工廠以此來生成定製執行緒。我們同樣可以通過實現ForkJoinWorkerThreadFactory介面來創造執行緒工廠以此來為Fork/Join框架生成定製執行緒。

範例實現

建立Fork/Join框架中的定製執行緒,我們可以繼承ForkJoinWorkerThread類並提供相應的構造方法。在這個範例中,定製執行緒類還重寫了onStart()onTermination()方法:

  • onStart():該方法會線上程被建立後,第一個任務開始執行前被自動執行,我們可以重寫此方法去初始化執行緒內部狀態或列印日誌。根據原始碼中的註釋,如果要重寫該方法,需要將super.onStart()這條程式碼放在最開始
  • onTermination():該方法會線上程關閉前執行,我們可以重寫此方法線上程關閉之前釋放資源或列印日誌。根據原始碼中的註釋,如果要重寫該方法,需要將super.onTermination()這條程式碼放在末尾
package day08.code_06;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class MyWorkerThread extends ForkJoinWorkerThread {

    //執行緒級別的計數器
    private static ThreadLocal<Integer> taskCounter = new ThreadLocal<>();

    //構造方法
    protected MyWorkerThread(ForkJoinPool pool) {
        super(pool);
    }

    @Override
    protected void onStart() {
        //必須先呼叫父類的onStart方法
        super.onStart();
        //列印執行緒資訊
        System.out.printf("MyWorkerThread %d: Initializing task counter\n",getId());
        //初始化任務計數器
        taskCounter.set(0);
    }

    @Override
    protected void onTermination(Throwable exception) {
        //列印執行緒資訊和執行的任務數
        System.out.printf("MyWorkerThread %d: %d\n",getId(),taskCounter.get());
        //必須在最後呼叫父類的onTermination方法
        super.onTermination(exception);
    }

    //呼叫此方法可以改變任務計數器的值
    public void addTask() {
        //得到計數器的值
        int counter = taskCounter.get().intValue();
        //自增
        counter++;
        //更新計數器的值
        taskCounter.set(counter);
    }

}
複製程式碼

MyWorkerThreadFactory(定製工廠類,使用工廠方法返回定製執行緒)

package day08.code_06;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class MyWorkerThreadFactory implements
        ForkJoinPool.ForkJoinWorkerThreadFactory {
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        return new MyWorkerThread(pool);
    }
}
複製程式碼

MyRecursiveTask(帶有返回值的任務類,在這個範例中將對超大陣列求和,不是重點)

package day08.code_06;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class MyRecursiveTask extends RecursiveTask<Integer> {

    //超大陣列
    private int array[];

    //任務起始、終止位置
    private int start,end;

    //構造方法
    public MyRecursiveTask(int[] array,int start,int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        //初始化結果
        int ret = 0;
        //獲取當前執行緒
        MyWorkerThread thread = (MyWorkerThread) Thread.currentThread();
        //呼叫執行緒的addTask方法增加任務計數器的值
        thread.addTask();
        //如果任務過大則分解
        if (end - start > 10000) {
            int middle = (start + end) / 2;
            MyRecursiveTask task1 = new MyRecursiveTask(array,start,middle);
            MyRecursiveTask task2 = new MyRecursiveTask(array,middle,end);
            //非同步執行任務
            task1.fork();
            task2.fork();
            //合併結果
            return addResults(task1,task2);
        }
        //求出範圍內陣列的和
        for (int i = start; i < end; i++) {
            ret += array[i];
        }
        //返回結果
        return ret;

    }

    private Integer addResults(MyRecursiveTask task1,MyRecursiveTask task2) {
        int value;
        //嘗試獲取兩個任務的返回值
        try {
            value = task1.get().intValue() + task2.get().intValue();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            value = 0;
        }
        //休眠1秒
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //返回結果值
        return value;
    }
}
複製程式碼

main方法:

package day08.code_06;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws ExecutionException,InterruptedException {
        //建立定製執行緒工廠
        MyWorkerThreadFactory factory = new MyWorkerThreadFactory();
        //建立執行緒池並將定製執行緒工廠作為引數傳入
        ForkJoinPool pool = new ForkJoinPool(4,factory,false);
        //建立超大陣列並初始化
        int[] array = new int[100000];
        for (int i = 0; i < array.length; i++) {
            array[i] = i;
        }
        //建立任務
        MyRecursiveTask task = new MyRecursiveTask(array,0,array.length);
        //非同步執行
        pool.execute(task);
        //等待任務執行結束
        task.join();
        //關閉執行緒池
        pool.shutdown();
        //等待執行緒池中的任務執行結束
        pool.awaitTermination(1,TimeUnit.DAYS);
        //列印任務返回的結果和程式執行結束提示語
        System.out.printf("Main: Result: %d\n",task.get());
        System.out.println("Main: End of the program");
    }

}
複製程式碼

7.定製執行在Fork/Join框架中的任務

之前我們建立可以使用Fork/Join框架執行的任務通常都是繼承RecursiveAcitonRecursiveTask這兩個抽象類並重寫其中的方法。實際上,我們也可以根據這兩個抽象類的構造去建立定製的任務抽象類。先檢視RecursiveAcitonRecursiveTask這兩個抽象類的原始碼

  • RecursiveAciton類:該任務無返回值
    public abstract class RecursiveAction extends ForkJoinTask<Void> {
        private static final long serialVersionUID = 5232453952276485070L;
        
        //抽象方法,主要用於重寫任務邏輯
        protected abstract void compute();
    
        //獲取任務結果,當前任務類無返回值,所以此方法必須返回null
        public final Void getRawResult() { return null; }
    
       //設定任務結果,當前任務類無返回值,所以此方法為空
        protected final void setRawResult(Void mustBeNull) { }
    
        //執行緒池呼叫任務的此方法執行任務
        //此方法又呼叫compute方法,我們可以重寫此方法做拓展
        protected final boolean exec() {
            compute();
            return true;
        }
    }
    複製程式碼
  • RecursiveTask類:該方法有返回值
    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;
    
       //任務結果
        V result;
    
        //抽象方法,主要用於重寫任務邏輯
        protected abstract V compute();
        
        //獲取任務結果,直接返回成員變數result
        public final V getRawResult() {
            return result;
        }
        
        //設定任務結果,為成員變數result賦值
        protected final void setRawResult(V value) {
            result = value;
        }
    
        //執行緒池呼叫任務的此方法執行任務
        //此方法又呼叫compute方法,我們可以重寫此方法做拓展
        protected final boolean exec() {
            result = compute();
            return true;
        }
    }
    複製程式碼

範例實現

根據以上兩個類的原始碼,我們在此範例中將建立自己的無返回值抽象任務類,並使任務類繼承此定製抽象類而不是RecursiveAcitonRecursiveTask這兩個類
MyWorkerTask(定製抽象任務類):

package day08.code_07;

import java.util.Date;
import java.util.concurrent.ForkJoinTask;

public abstract class MyWorkerTask extends ForkJoinTask<Void> {

    //任務名稱
    private String name;

    //構造方法
    public MyWorkerTask(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public Void getRawResult() {
        //因為當前任務無返回值,所以返回null
        return null;
    }

    @Override
    protected void setRawResult(Void value) {
        //因為無返回值,所以方法為空
    }

    @Override
    protected boolean exec() {
        //建立任務開始時間
        Date startDate = new Date();
        //開始執行任務
        compute();
        //建立任務執行結束時間
        Date finishDate = new Date();
        //計算時間差
        long diff = finishDate.getTime() - startDate.getTime();
        //列印任務執行所花費的時間
        System.out.printf("MyWorkerTask: %s : %d Milliseconds to complete\n",diff);
        return true;
    }

    protected abstract void compute();
}
複製程式碼

Task(任務類,繼承了定製抽象任務類):

package day08.code_07;

public class Task extends MyWorkerTask {

    //必備元素
    private static final long serialVersionUID = 1L;

    //陣列
    private int array[];

    //任務起始、終止位置
    private int start,end;

    //構造方法
    public Task(String name,int[] array,int end) {
        super(name);
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        //如果任務過大,進行拆分
        if (end - start > 100) {
            int mid = (start + end) / 2;
            Task task1 = new Task(this.getName() + "1",array,mid);
            Task task2 = new Task(this.getName() + "2",mid,end);
            //同步執行
            invokeAll(task1,task2);
        } else {
            //將範圍內的陣列元素自增
            for (int i = start; i < end; i++) {
                array[i]++;
            }
        }
        //修廟50毫秒
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製程式碼

main方法:

package day08.code_07;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立陣列,元素預設都為0
        int[] array = new int[10000];
        //建立執行緒池
        ForkJoinPool pool = new ForkJoinPool();
        //建立任務
        Task task = new Task("Task",array.length);
        //同步執行任務
        pool.invoke(task);
        //關閉執行緒池
        pool.shutdown();
        //等待執行緒池執行完所有任務後關閉
        pool.awaitTermination(1,TimeUnit.DAYS);
        //檢查任務是否正常完成了
        //因為最初元素都為0,正確自增之後應該為1
        for (int i = 0; i < array.length; i++) {
            if (array[i] != 1) {
                System.out.println("Error!");
            }
        }
        //列印程式結束提示語
        System.out.printf("Main: End of the program\n");
    }
}
複製程式碼

8.實現定製Lock類

之前使用過ReentrantLock類作為鎖,在這一小節中我們會定製自己的Lock類。以ReentrantLock類為例,通過檢視原始碼可以看到鎖的底層是靠AbstractOwnableSynchronizer這一抽象類(之後簡稱為AQS類)的子類實現的。檢視AQS類的原始碼,發現此類內部有一個計數器(state)和若干操作此計數器的方法,原來AQS類才是那個真正的‘鎖’,之前使用過的Lock類只是在真正的鎖上又進行了一層封裝。當我們嘗試獲取鎖時,其實是當前執行緒在嘗試改變AQS類內部計數器的值,計數器的值將會以CAS操作來進行更新。如果更新失敗則表示當前執行緒獲取鎖失敗,這時執行緒會被裝入CAS類內部維護的一個佇列(連結串列實現)並不斷嘗試更改計數器的值,這便是我們在使用鎖時看到的執行緒阻塞直到得到鎖這一現象。另外,如果希望定製的鎖具有可重入性,我們可以呼叫AQS類的父類AbstractOwnableSynchronizer中的setExclusiveOwnerThread()getExclusiveOwnerThread()方法來設定和獲取當前持有鎖的執行緒,這樣一來線上程嘗試修改計數器值時,我們可以判斷當前執行緒是否已經持有了鎖並進行對應的操作。在繼承AQS抽象類後,我們必須要重寫tryAcquire()tryRelease()這兩個方法因為抽象類中並沒有給出這兩個方法的正確實現而是直接丟擲了異常。

範例實現

在這個範例中,我們會繼承AQS類並重寫其中的部分方法來實現定製的AQS類,並以此類為基礎實現定製Lock類。最後我們將使用定製Lock類物件來同步程式碼
MyLock(定製Lock類,需要實現Lock介面):

package day08.code_08;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyLock implements Lock {

    //定製AQS類物件
    private AbstractQueuedSynchronizer sync;

    public MyLock() {
        //通過構造方法為AQS物件賦值
        sync = new MyAbstractQueuedSynchronizer();
    }

    @Override
    public void lock() {
        //呼叫AQS類的方法嘗試修改計數器的值
        //此方法內部會呼叫定製AQS類中的tryAcquire方法
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        //呼叫AQS類的方法嘗試修改計數器的值(可中斷)
        //此方法內部會呼叫定製AQS類中的tryAcquire方法
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        //嘗試獲取鎖,如果失敗直接返回不阻塞
        try {
            return sync.tryAcquireNanos(1,1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean tryLock(long time,TimeUnit unit) throws InterruptedException {
        //嘗試在指定時間內獲取鎖,如果失敗直接返回不阻塞
        return sync.tryAcquireNanos(1,TimeUnit.NANOSECONDS.convert(time,unit));
    }

    @Override
    public void unlock() {
        //呼叫AQS類的方法嘗試減少計數器的值
        //此方法內部會呼叫定製AQS類的tryRelease方法
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        //建立AQS內部類物件並返回
        return sync.new ConditionObject();
    }
}
複製程式碼

MyAbstractQueuedSynchronizer(定製AQS類):

package day08.code_08;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class MyAbstractQueuedSynchronizer extends
        AbstractQueuedSynchronizer {

    //採用原子變數作為內部計數器
    private volatile AtomicInteger state;

    public MyAbstractQueuedSynchronizer() {
        //在構造方法中初始化計數器
        state = new AtomicInteger(0);
    }

    @Override
    protected boolean tryAcquire(int arg) {
        //獲得當前執行緒
        Thread now = Thread.currentThread();
        //判斷當前執行緒是否為持鎖執行緒
        if (getExclusiveOwnerThread() == now) {
            //增加計數器的值
            state.set(state.get() + arg);
            return true;
            //否則嘗試增大計數器的值
        } else if (state.compareAndSet(0,arg)) {
            //修改成功,設定當前執行緒為持鎖執行緒
            setExclusiveOwnerThread(now);
            return true;
        }
        //修改失敗返回false
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        //獲得當前執行緒
        Thread now = Thread.currentThread();
        //當前執行緒不是持鎖執行緒就直接拋異常
        if (now != getExclusiveOwnerThread()) {
            throw new RuntimeException("Error!");
        }
        //得到計數器當前值
        int number = state.get();
        //判斷減少指定引數後是否為0
        if (number - arg == 0) {
            //為0則表示執行緒釋放了鎖,將持鎖執行緒設定為null
            setExclusiveOwnerThread(null);
        }
        //減少計數器的值
        return state.compareAndSet(number,number - arg);
    }
}
複製程式碼

Task(任務類):

package day08.code_08;

import java.util.concurrent.TimeUnit;

public class Task implements Runnable {

    //定製鎖
    private MyLock lock;

    //任務名曾
    private String name;

    public Task(MyLock lock,String name) {
        this.lock = lock;
        this.name = name;
    }

    @Override
    public void run() {
        //獲取鎖
        lock.lock();
        //列印獲取鎖的提示資訊
        System.out.printf("Task: %s: Take the lock\n",name);
        //呼叫hello方法,主要為了測試定製鎖的可重入性
        hello();
        //休眠兩秒
        try {
            TimeUnit.SECONDS.sleep(2);
            //列印釋放鎖的提示資訊
            System.out.printf("Task: %s: Free the lock\n",name);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    private void hello() {
        //獲取鎖
        lock.lock();
        //列印Hello
        System.out.println("Hello!");
        //釋放鎖
        lock.unlock();
    }
}
複製程式碼

main方法:

package day08.code_08;

import java.util.concurrent.TimeUnit;

public class Main {

    public static void main(String[] args) {
        //建立定製鎖物件
        MyLock lock = new MyLock();
        //建立十個任務並分別開啟執行緒執行
        for (int i = 0; i < 10; i++) {
            Task task = new Task(lock,"Task-" + i);
            Thread thread = new Thread(task);
            thread.start();
        }
        //主執行緒休眠兩秒
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        boolean value;
        //不斷自旋嘗試獲取鎖
        do {
            try {
                value = lock.tryLock(1,TimeUnit.SECONDS);
                //獲取鎖失敗列印相關資訊
                if (!value) {
                    System.out.printf("Main: Trying to get the Lock\n");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                value = false;
            }
        } while (!value);
        //列印成功獲取鎖資訊
        System.out.println("Main: Got the lock");
        //釋放鎖
        lock.unlock();
        //列印程式結束資訊
        System.out.println("Main: End of the program");
    }

}
複製程式碼