FutureTask與Fork/Join
阿新 • • 發佈:2017-08-12
英文 引用 ktr olt pri cte ren final 方法
在學習多線程的過程中,我們形成了一種思維習慣。那就是對於某個耗時操作不再做同步操作,讓他分裂成一個線程之後執行下一步,而線程執行耗時操作。並且我們希望在我們需要它返回的時候再去調用它的結果集。好比我們把米飯和水放進了電飯煲,轉頭就去炒菜了,等到菜完成之後,轉頭去查看飯是否完成。多線程造成了並行計算的現象,有時候它們是真的多核計算而有時候只是單核的切換。
FutureTask表示的是一種,異步操作的典範。我提交了任務,在未來我要拿到結果。
考慮一種簡單的場景,A問B一個問題,B一時回答不了,B要去考慮一段時間,等到有結果了,再告訴A。
這時,我們需要類A,類B。
packageFuture; //調用端 public class CallA implements CallBack{ private CallB b; public CallA(CallB b){ this.b = b; } public void submitProblem(String problem){ System.out.println("a 提交問題:"+problem); new Thread(){ public void run(){ b.execute(CallA.this, problem); } }.start(); System.out.println("a 提交問題完畢"); } @Override public void result(String result) { System.out.println(result); } }
package Future; //執行處理 public class CallB { public void execute(CallBack callBack,String problem){ System.out.println("接受問題:"+problem); System.out.println("開始處理"); try{ Thread.sleep(2000); }catch (Exception e) { e.printStackTrace(); } callBack.result("問題處理結果:abcdefg..."); } }
類的結構是,A中保留它作用對象B的一個引用,在觸發詢問問題的時候,A向B提交了一個方法調用,並且同時開啟了一個線程,這是它不阻塞的原因。
為“提問題”做一個面向對象的接口。
//回調接口 public interface CallBack { public void result(String result); }
他們執行的主流程,十分簡單。
public class Main { public static void main(String[] args) { CallB b = new CallB(); CallA a = new CallA(b); a.submitProblem("英文字母"); } }
熟悉了這個過程,JDK提供了FutureTask的接口。
package Future; import java.util.concurrent.Callable; public class RealData implements Callable<String>{ private String data; public RealData(String data){ this.data = data; } @Override public String call() throws Exception { StringBuffer sb = new StringBuffer(); for(int i=0;i<10;i++){ sb.append(data); try{ Thread.sleep(1500); }catch (Exception e) { e.printStackTrace(); } } return sb.toString(); } }
實現這個Callable接口,重寫call方法,在未來調用get的時候將返回運算結果。
package Future; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; //jdk future框架 public class FutureMain { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<String> future = new FutureTask<String>(new RealData("a")); ExecutorService executor = Executors.newFixedThreadPool(1); executor.submit(future); System.out.println("請求完畢"); try{ Thread.sleep(1000); }catch (Exception e) { } System.out.println("future task 返回:"+future.get()); } }
多線程的優勢體現在並行計算中,雖然某大佬說研究並行計算是在浪費時間,但是作為一種由多線程產生的技術來說,先了解一下特點。
JDK為我們提供了一套Join/Fork框架,考慮下面這個例子。
package ForkAndJoin; import java.util.concurrent.RecursiveAction; public class PrintTask extends RecursiveAction{ private final int Max = 50; private int start; private int end; public PrintTask(int start,int end){ this.start = start; this.end = end; } @Override protected void compute() { if((end - start)<Max){ for(int i=start;i<end;i++){ System.out.println("當前線程:"+Thread.currentThread().getName()+" i :"+i); } }else{ int middle = (start+end)/2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); left.fork(); right.fork(); } } }
package ForkAndJoin; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; public class ForkJoinPoolTest { public static void main(String[] args) throws InterruptedException { ForkJoinPool forkJoin = new ForkJoinPool(); forkJoin.submit(new PrintTask(0,200)); forkJoin.awaitTermination(2, TimeUnit.SECONDS); forkJoin.shutdown(); } }
在compute方法中寫主要的任務處理,這是一個並行計算的小例子。
J/F的模式很像map-reduce模式,將任務分小,然後各個處理。
FutureTask與Fork/Join