1. 程式人生 > 其它 >Fork / Join Framework教程:ForkJoinPool示例

Fork / Join Framework教程:ForkJoinPool示例

在Java程式中有效並行使用CPU核心一直是一個挑戰。很少有能夠將工作分配到多個CPU核心,然後將它們合併以返回結果集的框架。Java 7Fork and Join框架已包含此功能。

基本上,Fork-Join將手頭的任務分解為多個小任務,直到該小任務足夠簡單,可以將其解決而無需進一步拆分。這就像分而治之的演算法。在此框架中要注意的一個重要概念是,理想情況下,沒有工作執行緒處於空閒狀態。他們實現了一種工作竊取演算法,該演算法使空閒的工作執行緒從忙碌的工作執行緒那裡竊取工作進行處理。

叉連線框架

Fork-Join框架

它基於Java併發性思想領袖Doug Lea的工作。Fork / Join處理執行緒的麻煩;您只需要向框架指出可以分解並遞迴處理哪些部分。它使用虛擬碼(摘自Doug Lea關於該主題的論文):

結果解決(問題){
	如果(問題很小)
		直接解決問題
	其他{
		將問題分解為獨立的部分
		分叉新的子任務來解決每個部分
		加入所有子任務
		由子結果組成結果
	}
}
討論要點

1)Fork / Join框架中使用的核心類
    i)   ForkJoinPool 
    ii)ForkJoinTask 
2)Fork / Join Pool框架的示例實現
    i)  實現原始碼
    ii)它如何工作?
3)Fork / Join框架與ExecutorService之間的區別
4)JDK中的現有實現
5)結論

Fork / Join框架中使用的核心類

支援Fork-Join機制的核心類是

ForkJoinPoolForkJoinTask

讓我們詳細瞭解他們的角色。

貨叉池

從本質上講,它是實現我們上面提到的工作竊取演算法的ForkJoinPool一種專門實現ExecutorService。我們ForkJoinPool通過提供目標並行度(即處理器數量)來建立的例項,如下所示:

ForkJoinPool pool = new ForkJoinPool(numberOfProcessors);

Where numberOfProcessors = Runtime.getRunTime().availableProcessors();

如果使用無引數建構函式,則預設情況下,它將建立一個大小等於使用上述技術獲得的可用處理器數量的大小的池。

儘管您指定了任何初始池大小,但池會動態調整其大小,以嘗試在任何給定的時間點維護足夠的活動執行緒。與其他相比,另一個重要的區別ExecutorService是,由於程式池的所有執行緒都處於守護程式模式,因此無需在程式退出時顯式關閉該池。

向任務提交任務的三種不同方法ForkJoinPool

1)execute()方法//所需的非同步執行;呼叫其fork方法在多個執行緒之間分配工作。
2)invoke()方法://等待獲取結果;在池上呼叫invoke方法。
3)Submit()方法://返回一個Future物件,可用於檢查狀態並在完成時獲取結果。

ForkJoinTask

這是用於建立在.NET中執行的任務的抽象類ForkJoinPool。該RecursiveactionRecursiveTask是僅有的兩個直接的,稱為子類ForkJoinTask。這兩個類之間的唯一區別是,RecursiveActionRecursiveTask確實具有返回值並返回指定型別的物件時,它們不返回值。

在這兩種情況下,您都需要在子類中實現compute方法,該方法執行任務所需的主要計算。

ForkJoinTask類提供了用於檢查任務的執行狀態的幾種方法。該isDone()方法返回,如果以任何方式任務完成如此。該isCompletedNormally()如果沒有取消或遇到異常,並且在任務完成方法返回trueisCancelled()如果任務被取消,則返回true。最後,如果任務已取消或遇到異常,則isCompletedabnormally()返回true。

Fork / Join Pool框架的示例實現

在此示例中,您將學習如何使用ForkJoinPoolForkJoinTask類提供的非同步方法來管理任務。您將實現一個程式,程式將搜尋資料夾及其子資料夾中具有確定副檔名的檔案ForkJoinTask您將要實現的類將處理資料夾的內容。對於該資料夾內的每個子資料夾,它將ForkJoinPool以非同步方式將新任務傳送到類。對於該資料夾中的每個檔案,任務將檢查檔案的副檔名並將其繼續新增到結果列表中。

上述問題的解決方案在FolderProcessor類中實現,如下所示:

實現原始碼

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class CustomRecursiveAction extends RecursiveAction {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	final int THRESHOLD = 2;
	double[] numbers;
	int indexStart, indexLast;

	CustomRecursiveAction(double[] n, int s, int l) {
		numbers = n;
		indexStart = s;
		indexLast = l;
	}

	@Override
	protected void compute() {
		if ((indexLast - indexStart) > THRESHOLD)
			for (int i = indexStart; i < indexLast; i++)
				numbers[i] = numbers[i] + Math.random();
		else
			invokeAll(new CustomRecursiveAction(numbers, indexStart, (indexStart - indexLast) / 2),
					new CustomRecursiveAction(numbers, (indexStart - indexLast) / 2, indexLast));
	}

	public static void main(String[] args) {
		final int SIZE = 10;
		ForkJoinPool pool = new ForkJoinPool();
		double na[] = new double[SIZE];
		System.out.println("initialized random values :");
		for (int i = 0; i < na.length; i++) {
			na[i] = (double) i + Math.random();
			System.out.format("%.4f ", na[i]);
		}
		System.out.println();
		CustomRecursiveAction task = new CustomRecursiveAction(na, 0, na.length);
		pool.invoke(task);
		System.out.println("Changed values :");
		for (int i = 0; i < 10; i++)
			System.out.format("%.4f ", na[i]);
		System.out.println();
	}
}


import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class FolderProcessor extends RecursiveTask<List<String>> {
	private static final long serialVersionUID = 1L;
	// This attribute will store the full path of the folder this task is going to
	// process.
	private final String path;
	// This attribute will store the name of the extension of the files this task is
	// going to look for.
	private final String extension;

	// Implement the constructor of the class to initialize its attributes
	public FolderProcessor(String path, String extension) {
		this.path = path;
		this.extension = extension;
	}

	// Implement the compute() method. As you parameterized the RecursiveTask class
	// with the List<String> type,
	// this method has to return an object of that type.
	@Override
	protected List<String> compute() {
		// List to store the names of the files stored in the folder.
		List<String> list = new ArrayList<String>();
		// FolderProcessor tasks to store the subtasks that are going to process the
		// subfolders stored in the folder
		List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
		// Get the content of the folder.
		File file = new File(path);
		File content[] = file.listFiles();
		// For each element in the folder, if there is a subfolder, create a new
		// FolderProcessor object
		// and execute it asynchronously using the fork() method.
		if (content != null) {
			for (int i = 0; i < content.length; i++) {
				if (content[i].isDirectory()) {
					FolderProcessor task = new FolderProcessor(content[i].getAbsolutePath(), extension);
					task.fork();
					tasks.add(task);
				}
				// Otherwise, compare the extension of the file with the extension you are
				// looking for using the checkFile() method
				// and, if they are equal, store the full path of the file in the list of
				// strings declared earlier.
				else {
					if (checkFile(content[i].getName())) {
						list.add(content[i].getAbsolutePath());
					}
				}
			}
		}
		// If the list of the FolderProcessor subtasks has more than 50 elements,
		// write a message to the console to indicate this circumstance.
		if (tasks.size() > 50) {
			System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
		}
		// add to the list of files the results returned by the subtasks launched by
		// this task.
		addResultsFromTasks(list, tasks);
		// Return the list of strings
		return list;
	}

	// For each task stored in the list of tasks, call the join() method that will
	// wait for its finalization and then will return the result of the task.
	// Add that result to the list of strings using the addAll() method.
	private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
		for (FolderProcessor item : tasks) {
			list.addAll(item.join());
		}
	}

	// This method compares if the name of a file passed as a parameter ends with
	// the extension you are looking for.
	private boolean checkFile(String name) {
		return name.endsWith(extension);
	}

	public static void main(String[] args) {
		// Create ForkJoinPool using the default constructor.
		ForkJoinPool pool = new ForkJoinPool();
		// Create three FolderProcessor tasks. Initialize each one with a different
		// folder path.
		FolderProcessor system = new FolderProcessor("C:\\Windows", "log");
		FolderProcessor apps = new FolderProcessor("C:\\Program Files", "log");
		FolderProcessor documents = new FolderProcessor("C:\\Documents And Settings", "log");
		// Execute the three tasks in the pool using the execute() method.
		pool.execute(system);
		pool.execute(apps);
		pool.execute(documents);
		// Write to the console information about the status of the pool every second
		// until the three tasks have finished their execution.
		do {
			System.out.printf("******************************************\n");
			System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
			System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
			System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
			System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
			System.out.printf("******************************************\n");
			try {
				TimeUnit.SECONDS.sleep(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		} while ((!system.isDone()) || (!apps.isDone()) || (!documents.isDone()));
		// Shut down ForkJoinPool using the shutdown() method.
		pool.shutdown();
		// Write the number of results generated by each task to the console.
		List<String> results;
		results = system.join();
		System.out.printf("System: %d files found.\n", results.size());
		results = apps.join();
		System.out.printf("Apps: %d files found.\n", results.size());
		results = documents.join();
		System.out.printf("Documents: %d files found.\n", results.size());
	}
}

上面程式的輸出將如下所示:

Main: Parallelism: 2
Main: Active Threads: 3
Main: Task Count: 1403
Main: Steal Count: 5551
******************************************
******************************************
Main: Parallelism: 2
Main: Active Threads: 3
Main: Task Count: 586
Main: Steal Count: 5551
******************************************
System: 337 files found.
Apps: 10 files found.
Documents: 0 files found.

這個怎麼運作?

FolderProcessor該類中,每個任務都處理資料夾的內容。如您所知,此內容包含以下兩種元素:

  • 檔案
  • 其他資料夾

如果任務找到一個資料夾,它將建立另一個Task物件來處理該資料夾,並使用fork()方法將其傳送到池中。如果該任務具有空閒的工作執行緒或可以建立新的工作執行緒,則此方法會將任務傳送到執行該任務的池。該方法將立即返回,因此任務可以繼續處理資料夾的內容。對於每個檔案,任務都會將其副檔名與要查詢的副檔名進行比較,如果相等,則將檔名新增到結果列表中。

任務處理完分配的資料夾的所有內容後,它將等待使用join()方法完成傳送給池的所有任務的完成。在任務中呼叫的此方法等待其執行完成,並返回由compute()方法返回的值。該任務將其傳送的所有任務的結果與自己的結果分組,並將該列表作為compute()方法的返回值返回。

Fork / Join框架和ExecutorService之間的區別

該叉/加入和執行器框架之間的主要區別是工作竊取演算法。與Executor框架不同,當任務正在等待使用join操作建立的子任務完成時,正在執行該任務的執行緒(稱為worker執行緒)將尋找尚未執行的其他任務並開始執行它的執行。通過這種方式,執行緒可以充分利用其執行時間,從而提高了應用程式的效能。

JDK中的現有實現

Java SE中有一些通常有用的功能,已經使用fork / join框架實現了。

1)Java SE 8中引入的一種這樣的實現由java.util.Arrays類用於其parallelSort()方法。這些方法類似於sort(),但是通過fork / join框架利用併發性。在多處理器系統上執行時,大型陣列的並行排序比順序排序快。

2)Stream.parallel()中使用的並行性。閱讀Java 8中有關此並行流操作的更多資訊。

結論

設計好的多執行緒演算法很困難,並且fork / join並非在每種情況下都有效。它在其自身的適用範圍內非常有用,但是最後,您必須確定問題是否適合該框架,如果不合適,則必須準備好利用java提供的精湛工具來開發自己的解決方案。 util.concurrent軟體包。