Fork and Join: Java也可以輕鬆地編寫併發程式
原文地址 作者:Julien Ponge 譯者:iDestiny
資源下載:
如今,多核處理器在伺服器,桌上型電腦及膝上型電腦上已經很普遍了,同時也被應用在更小的裝置上,比如智慧手機和平板電腦。這就開啟了併發程式設計新的潛力,因為多個執行緒可以在多個核心上併發執行。在應用中要實現最大效能的一個重要技術手段是將密集的任務分隔成多個可以並行執行的塊,以便可以最大化利用計算能力。
處理併發(並行)程式,一向都是比較困難的,因為你必須處理執行緒同步和共享資料的問題。對於java平臺在語言級別上對併發程式設計的支援就很強大,這已經在Groovy(GPars), Scala和Clojure的社群的努力下得以證明。這些社群都儘量提供全面的程式設計模型和有效的實現來掩飾多執行緒和分散式應用帶來的痛苦。Java語言本身在這方面不應該被認為是不行的。Java平臺標準版(Java SE) 5 ,和Java SE 6引入了一組包提供強大的併發模組。Java SE 7中通過加入了對並行支援又進一步增強它們。
接下來的文章將以Java中一個簡短的併發程式作為開始,以一個在早期版本中存在的底層機制開始。在展示由Java SE7中的fork/join框架提供的fork/join任務之前,將看到java.util.concurrent包提供的豐富的原語操作。然後就是使用新API的例子。最後,將對上面總結的方法進行討論。
在下文中,我們假定讀者具有Java SE5或Java SE6的背景,我們會一路呈現一些Java SE7帶來的一些實用的語言演變。
Java中普通執行緒的併發程式設計
首先從歷史上來看,java併發程式設計中通過java.lang.Thread類和java.lang.Runnable介面來編寫多執行緒程式,然後確保程式碼對於共享的可變物件表現出的正確性和一致性,並且避免不正確的讀/寫操作,同時不會由於競爭條件上的鎖爭用而產生死鎖。這裡是一個基本的執行緒操作的例子:
Thread thread = new Thread() { @Override public void run() { System.out.println("I am running in a separate thread!"); } }; thread.start(); thread.join();
例子中的程式碼建立了一個執行緒,並且列印一個字串到標準輸出。通過呼叫join()方法,主執行緒將等待建立的(子)執行緒執行完成。
對於簡單的例子,直接操作執行緒這種方式是可以的,但對於併發程式設計,這樣的程式碼很快變得容易出錯,特別是好幾個執行緒需要協作來完成一個更大的任務的時候。這種情況下,它們的控制流需要被協調。
例如,一個執行緒的執行完成可能依賴於其他將要執行完成的執行緒。通常熟悉的例子就是生產者/消費者的例子,因為如果消費者佇列是空的,那麼生產者應該等待消費者,並且如果生產者佇列是空的,那麼消費者也應該等待生產者。該需求可能通過共享狀態和條件佇列來實現,但是你仍然必須通過使用共享物件上的java.lang.Object.nofity()和java.lang.Object.wait()來實現同步,這很容易出錯。
最終,一個常見的錯誤就是在大段程式碼甚至整個方法上使用synchronize進行互斥。雖然這種方法能實現執行緒安全的程式碼,但是通常由於排斥時間太長而限制了並行性,從而造成效能低下。
在通常的計算過程中,操作低階原語來實現複雜的操作,這是對錯誤敞開大門。因此,開發者應該尋求有效地封裝複雜性為更高階的庫。Java SE5提供了那樣的能力。
java.util.concurrent包中豐富的原語
Java SE5引入了一個叫java.util.concurrent的包家族,在Java SE6中得到進一步增強。該包家族提供了下面這些併發程式設計的原語,集合以及特性:
- Executors,增強了普通的執行緒,因為它們(執行緒)從執行緒池管理中被抽象出來。它們執行任務類似於傳遞執行緒(實際上,是實現了java.util.Runnable的例項被封裝了)。好幾種實現都提供了執行緒池和排程策略。而且,執行結果既可以同步也可以非同步的方式來獲取。
- 執行緒安全的佇列允許在併發任務中傳遞資料。一組豐富的實現通過基本的資料結構(如陣列連結串列,連結連結串列,或雙端佇列)和併發行為(如阻塞,支援優先順序,或延遲)得以提供。
- 細粒度的超時延遲規範,因為大部分java.util.concurrent包中的類都支援超時延遲。比如一個任務如果沒有在有限之間內完成,就會被executor中斷。
- 豐富的同步模式超越了java提供的互斥同步塊。這些同步模式包含了常見的俗語,如訊號量或同步柵欄。
- 高效的併發資料集合(maps, lists和sets)通過寫時複製和細粒度鎖的使用,使得在多執行緒上下文中表現出卓越的效能。
- 原子變數遮蔽開發者訪問它們時執行同步操作。這些變數包裝了通用的基本型別,比如Integers或Booleans,和物件引用。
- 大量鎖超越了內部鎖提供的加鎖/通知功能,比如,支援重入,讀寫鎖,超時,或者基於輪詢的加鎖嘗試。
作為一個例子,讓我們想想下面的程式:
注意:由於Java SE7引入了新的整數字面值,下劃線可以在任何地方插入以提高可讀性(比如,1_000_000)。
import java.util.*; import java.util.concurrent.*; import static java.util.Arrays.asList; public class Sums { static class Sum implements Callable<Long> { private final long from; private final long to; Sum(long from, long to) { this.from = from; this.to = to; } @Override public Long call() { long acc = 0; for (long i = from; i <= to; i++) { acc = acc + i; } return acc; } } public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(2); List <Future<Long>> results = executor.invokeAll(asList( new Sum(0, 10), new Sum(100, 1_000), new Sum(10_000, 1_000_000) )); executor.shutdown(); for (Future<Long> result : results) { System.out.println(result.get()); } } }
這個例子程式利用executor來計算長整形數值的和。內部的Sum類實現了Callable介面,並被excutors用來執行結果計算,而併發工作則放在call方法中執行。java.util.concurrent.Executors類提供了好幾個工具方法,比如提供預先配置的Executors和包裝普通的java.util.Runnable物件為Callable例項。使用Callable比Runnable更優勢的地方在於Callable可以有確切的返回值。
該例子使用executor分發工作給2個執行緒。ExecutorService.invokeAll()方法放入Callable例項的集合,並且等待直到它們都返回。其返回Future物件列表,代表了計算的“未來”結果。如果我們想以非同步的方式執行,我們可以檢測每個Future物件對應的Callable是否完成了它的工作和是否丟擲了異常,甚至我們可以取消它。相比當使用普通的執行緒時,你必須通過一個共享可變的布林值來編碼取消邏輯,並且通過定期檢查該布林值來破壞該程式碼。因為invokeAll()是阻塞的,我們可以直接迭代Future例項來獲取它們的計算和。
另外要注意executor服務必須被關閉。如果它沒有被關閉,主方法執行完後JVM就不會退出,因為仍然有啟用執行緒存在。
Fork/Join 任務
概覽
Executors相對於普通的執行緒已經是一個很大的進步,因為executors很容易管理併發任務。有些型別的演算法存在需要建立子任務,並且讓它們彼此通訊來完成任務。這些都是”分而治之”的演算法,也被稱為”map and reduce”,這是參考函數語言程式設計的同名函式。想法是將資料區通過演算法處理分隔為更小切獨立的塊,這是”map”階段。反過來,一旦這些塊被處理完成了,各部分的結果就可以收集起來形成最終的結果,這就是”reduce”階段。
一個簡單的例子想要計算出一個龐大的整形陣列的和(如圖1)。由於加法是可交換的,可以拆分陣列分更小的部分,並且用併發執行緒計算各部分和。各部分和可以被加來從而計算出總和。因為執行緒可以獨立對一個數組的不同區域使用這種演算法操作。相比於單執行緒演算法(迭代陣列中每個整形),你將看到在多核架構中有了明顯的效能提升。
圖1:整形陣列中的部分和
通過executors解決上面的問題是很容易的:將陣列分為n(可用的物理處理單元)部分,建立Callable例項來計算每一部分的和,提交它們到一個管理了n個執行緒的池中,並且收集結果計算出最終結果。
然而,對其他型別的演算法和資料結構,其執行計劃並不是那麼簡單。特別是,識別出要以有效的方式被獨立處理的“足夠小”的資料塊的”map”階段並不能提前知道到資料空間的拓撲結構。基於圖和基於樹的資料結構尤為如此。在這些情況下,演算法應該建立層級”劃分”,即在部分結果返回之前等待子任務完成,雖然在像圖1中的陣列效能較差,但有好幾個併發部分和的計算的級別可以使用(比如,在雙核處理器上將陣列分為4個子任務)。
為了實現分而治之演算法的executors的問題是建立不相關的子任務,因為一個Callable是無限制的提交一個新的子任務給它的executors,並且以同步或非同步的方式等待它的結果。問題是並行:當一個Callable等待另一個Callable的結果時,它就處於等待狀態,從而浪費了一個機會來處理佇列中等待執行的另一個Callable。
通過Doug Lea努力填補了這一缺陷,在Java SE7中,fork/join框架被加到了java.util.concurrent包中。java.util.concurrent的Java SE5和Java SE6版本幫助處理併發,並且Java SE7的新增則幫助處理並行。
新增支援並行
核心的新增是新的ForkJoinPool執行者,專門執行實現了ForkJoinTask介面的例項。ForkJoinTask物件支援建立子任務來等待子任務完成。有了這些清晰的語義,當一個任務正在等待另一個任務完成並且有待執行的任務時,executor就能夠通過”偷取”任務,在內部的執行緒池裡分發任務。
ForkJoinTask物件主要有兩個重要的方法:
- fork()方法允許ForkJoinTask任務非同步執行,也允許一個新的ForkJoinTask從存在的ForkJoinTask中被啟動。
- 反過來, join()方法允許一個ForkJoinTask等待另一個ForkJoinTask執行完成。
如圖2所示,通過fork()和join()實現任務間的相互合作。注意fork()和join()方法名稱不應該與POSIX中的程序能夠複製自己的過程相混淆。fork()只會讓ForkJoinPool排程一個新的任務,而不會建立子虛擬機器。
圖2:Fork和Join任務間的協作
有兩種型別的ForkJoinTask的定義:
- RecursiveAction的例項代表執行沒有返回結果。
- 相反,RecursiveTask會有返回值。
通常,RecursiveTask是首選的,因為大部分分而治之的演算法會在資料集上計算後返回結果。對於任務的執行,不同的同步和非同步選項是可選的,這樣就可以實現複雜的模式。
例子:計算文件中的單詞出現次數
為了闡述新的fork/join框架的使用,讓我們用一個簡單的例子(計算一個單詞在文件集中的出現次數)。首先,也是最重要的,fork/join任務應該是純記憶體演算法,而沒有I/O操作。此外,應該儘可能避免通過共享狀態來進行任務間的通訊,因為這通常意味著加鎖會被執行。理想情況下,僅當一個任務fork另一個任務或一個任務join另一個任務時才進行任務通訊。
我們的應用操作一個檔案目錄結構並且載入每一個檔案的內容到記憶體中。因此,我們需要下面的類來表示模型。文件表示為一些列行:
class Document { private final List<String> lines; Document(List<String> lines) { this.lines = lines; } List<String> getLines() { return this.lines; } static Document fromFile(File file) throws IOException { List<String> lines = new LinkedList<>(); try(BufferedReader reader = new BufferedReader(new FileReader(file))) { String line = reader.readLine(); while (line != null) { lines.add(line); line = reader.readLine(); } } return new Document(lines); } }
注意:如果你對Java SE7比較陌生,你應該會對fromFlie方法中的亮點感到驚訝:
- LinkedList使用鑽石語法(<>)讓編譯器推斷出範型引數型別。因為lines是List<String>型別,所以LinkedList<>被擴充套件為LinkedList<String>。鑽石操作符使得範型處理更容易,其避免了重複型別,因為這些型別在編譯時就能被輕易的推斷出來。
- try塊使用了自動資源管理的語言特性。任何實現了java.lang.AutoClosable的類都可以在try塊中開啟。而不管是否有異常丟擲,任何在try塊中宣告的資源將會在執行離開try塊時合理地關閉。在Java SE7之前,正確地關閉資源很快變成巢狀的if/try/catch/finally塊的一張噩夢,而且經常很難寫正確。
一個資料夾時一個簡單的基於樹的結構:
class Folder { private final List<Folder> subFolders; private final List<Document> documents; Folder(List<Folder> subFolders, List<Document> documents) { this.subFolders = subFolders; this.documents = documents; } List<Folder> getSubFolders() { return this.subFolders; } List<Document> getDocuments() { return this.documents; } static Folder fromDirectory(File dir) throws IOException { List<Document> documents = new LinkedList<>(); List<Folder> subFolders = new LinkedList<>(); for (File entry : dir.listFiles()) { if (entry.isDirectory()) { subFolders.add(Folder.fromDirectory(entry)); } else { documents.add(Document.fromFile(entry)); } } return new Folder(subFolders, documents); } }
現在我們可以開始我們的主類了:
import java.io.*; import java.util.*; import java.util.concurrent.*; public class WordCounter { String[] wordsIn(String line) { return line.trim().split("(\\s|\\p{Punct})+"); } Long occurrencesCount(Document document, String searchedWord) { long count = 0; for (String line : document.getLines()) { for (String word : wordsIn(line)) { if (searchedWord.equals(word)) { count = count + 1; } } } return count; } }
occurrencesCount方法返回一個單詞在文件中的出現次數,利用wordIn方法產生一行內的單片語,它會基於空格或標點符號來分割每一行。
我們將實現兩種型別的fork/join任務。一個資料夾下的單詞出現次數就是該單詞在該資料夾下的所有的子資料夾和文件中出現次數的總和。因此,我們將用一個任務計數在文件中的出現次數和用另一個任務在資料夾下的計數,後者將forks子任務,然後將這些任務join起來,集合他們的結果。
依賴的任務關係很容易理解,如圖3所示,因為它直接對映底層文件或資料夾樹結構。fork/join框架通過在等待一個任務執行文件或資料夾單詞計數時可以通過join()同時執行一個資料夾任務,實現了並行最大化。
圖3:Fork/Join單詞計數任務
讓我們以DocumentSearchTask任務開始,它將計算一個文件中單詞的出現次數:
class DocumentSearchTask extends RecursiveTask<Long> { private final Document document; private final String searchedWord; DocumentSearchTask(Document document, String searchedWord) { super(); this.document = document; this.searchedWord = searchedWord; } @Override protected Long compute() { return occurrencesCount(document, searchedWord); } }
因為我們的任務需要返回值,因此它們擴充套件自RecursiveTask類,由於出現次數用long值表示,所以用Long作為範型引數。compute()方法是RecursiveTask的核心,這裡的實現就簡單的委派給上面的occurencesCount()方法。現在我們可以實現FolderSearchTask,該任務將對樹結構中的資料夾進行操作:
class FolderSearchTask extends RecursiveTask<Long> { private final Folder folder; private final String searchedWord; FolderSearchTask(Folder folder, String searchedWord) { super(); this.folder = folder; this.searchedWord = searchedWord; } @Override protected Long compute() { long count = 0L; List<RecursiveTask<Long>> forks = new LinkedList<>(); for (Folder subFolder : folder.getSubFolders()) { FolderSearchTask task = new FolderSearchTask(subFolder, searchedWord); forks.add(task); task.fork(); } for (Document document : folder.getDocuments()) { DocumentSearchTask task = new DocumentSearchTask(document, searchedWord); forks.add(task); task.fork(); } for (RecursiveTask<Long> task : forks) { count = count + task.join(); } return count; } }
該任務的compute()方法的實現簡單地對建構函式中傳遞的資料夾的每個元素fork出新的文件或資料夾任務,然後join所有的計算出的部分和並返回部分和。
對於fork/join框架,我們現在缺少一個方法來引導單詞 計數操作和一個fork/join池執行者:
private final ForkJoinPool forkJoinPool = new ForkJoinPool(); Long countOccurrencesInParallel(Folder folder, String searchedWord) { return forkJoinPool.invoke(new FolderSearchTask(folder, searchedWord)); }
一個初始的FolderSearchTask引導了所有任務。ForkJoinPool的invoke方法允許等待計算的完成。在上面的例子中,使用了ForkJoinPool的空建構函式,並行性將匹配硬體可用的處理器單元數(比如,在雙核處理器上該值為2)。
現在我們可以寫main()方法,通過命令列引數來獲得要操作的資料夾和搜尋的單詞:
public static void main(String[] args) throws IOException { WordCounter wordCounter = new WordCounter(); Folder folder = Folder.fromDirectory(new File(args[0])); System.out.println(wordCounter.countOccurrencesOnSingleThread(folder, args[1])); }
此示例的完整原始碼還包括更傳統的,基於遞迴實現的相同演算法,並執行在一個執行在單執行緒上:
Long countOccurrencesOnSingleThread(Folder folder, String searchedWord) { long count = 0; for (Folder subFolder : folder.getSubFolders()) { count = count + countOccurrencesOnSingleThread(subFolder, searchedWord); } for (Document document : folder.getDocuments()) { count = count + occurrencesCount(document, searchedWord); } return count; }
討論
在來自Oracle的Sun Fire T2000伺服器上進行了一個非正式的測試,該伺服器可以規定Java虛擬機器可用的處理器核數。上述的fork/join和單執行緒例項的不同版本都執行來查詢JDK原始碼檔案中import的出現次數。
這些不同版本都會執行好幾次,以確保Java虛擬機器熱點優化有足夠的時間來執行。對於2, 4, 8和12核最佳的執行時間被收集了起來並且在加速,那就是說,這些比值(單執行緒耗時/fork-join耗時)被計算出來了。圖4核表1反應了其結果。
如你所見,隨著處理器核數極小的增長就實現了近似線性地加速,因為fork/join框架關心的是最大化並行。
表1:非正式的測試執行時間和加速
Number of Cores | Single-Thread Execution Time (ms) | Fork/Join Execution Time (ms) | Speedup |
2 | 18798 | 11026 | 1.704879376 |
4 | 19473 | 8329 | 2.337975747 |
8 | 18911 | 4208 | 4.494058935 |
12 | 19410 | 2876 | 6.748956885 |
圖4:隨著核數(水平軸)增長而加速(垂直軸)
我們也可以通過讓fork任務不操作在文件級別,而是行級別來改進計算能力。這使得併發任務可能在同一文件的不同行執行。然而這將是牽強的。事實上,fork/join任務應該執行足夠的計算量,以克服fork/join執行緒池或任務管理開銷。行級別的操作很瑣碎,反而影響方法的效率。
附帶的原始碼還有一個基於整型陣列的歸併演算法fork/join例子,有趣的是它使用RecursiveAction來實現的。fork/join任務在呼叫join()時不會返回值。而是,這些任務共享可變狀態:待排序的陣列。實驗再次 表明隨核心數量增長,將實現近線性地加速。
總結
本文討論了Java中併發程式設計並強烈關注Java SE 7中所提供的新的fork / join任務,使得編寫並行程式更容易。這篇文章顯示,利用多核處理器,使用豐富的原語並組合它們編寫出高效能的程式,所有這些都無需處理執行緒的低級別操作和共享狀態同步。這篇文章通過一個既有吸引力又容易掌握的單詞計數例子闡釋了那些新APIs的使用,在非正式的測試中,隨處理器核數增長,獲得了接近線性的加速比。這些結果表面fork/join框架是多麼有用。因為我們沒又更改程式碼或調整Java虛擬機器硬體的最大化核心利用率。
你也可以應用該技術到你的問題和資料模型。只要你以”分而治之”的方式重寫你的演算法來釋放I/O操作和鎖,你將看到明顯的變化。
鳴謝
作者要感謝Brian Goetz和邁克Duigou在早期的文章中有用的反饋修正。他還要感謝Scott Oaks and Alexis Moussine-Pouchkine,在合適的硬體上執行這些測試。