1. 程式人生 > >FutureTask與Fork/Join

FutureTask與Fork/Join

英文 引用 ktr olt pri cte ren final 方法

在學習多線程的過程中,我們形成了一種思維習慣。那就是對於某個耗時操作不再做同步操作,讓他分裂成一個線程之後執行下一步,而線程執行耗時操作。並且我們希望在我們需要它返回的時候再去調用它的結果集。好比我們把米飯和水放進了電飯煲,轉頭就去炒菜了,等到菜完成之後,轉頭去查看飯是否完成。多線程造成了並行計算的現象,有時候它們是真的多核計算而有時候只是單核的切換。

FutureTask表示的是一種,異步操作的典範。我提交了任務,在未來我要拿到結果。

考慮一種簡單的場景,A問B一個問題,B一時回答不了,B要去考慮一段時間,等到有結果了,再告訴A。

這時,我們需要類A,類B。

package
Future; //調用端 public class CallA implements CallBack{ private CallB b; public CallA(CallB b){ this.b = b; } public void submitProblem(String problem){ System.out.println("a 提交問題:"+problem); new Thread(){ public void run(){ b.execute(CallA.
this, problem); } }.start(); System.out.println("a 提交問題完畢"); } @Override public void result(String result) { System.out.println(result); } }
package Future;

//執行處理
public class CallB {
	public void execute(CallBack callBack,String problem){
		System.out.println("接受問題:"+problem);
		System.out.println("開始處理");
		try{
			Thread.sleep(2000);
		}catch (Exception e) {
			e.printStackTrace();
		}
		callBack.result("問題處理結果:abcdefg...");
	}
}

  類的結構是,A中保留它作用對象B的一個引用,在觸發詢問問題的時候,A向B提交了一個方法調用,並且同時開啟了一個線程,這是它不阻塞的原因。

  為“提問題”做一個面向對象的接口。

//回調接口
public interface CallBack {
	public void result(String result);
}	

  他們執行的主流程,十分簡單。

public class Main {
    public static void main(String[] args) {
        CallB b = new CallB();
        CallA a = new CallA(b);
        a.submitProblem("英文字母");
    }
}

  熟悉了這個過程,JDK提供了FutureTask的接口。

package Future;

import java.util.concurrent.Callable;

public class RealData implements Callable<String>{
    private String data;
    public RealData(String data){
        this.data = data;
    }
    @Override
    public String call() throws Exception {
        StringBuffer sb = new StringBuffer();
        for(int i=0;i<10;i++){
            sb.append(data);
            try{
                Thread.sleep(1500);
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        return sb.toString();
    }

}

  實現這個Callable接口,重寫call方法,在未來調用get的時候將返回運算結果。

package Future;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

//jdk future框架
public class FutureMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> future = new FutureTask<String>(new RealData("a"));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(future);
        System.out.println("請求完畢");
        try{
            Thread.sleep(1000);
        }catch (Exception e) {
        }
        System.out.println("future task 返回:"+future.get());
    }
}

多線程的優勢體現在並行計算中,雖然某大佬說研究並行計算是在浪費時間,但是作為一種由多線程產生的技術來說,先了解一下特點。

JDK為我們提供了一套Join/Fork框架,考慮下面這個例子。

package ForkAndJoin;

import java.util.concurrent.RecursiveAction;

public class PrintTask extends RecursiveAction{
    private final int Max = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end){
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        if((end - start)<Max){
            for(int i=start;i<end;i++){
                System.out.println("當前線程:"+Thread.currentThread().getName()+" i :"+i);
            }
        }else{
            int middle = (start+end)/2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            left.fork();
            right.fork();
        }
    }

}
package ForkAndJoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class ForkJoinPoolTest {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool forkJoin = new ForkJoinPool();
        forkJoin.submit(new PrintTask(0,200));
        forkJoin.awaitTermination(2, TimeUnit.SECONDS);
        forkJoin.shutdown(); 
    }
}

在compute方法中寫主要的任務處理,這是一個並行計算的小例子。

J/F的模式很像map-reduce模式,將任務分小,然後各個處理。

  

FutureTask與Fork/Join