JAVA併發-Executor任務執行框架
阿新 • • 發佈:2019-02-19
首先介紹兩個重要的介面,Executor和ExecutorService,定義如下:
為了配合使用上面的併發程式設計介面,有一個Executors工廠類,負責建立各類滿足ExecutorService介面的執行緒池,具體如下:
newFixedThreadPool:建立一個固定長度的執行緒池,執行緒池中執行緒的數量從1增加到最大值後保持不變。如果某個執行緒壞死掉,將會補充一個新的執行緒。
newCachedThreadPool:建立長度不固定的執行緒池,執行緒池的規模不受限制,不常用。
newSingleThreadExecutor:建立一個單執行緒的Executor,他其中有一個執行緒來處理任務,如果這個執行緒壞死掉,將補充一個新執行緒。
newScheduledThreadPool :建立固定長度的執行緒池,以延時或定時的方式來執行任務。
下面是Executor和ExecutorService中常用方法的示例:
上面的ExecutorSerivce介面中的invokeAll(tasks)方法用於批量執行任務,並且將結果按照task列表中的順序返回。此外,還存在一個批量執行任務的介面CompletionTask。ExecutorCompletionService是實現CompletionService介面的一個類,該類的實現原理很簡單:
用Executor類來執行任務,同時把在執行任務的Future放到BlockingQueue<Future<V>>佇列中。該類實現的關鍵就是重寫FutureTask類的done()方法,FutureTask類的done()方法是一個鉤子函式(關於鉤子函式,請讀者自行查詢),done()方法在FutureTask任務被執行的時候被呼叫。
ExecutorCompletionService類的核心程式碼如下:
其中的done()方法定義如下:
ExecutorCompletionService的使用示例如下:
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor { //不再接受新任務,待所有任務執行完畢後關閉ExecutorService void shutdown(); //不再接受新任務,直接關閉ExecutorService,返回沒有執行的任務列表 List<Runnable> shutdownNow(); //判斷ExecutorService是否關閉 boolean isShutdown(); //判斷ExecutorService是否終止 boolean isTerminated(); //等待ExecutorService到達終止狀態 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); //當task執行成功的時候future.get()返回result <T> Future<T> submit(Runnable task, T result); //當task執行成功的時候future.get()返回null Future<?> submit(Runnable task); //批量提交任務並獲得他們的future,Task列表與Future列表一一對應 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交任務並獲得他們的future,並限定處理所有任務的時間 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; //批量提交任務並獲得一個已經成功執行的任務的結果 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
為了配合使用上面的併發程式設計介面,有一個Executors工廠類,負責建立各類滿足ExecutorService介面的執行緒池,具體如下:
newFixedThreadPool:建立一個固定長度的執行緒池,執行緒池中執行緒的數量從1增加到最大值後保持不變。如果某個執行緒壞死掉,將會補充一個新的執行緒。
newCachedThreadPool:建立長度不固定的執行緒池,執行緒池的規模不受限制,不常用。
newSingleThreadExecutor:建立一個單執行緒的Executor,他其中有一個執行緒來處理任務,如果這個執行緒壞死掉,將補充一個新執行緒。
newScheduledThreadPool
下面是Executor和ExecutorService中常用方法的示例:
import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Demo{ public static void main(String [] args){ //--------Executor示例------------// Executor s=Executors.newSingleThreadExecutor(); s.execute(new MyRunnableTask("1")); //--------ExecutorService示例------------// ExecutorService es=Executors.newFixedThreadPool(2); //--------get()示例------------// Future<String> future=es.submit(new MyCallableTask("10")); try{ System.out.println(future.get()); }catch(Exception e){} //--------get(timeout, timeunit)示例------------// future=es.submit(new MyCallableTask("11")); try{ System.out.println(future.get(500,TimeUnit.MILLISECONDS)); }catch(Exception e){ System.out.println("cancle because timeout"); } //--------invokeAll(tasks)示例------------// List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>(); for(int i=0;i<6;i++){ myCallableTasks.add(new MyCallableTask(i+"")); } try { List<Future<String>> results = es.invokeAll(myCallableTasks); Iterator<Future<String>> iterator=results.iterator(); while(iterator.hasNext()){ future=iterator.next(); System.out.println(future.get()); } } catch (Exception e) {} //--------invokeAll(tasks,timeout,timeunit))示例------------// try { //限定執行時間為2100ms,每個任務需要1000ms,執行緒池的長度為2,因此最多隻能處理4個任務。一共6個任務,有2個任務會被取消。 List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS); Iterator<Future<String>> iterator=results.iterator(); while(iterator.hasNext()){ future=iterator.next(); if(!future.isCancelled()) System.out.println(future.get()); else System.out.println("cancle because timeout"); } } catch (Exception e) {} es.shutdown(); } } class MyRunnableTask implements Runnable{ private String name; public MyRunnableTask(String name) { this.name=name; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("runnable task--"+name); } } class MyCallableTask implements Callable<String>{ private String name; public MyCallableTask(String name) { this.name=name; } @Override public String call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) {} StringBuilder sb=new StringBuilder("callable task--"); return sb.append(name).toString(); } }
上面的ExecutorSerivce介面中的invokeAll(tasks)方法用於批量執行任務,並且將結果按照task列表中的順序返回。此外,還存在一個批量執行任務的介面CompletionTask。ExecutorCompletionService是實現CompletionService介面的一個類,該類的實現原理很簡單:
用Executor類來執行任務,同時把在執行任務的Future放到BlockingQueue<Future<V>>佇列中。該類實現的關鍵就是重寫FutureTask類的done()方法,FutureTask類的done()方法是一個鉤子函式(關於鉤子函式,請讀者自行查詢),done()方法在FutureTask任務被執行的時候被呼叫。
ExecutorCompletionService類的核心程式碼如下:
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
其中的done()方法定義如下:
/**
* Protected method invoked when this task transitions to state
* <tt>isDone</tt> (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }
ExecutorCompletionService的使用示例如下:
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Demo{
public static void main(String [] args) throws InterruptedException, ExecutionException{
CompletionService<String> cs=new ExecutorCompletionService<String>(
Executors.newFixedThreadPool(2));
for(int i=0;i<6;i++){
cs.submit(new MyCallableTask(i+""));
}
for(int i=0;i<6;i++){
Future<String> future=cs.take();
//Retrieves and removes the Future representing the next completed task,
//waiting if none are yet present.
System.out.println(future.get());
}
}
}
class MyCallableTask implements Callable<String>{
private String name;
public MyCallableTask(String name) {
this.name=name;
}
@Override
public String call() throws Exception {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
StringBuilder sb=new StringBuilder("callable task--");
return sb.append(name).toString();
}
}