Fork/Join框架(二)建立一個Fork/Join池
宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González 譯者:許巧輝 校對:方騰飛
建立一個Fork/Join池
在這個指南中,你將學習如何使用Fork/Join框架的基本元素。它包括:
- 建立一個ForkJoinPool物件來執行任務。
- 建立一個ForkJoinPool執行的ForkJoinTask類。
你將在這個示例中使用Fork/Join框架的主要特點,如下:
- 你將使用預設構造器建立ForkJoinPool。
- 在這個任務中,你將使用Java API文件推薦的結構:
If (problem size < default size){ tasks=divide(task); execute(tasks); } else { resolve problem using another algorithm; }
- 你將以一種同步方式執行任務。當一個任務執行2個或2個以上的子任務時,它將等待它們的結束。通過這種方式 ,正在執行這些任務的執行緒(工作執行緒)將會查詢其他任務(尚未執行的任務)來執行,充分利用它們的執行時間。
- 你將要實現的任務將不會返回任何結果,所以你將使用RecursiveAction作為它們實現的基類。
準備工作
這個指南中的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。
如何做…
在這個指南中,你將繼續實現一個任務來修改產品列表的價格。任務最初是負責更新一個佇列中的所有元素。你將會使用10作為參考大小,如果一個任務必須更新超過10個元素,這些元素將被劃分成兩個部分,並建立兩個任務來更新每個部分中的產品的價格。
按以下步驟來實現這個示例:
1.建立類Product,將用來儲存產品的名稱和價格。
public class Product {
2.宣告一個私有的String型別的屬性name和一個私有的double型別的屬性price。
private String name; private double price;
3.實現這些方法,用來設定和獲取這兩個屬性的值。
public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; }
4.建立ProductListGenerator類,用來產生隨機產品的數列。
public class ProductListGenerator {
5.實現generate()方法。它接收一個數列大小 的int型別引數,返回一個產生產品數列的List<Product>物件。
public List<Product> generate (int size) {
6.建立返回產品數列的物件。
List<Product> ret=new ArrayList<Product>();
7.建立產品佇列。給所有產品賦予相同值。比如,10用來檢查程式是否工作得很好。
for (int i=0; i<size; i++){ Product product=new Product(); product.setName("Product"+i); product.setPrice(10); ret.add(product); } return ret; }
8.建立Task類,指定它繼承RecursiveAction類。
public class Task extends RecursiveAction {
9.宣告類的序列版本UID。這個元素是必需的,因為RecursiveAction類的父類ForkJoinTask實現了Serializable介面。
private static final long serialVersionUID = 1L;
10.宣告一個私有的、List<Product>型別的屬性products。
private List<Product> products;
11.宣告兩個私有的、int型別的屬性first和last。這些屬性將決定這個任務產品的阻塞過程。
private int first; private int last;
12.宣告一個私有的、double型別的屬性increment,用來儲存產品價格的增長。
private double increment;
13.實現這個類的構造器,初始化所有屬性。
public Task (List<Product> products, int first, int last, double increment) { this.products=products; this.first=first; this.last=last; this.increment=increment; }
14.實現compute()方法 ,該方法將實現任務的邏輯。
@Override protected void compute() {
15.如果last和first的差小於10(任務只能更新價格小於10的產品),使用updatePrices()方法遞增的設定產品的價格。
if (last-first<10) { updatePrices();
16.如果last和first的差大於或等於10,則建立兩個新的Task物件,一個處理產品的前半部分,另一個處理產品的後半部分,然後在ForkJoinPool中,使用invokeAll()方法執行它們。
} else { int middle=(last+first)/2; System.out.printf("Task: Pending tasks: %s\n",getQueuedTaskCount()); Task t1=new Task(products, first,middle+1, increment); Task t2=new Task(products, middle+1,last, increment); invokeAll(t1, t2); }
17.實現updatePrices()方法。這個方法更新產品佇列中位於first值和last值之間的產品。
private void updatePrices() { for (int i=first; i<last; i++){ Product product=products.get(i); product.setPrice(product.getPrice()*(1+increment)); } }
18.實現這個示例的主類,通過建立Main類,並實現main()方法。
public class Main { public static void main(String[] args) {
19.使用ProductListGenerator類建立一個包括10000個產品的數列。
ProductListGenerator generator=new ProductListGenerator(); List<Product> products=generator.generate(10000);
20.建立一個新的Task物件,用來更新產品佇列中的產品。first引數使用值0,last引數使用值10000(產品數列的大小)。
Task task=new Task(products, 0, products.size(), 0.20);
21.使用無參構造器建立ForkJoinPool物件。
ForkJoinPool pool=new ForkJoinPool();
22.在池中使用execute()方法執行這個任務 。
pool.execute(task);
23.實現一個顯示關於每隔5毫秒池中的變化資訊的程式碼塊。將池中的一些引數值寫入到控制檯,直到任務完成它的執行。
do { System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount()); System.out.printf("Main: Thread Steal: %d\n",pool.getStealCount()); System.out.printf("Main: Parallelism: %d\n",pool.getParallelism()); try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone());
24.使用shutdown()方法關閉這個池。
pool.shutdown();
25.使用isCompletedNormally()方法檢查假設任務完成時沒有出錯,在這種情況下,寫入一條資訊到控制檯。
if (task.isCompletedNormally()){ System.out.printf("Main: The process has completed normally.\n"); }
26.在增長之後,所有產品的價格應該是12。將價格不是12的所有產品的名稱和價格寫入到控制檯,用來檢查它們錯誤地增長它們的價格。
for (int i=0; i<products.size(); i++){ Product product=products.get(i); if (product.getPrice()!=12) { System.out.printf("Product %s: %f\n",product.getName(),product.getPrice()); } }
27.寫入一條資訊到控制檯表明程式的結束。
System.out.println("Main: End of the program.\n");
它是如何工作的…
在這個示例中,你已經建立一個ForkJoinPool物件和一個在池中執行的ForkJoinTask類的子類。為了建立ForkJoinPool物件,你已經使用了無參構造器,所以它會以預設的配置來執行。它建立一個執行緒數等於計算機處理器數的池。當ForkJoinPool物件被建立時,這些執行緒被建立並且在池中等待,直到有任務到達讓它們執行。
由於Task類沒有返回結果,所以它繼承RecursiveAction類。在這個指南中,你已經使用了推薦的結構來實現任務。如果這個任務更新超過10產品,它將被分解成兩部分,並建立兩個任務,一個任務執行一部分。你已經在Task類中使用first和last屬性,用來了解這個任務要更新的產品佇列的位置範圍。你已經使用first和last屬性,只複製產品數列一次,而不是為每個任務建立不同的數列。
它呼叫invokeAll()方法,執行每個任務所建立的子任務。這是一個同步呼叫,這個任務在繼續(可能完成)它的執行之前,必須等待子任務的結束。當任務正在等待它的子任務(結束)時,正在執行它的工作執行緒執行其他正在等待的任務。在這種行為下,Fork/Join框架比Runnable和Callable物件本身提供一種更高效的任務管理。
ForkJoinTask類的invokeAll()方法是執行者(Executor)和Fork/Join框架的一個主要區別。在執行者框架中,所有任務被提交給執行者,而在這種情況下,這些任務包括執行和控制這些任務的方法都在池內。你已經在Task類中使用invokeAll()方法,它是繼承了繼承ForkJoinTask類的RecursiveAction類。
你使用execute()方法提交唯一的任務給這個池,用來所有產品數列。在這種情況下,它是一個非同步呼叫,而主執行緒繼續它的執行。
你已經使用ForkJoinPool類的一些方法,用來檢查正在執行任務的狀態和變化。基於這個目的,這個類包括更多的方法。參見有這些方法完整列表的監控一個Fork/Join池指南。
最後,與執行者框架一樣,你應該使用shutdown()方法結束ForkJoinPool。 以下截圖顯示這個示例執行的一部分:
你可以看出任務正在完成它們的工作和產品價格的更新。
不止這些…
ForkJoinPool類提供其他的方法,用來執行一個任務。這些方法如下:
- execute (Runnable task):這是在這個示例中,使用的execute()方法的另一個版本。在這種情況下,你可以提交一個Runnable物件給ForkJoinPool類。注意:ForkJoinPool類不會對Runnable物件使用work-stealing演算法。它(work-stealing演算法)只用於ForkJoinTask物件。
- invoke(ForkJoinTask<T> task):當execute()方法使用一個非同步呼叫ForkJoinPool類,正如你在本示例中所學的,invoke()方法使用同步呼叫ForkJoinPool類。這個呼叫不會(立即)返回,直到傳遞的引數任務完成它的執行。
- 你也可以使用在ExecutorService介面的invokeAll()和invokeAny()方法。這些方法接收一個Callable物件作為引數。ForkJoinPool類不會對Callable物件使用work-stealing演算法,所以你最好使用執行者去執行它們。
ForkJoinTask類同樣提供在示例中使用的invokeAll()的其他版本。這些版本如下:
- invokeAll(ForkJoinTask<?>… tasks):這個版本的方法使用一個可變引數列表。你可以傳入許多你想要執行的ForkJoinTask物件作為引數。
- invokeAll(Collection<T> tasks):這個版本的方法接收一個泛型型別T物件的集合(如:一個ArrayList物件,一個LinkedList物件或者一個TreeSet物件)。這個泛型型別T必須是ForkJoinTask類或它的子類。
即使ForkJoinPool類被設計成用來執行一個ForkJoinTask,你也可以直接執行Runnable和Callable物件。你也可以使用ForkJoinTask類的adapt()方法來執行任務,它接收一個Callable物件或Runnable物件(作為引數)並返回一個ForkJoinTask物件。 參見
- 在第8章,測試併發應用程式中的監控一個Fork/Join池的指南