並行程式設計框架 ForkJoin(介紹了一點原理,可擴充套件)
本文假設您已經瞭解一般並行程式設計知識,瞭解Java concurrent部分如ExecutorService等相關內容。
雖說是Java的ForkJoin並行框架,但不要太在意Java,其中的思想在其它語言環境也是同樣適用的。因為併發程式設計在本質上是一樣的。就好像如何找到優秀的Ruby程式設計師?其實要找的只是一個優秀的程式設計師。當然,如果語言層面直接支援相關的語義會更好。
引言
Java 語言從一開始就支援執行緒和併發性語義。Java5增加的併發工具又解決了一般應用程式的併發需求,Java6、Java7又進一步補充了一些內容。原來的工具主要是粗粒度的併發。比如每個web請求由一個工作執行緒處理,線上程池分配任務。而Java7中新引入的ForkJoin可以處理更細粒度的平行計算。
早期的時候都是單核cpu環境,如果不是多核環境下,執行緒/程序並不是真正的並行執行,主要用來表示非同步執行效果。單核cpu上,假如每個任務完全是cpu密集的(沒有等待),那麼這種偽併發並不會使計算變快。只有在真正的多核環境才能起到加速作用,而現在多核已經普及,甚至已經到了手機上!
介紹
ForkJoin是適用於多核環境的輕量級並行框架。目標是在多核系統下,通過並行運算,充分利用多處理器,提高效率與加速執行。
ForkJoin程式設計正規化:將問題遞迴地分解為較小的子問題,並行處理這些子問題,然後合併結果,如:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
ForkJoin由一組工作執行緒組成,用來執行任務,核心是work-stealing演算法。可以有大量任務,但實際只有少量真正的物理執行緒,預設是機器的cpu數量,也可指定。很多其它工作的演算法都可以在此基礎之上進行。
雖然起初直接使用它的人可能不多,但將來會被很多框架在底層使用,因為是如此基礎,所以最終ForkJoin可能會無處不在。
一般而言,使用者只需要關心兩個方法fork() 和 join()。它們分別表示:子任務的非同步執行和阻塞等待結果完成。
ForkJoin框架的核心是ForkJoinPool類,實現了work-stealing演算法,用於執行ForkJoinTask型別的任務(也就是按照該演算法排程執行緒與任務,當然還負責解決好相關的一些其它問題)。
work-stealing演算法
work-stealing 是一種任務排程方法,由多個工作執行緒組成,每個工作執行緒用一個雙端佇列維護一組任務。Fork的時候是把任務加到佇列的頭部,而不像一般的執行緒池那樣是加到任務佇列末尾。工作執行緒選擇頭部最新的任務來執行。當工作執行緒沒有任務可執行時,它會嘗試從其它執行緒的任務佇列尾部竊取一個任務執行。如果沒有任務執行了並且竊取其它任務失敗,那麼工作執行緒停止。
這種方法的優點是減少了爭用,因為工作執行緒從頭獲取任務,而竊取執行緒從尾部竊取任務。另一個優點是遞迴的分治法使得早期產生的是較大的任務單元,而竊取到較大任務會進一步遞迴分解,因此也減少了尾部竊取的次數。另外,父任務很可能要等待子任務(join),所以從佇列頭部子任務開始執行也是一種優化。
總之,它會使用有限的執行緒執行大量任務,同時保持各執行緒的任務都處於繁忙的執行狀態,而儘量不讓執行緒處於等待狀態。為了做到這點可能會從其它執行緒的任務佇列中竊取任務來執行,所以叫work-stealing。
就像前面所說物理執行緒不能太多,過多的話切換管理開銷就會較大,還會消耗更多的記憶體等資源,並且沒有帶來任何好處。預設是用cpu數量的執行緒數,一般情況都比較合適(比如Runtime.getRuntime().availableProcessors()返回處理器的數量),但具體的數值還和任務自身的特點有關,可以通過不同引數測試比較一下。而任務可以是大量的,由每個執行緒的工作佇列維護。
ForkJoin是簡化了一些開發者的工作,如果不用ForkJoin,最原始的方式是自己手工切分任務並分別建立執行緒執行。
分治、並行、可伸縮的思考:
這三者關係很親密。分治思想(divide-and-conquer)是一種簡單樸素的思想,很多問題都可以這樣解決。ForkJoin就相當於分治法的並行版本。 分治本身只是解決問題的思想,既可以順序執行也可以並行執行,但是在並行環境中更加有效,因為可以並行處理子問題。而在並行方面,可並行處理問題要麼是彼此完全獨立的問題,要麼是可分解單獨處理的問題。可伸縮性又和能否並行處理緊密相關,因為如果不能並行處理就要受到單機處理能力的限制,也就難以伸縮了。
ForkJoin與MapReduce兩個平行計算框架的區別 ?
MapReduce是把大資料集切分成小資料集,並行分佈計算後再合併。
ForkJoin是將一個問題遞迴分解成子問題,再將子問題並行運算後合併結果。
二者共同點:都是用於執行並行任務的。基本思想都是把問題分解為一個個子問題分別計算,再合併結果。應該說平行計算都是這種思想,彼此獨立的或可分解的。從名字上看Fork和Map都有切分的意思,Join和Reduce都有合併的意思,比較類似。
區別:
1)環境差異,分散式 vs 單機多核:ForkJoin設計初衷針對單機多核(處理器數量很多的情況)。MapReduce一開始就明確是針對很多機器組成的叢集環境的。也就是說一個是想充分利用多處理器,而另一個是想充分利用很多機器做分散式計算。這是兩種不同的的應用場景,有很多差異,因此在細的程式設計模式方面有很多不同。
2)程式設計差異:MapReduce一般是:做較大粒度的切分,一開始就先切分好任務然後再執行,並且彼此間在最後合併之前不需要通訊。這樣可伸縮性更好,適合解決巨大的問題,但限制也更多。ForkJoin可以是較小粒度的切分,任務自己知道該如何切分自己,遞迴地切分到一組合適大小的子任務來執行,因為是一個JVM內,所以彼此間通訊是很容易的,更像是傳統程式設計方式。
ForkJoin框架基本結構:
ForkJoinPool本身實現了ExecutorService介面,負責排程執行ForkJoinTask。
ForkJoinTask是提交給ForkJoinPool 執行的任務,本身也實現了Future 介面。
ForkJoinTask有兩個子類RecursiveAction和RecursiveTask。 RecursiveAction 沒有返回值(只需fork);RecursiveTask有返回值(需要合併)。類似於Runnable和 Callable一樣。沒有返回值一般意味著所有子任務都執行完了即可,中間的子任務不需要join了。其實要不要返回值都可以實現,有返回值可以直接合並,沒有返回值可以把結果儲存在共享的資料上。
而我們要做的是實現自己要完成的任務,只需要繼承其一,並覆蓋抽象方法compute()。在這個方法中實現自己的任務,遞迴分解任務。
ForkJoinPool與一般的ExecutorService實現的差別?
ForkJoin實現了ExecutorService介面,這個介面就是用來把任務交給執行緒池中的工作執行緒去執行。ForkJoin也是一個ExecutorService,但區別在於ForkJoin使用了work-stealing演算法,見前面的介紹。普通的執行緒池是按FIFO的方式執行,而ForkJoin優先執行(由其它任務)後建立子任務。對於大部分會產生子任務的任務模式,ForkJoin的處理實現會很高效。如果設定了非同步模式, ForkJoin也可能適合執行事件型別(不需要join)的任務。
影響ForkJoin加速效果的因素
理想效果是核越多加速效果越好。但是並行不一定更快,引數不對還可能更慢:
1)併發數,即執行緒數。一般是可用的cpu數,預設就是這個,一般表現很好。
2)任務切分的粒度。如果切分粒度等於總任務量,一個任務執行,就相當於單執行緒順序執行。每個任務執行的計算量,太大的話加速效果有限,不能發揮到最好。相反,太小的話,消耗在任務管理的成本佔了主要部分,導致還不如順序執行的快。
需要適當平衡二者。因為還和任務本身的特定有關,所以可以做個基準測試比較一下。
而總的執行時間還與任務的規模有關。
任務粒度應該適中,多大合適?好像在什麼地方上看到說:經驗上單個任務為100-10000個基本指令,當然還和任務本身的特定有關。
個人感覺多核cpu只適用於解決計算密集型應用,因為實際問題可能IO等其他方面的瓶頸,多核也還是無法充分利用的。
使用ForkJoin的步驟:
ForkJoin框架替我們完成了一些工作,那麼我們使用時還要完成哪些工作:
1)如何執行單個任務。如果只切分出一個任務執行,就相當於單執行緒順序執行。
2)如何遞迴地切分任務(以及任務切分後是否需要合併結果)
3)切分粒度多少合適(最小任務單元)
這些具體表現在:繼承ForkJoinTask的一個子類,並實現抽象方法compute()。在這個方法中實現自己的任務,遞迴分解任務。
這些準備好之後就可以啟動了:建立一個表示全部任務的ForkJoinTask物件,建立一個ForkJoinPool的例項,把task作為引數執行ForkJoinPool的invoke方法。
在ForkJoin任務外部執行總任務:execute非同步執行任務,沒有返回結果void;invoke執行任務並等待返回結果,結果是特定型別;submit執行一個任務,返回ForkJoinTask(實際上是作為Future物件返回)。一般應該在外部使用invoke呼叫執行總任務。而execute和submit只是為了實現ExecutorService規定的相關語義,invoke是ForkJoin中特有的。
在ForkJoinTask內部遞迴執行的過程中:fork是非同步執行,invoke是等待任務執行完成。
具體例項:
多看看具體示例比較好。
1)合併排序示例:
合併排序是常見的排序演算法之一。示例實現了對一個整數陣列的合併排序。同時還演示了不同併發數(執行緒數)與不同陣列大小的組合測試。程式碼在<jdk_ home>/sample/ForkJoin/ 中。
2)把圖片模糊處理示例:
一個圖片可以被表示為一個m*n大小的整數陣列,其中每個整數表示一個畫素(的顏色)。模糊處理之後的影象還是一個同樣大小的整數陣列。處理過程是把原來的每個畫素與周圍畫素的顏色求平均值即可。如果順序執行就是從頭到尾對每個畫素執行一次計算得到目標畫素,因為每個畫素的計算是獨立的,所以可以把這個整數陣列切分成一塊一塊的子陣列(即子任務)分別執行。任務不適合切分的過小,所以設定了一個常數閾值10000,大小小於10000的子陣列就直接執行,否則對半切分為兩個子陣列的任務分別執行。文章 原始碼
在我自己的機器上i3處理器 (i3,4cpu),並行確實快了不少。
其它的例如:求最大值max、平均值avg、求和sum等聚合函式都是可以分解計算的。
示例中都是對陣列的處理,比較常見的是對陣列、集合進行並行地處理操作,但也不限於此。
網上有一些Fibonacci 的示例,但這些示例並不適合展示ForkJoin。
Doug Lea與JSR-166
說到Java併發程式設計,就不能不說到Doug Lea與JSR-166。Doug Lea是併發程式設計方面的專家,紐約州立大學奧斯威戈分校的計算機教授。曾是JCP執行委員,是JSR-166的leader。JSR-166就是負責向Java語言中新增併發程式設計工具的,即我們見到的java.util.concurrent包(及子包)。還是《 Concurrent Programming in Java Design Principles and Patterns》一書的作者,是這方面最早的書。他還是知名的記憶體分配方法dlmalloc的作者,這是C語言中的動態記憶體分配函式malloc的一種普遍使用的實現。
參考資料:
其它並行框架: