《實戰Java高併發程式設計》讀後感
寫在前面無關的內容
白駒過隙,看下日曆已經畢業4年多,加上在大學裡的4年,算算在計算機界也躺了八年,按照格拉德韋爾的1萬小時定律差不多我也該成為行業的專家了,然後並沒有。當看著“什麼是Java?”、“什麼是程式?”、“多執行緒是什麼?”、“怎麼構建一個合理的大型網站?”、“怎麼保證系統的穩定執行”這些耳熟能詳的問題時,就知道前方的路還有很遠很遠,這些問題也許我一直無法給出確切的回答,但至少希望給別人解釋多執行緒時,不會說“多執行緒就是很多執行緒一起跑”這種文字級別的說明。
初讀此書的感受
每天晚上看一點,零零碎碎的花二十天左右差不多算是翻完了。總的來說講的是通俗易懂,用例和程式碼選擇得當;但不管怎麼說,這是一本純理論的解釋書籍,雖說中間穿插了很多樣例,也都基本上是為了演示理論構造出來的,自然就少了些生動;這種純理論的介紹對我這種記憶不佳的人簡直就是成噸的傷害,以至於前天才看完,今天上面的很多細節就被忘的差不多了,甚至懷疑要不了多久我都是否記得看過這本書。 言歸正傳,看書本來就不是為了記住書名和作者名,真正要記住的是作者想通過書來表達的東西,或者是看書者想通過看書得到的收穫,反正我是這麼認為的。當時買這本書看就是想系統性的瞭解Java高併發這個概念,什麼是Java高併發,Java高併發的原理、怎麼實現Java高併發,哪裡用到Java高併發,我們怎麼使用Java高併發等,關於這些核心問題書中也都或多或少的給出了一定的回答;可能是此書專注於理論解釋的緣故,對“Java高併發的原理”、“怎麼實現Java高併發”講的比較詳細,而其它的則講的粗略些。對於書上的理論、方法是不是最好的,我沒做考究,當然目前我的水平也不夠;純從感覺上講,書中的理論嚴謹,方法簡潔,至少我是還沒發現有什麼不妥,或者是需要改進的地方,目前權且當它是最好的吧,以後會是啥樣,那就是以後的事了。一些個人的理解
什麼是Java高併發?
Java高併發就是,用Java語言在短時間內處理大量的業務。
Java高併發的原理?
並行的設計模式和對多執行緒的良好支援,後面詳細講到,也是全文的重點。高併發的背景:
計算機經過這麼多年的發展,如果當前的計算理論或者晶片的生產工藝沒有本質的突破,那麼單個CUP的計算能力可能已經走到盡頭了,而且目前我們還沒有看到突破的可能。CUP的效能是遇到了不可逾越的瓶頸,但我們的硬體工程師並沒有放棄對效能的提升,於是他們想出了一個十分“機智”的辦法,一個CPU的效能是固定的,那我多弄幾個CPU放一起組成一個多核CPU,效能不就提高了幾倍麼。
並行模式:
一個16核CPU,總的來看效能確實比單核CPU效能提高了16倍,這很顯而易見;但CPU效能提高16倍的前提是這CPU的16個核都同時勤奮的工作,然後目前計算機顯然還沒這麼智慧,並不會把我們的給它的任務拆分到16個不同的CPU核心上執行,而是選一個CPU核心來執行完成任務;如果這樣來看,那CPU核心再多也沒用啊,幹活的就那幾個,只是多了些吃瓜的群眾。為了更好的將這些吃瓜的群眾也派上用場,加快我們的任務執行速度,既然計算不會自動幫我們分拆任務,那我們先將任務分拆好,然後再給計算機執行,這樣總能接受了吧。
通俗的說,就是在業務邏輯層將業務拆分成很多子業務,然後再將這些大量的子業務交給計算機處理,這樣計算機就可以用每個CPU核心分別去處理這些子業務,只要拆分的合理,那麼就能將合成的多核CPU的效能發揮出來了。由於是CPU的每個核心都在同時執行,於是就把這種模式稱之為--並行模式。
Java的多執行緒:
我們都知道執行緒是程式執行的最小單位,也就是說一個CPU核心在一個時刻只能執行一個執行緒。我們把任務拆分成子任務,然後交給CPU執行,實質上就是把一個大的程式(可以看做是一個需要執行很久的執行緒),拆分成多個小的執行緒,交給作業系統,然後由作業系統排程分配CPU核心來分別執行這些執行緒,當這些執行緒都執行完成後,我們的程式也就執行完成了。而Java提供一整套對執行緒進行管理的方法,包括建立、啟動、執行、掛起、結束等,我們通過Java提供的方法,能夠較方便的實現我們對執行緒的拆分,以及協同執行的控制,對於這一整體解決方案,稱之為Java多執行緒。Java多執行緒是實現Java程式高併發的核心,比較複雜,下節詳細講解。
總結:
這一節我們談到了三個重要概念,高併發、並行模式、多執行緒。下面總結下這三者的關係。
高併發是我們要實現的目標,要在較短的時間內處理掉大量的業務;
並行模式是實現我們高併發的一種方案,這個主要是基於現實情況考慮得來的。假如我們的CPU執行能力能提高几個數量級,一個CPU在短時間內就能處理掉大量業務,那麼就沒有並行的必要了;主要是當前單個CPU的執行能力有限,只能通過多個CPU同時執行,才能在短時間能完成大量業務,所以就有了並行;
多執行緒是實現並行模式的軟體基礎(多核CPU是實現並行的物理基礎),因為執行緒是CPU核執行的最小單位,如果只有一個執行緒,再多CPU也無法並行;
由於我們是軟體開發,所以高併發問題也就具體化到了解決程式的多執行緒問題,而在這些前面加上限定詞Java,就是說我們是用Java這個工具來解決這些問題的。
怎麼實現Java高併發?
高併發的原理我們已經明白,簡單的說就是把一個大任務拆分成很多小任務,然後完成這些小的任務就可以了。大任務:簡單的理解就是要做很多、或者是很複雜的事,這裡可以等同於我們的高併發;小任務:簡單的理解就是做一件簡單的事,這裡可以等同於我們的執行緒;這種等價是十分不嚴謹的,但這也算是對複雜概念的一種白話解釋,方便記憶和理解。
聽這麼一說,感覺要實現高併發也很簡單啊,就把事情拆分下交給計算機就ok了,然而難就難在這拆分上。至於為啥很難拆分,首先讓我們瞭解下程式的執行流程:
1讀取資料--》2快取到記憶體--》3快取到cache--》4CPU運算--》5寫回cahce--》6寫回記憶體--》7持久化資料,重複上述流程,直到完成程式完成。
從執行流程我們不難看出,程式下一步的執行是依賴上一步的執行結果,比如2沒有執行完成,3是無法執行的。如果不對我任務進行拆分我們是很容易做到這一點的,因為整個軟體設計就是嚴格要求程式是有序的,為此還要求程式只能有順序、選擇、迴圈這3種結構;但一旦對任務進行了拆分,那麼一切都變了。比如我把一個任務拆分成兩個子任務A和B,假如B的運算資料是A的運算結果,那麼B就需要A執行完成6後,才能開始3,否則就會出現錯誤。而我們不管對A、或者B的程式單獨做什麼樣的設計,都無法保證B能正確執行,要保證B的正確執行,我們就需要實現A、B兩個程式的交叉控制;我們都知道,一個程式的耦合的越低越好,那麼我們不難得出:在拆分的時候要儘可能的減小子任務間的耦合度。
減少耦合是我們拆分時的第一目標,如果能消除耦合當然是最好的,但這是很難、幾乎不可能做到(或者是消除耦合的代價很大不能接受)。這個時候我們就不僅僅是對大任務的拆分,還要考慮拆分後的執行;反映到Java高併發上,就是把高併發的任務合理拆分成多執行緒,並處理好執行緒間的資源分配、執行緒的協同執行等問題。要保證高效率,又要n多執行緒執行不衝突,這是一件很難辦到的事;不過好訊息就是,Java提供了大量的方法和工具來幫我們解決這個問題,下面將重點介紹這些方法和工具。
Java多執行緒的基礎
首先我們瞭解下執行緒的生命週期,建立---》執行--》掛起--》....執行--》結束;
建立執行緒Java提供了2種方法:extends Thread; implements Runnable(推薦使用);
啟動執行緒:Object.start();啟動後執行緒進入執行狀態;
掛起執行緒:分為主動掛起,Object.wait()、Thread.sleep();被動掛起,synchronized、lock等;
喚醒執行緒:不同的掛起方式,分別有對於啟用方式wait(notify),sleep(時間引數),synchronized(鎖物件),lock(unlock);
結束執行緒:執行完成後自然退出(常用);設定狀態量,讓執行緒結束執行而退出;
有了這些基礎的方法後,我們理論上是能開發多執行緒程式了,但如果用這麼原始的方法去開發幾萬、幾十萬行程式碼的程式,這是一件讓人無法想象的事。於是Java再進一步,給你我們提供了大量實用的API和框架。
執行緒間的資源共享
程序是作業系統分配和排程的基本單位,在當代面向執行緒設計的計算機結構中,程序是執行緒的容器;簡單的說就是作業系統只負責把資源分配給程序,然後執行緒使用程序裡的資源。既然執行緒使用的是程序裡的資源,而不是僅僅使用執行緒獨有的資源,如果有多個執行緒,那麼自然就會存在一個資源共享的問題,否則臨界資源同時被多個執行緒使用,那肯定是要出錯了。為了解決資源的共享問題,JDK提供了很多方法,不過我只寫我認為重要、有用的。
1、Lock:
解決臨界資源問題最直接的方法就是加鎖,Lock作為synchronized功能的擴充套件,比synchronized更加靈活好用,並且也擴充套件了一些相關的子類針對特定場景的使用,在JDK底層中有大量應用;不過這還是屬於基礎方法,使用複雜,建議謹慎使用。
2、執行緒池:
為了避免頻繁的建立和消耗執行緒,我們可以對建立的執行緒進行復用。這裡和資料庫連線池類似,當系統使用資料庫時並不是建立一個新的連線,而是從連線池中獲得一個可用的連線;反之,關閉連線時也不是真的關閉這個連線,而是將連線還給連線池,通過這種方式,既可以節約建立和銷燬物件的時間,又可以控制系統連線的數量,保證系統穩定執行。使用執行緒池我們也可以獲得類似的好處,將執行緒統一管理,既有利於系統執行效率的提高,也可以減少不同使用者獨立建立、銷燬執行緒可能帶來的風險,從而提高系統執行的穩定性。
為了實現執行緒池的功能,JDK提供了一套Executor框架,幫助我們實現執行緒池的功能。在實際應用中我們也應該使用執行緒池來對我們的執行緒進行管理,避免對執行緒的手動管理。關於Executor具體內容和使用會在附錄中有一篇單獨介紹。
3、併發集合:
在Java基礎裡我們知道,JDK給我們提供了一套常用的集合類,比如其中List、Map就給我們的程式設計帶來了極大的方便。很不幸的是,這些常用的集合類都是非執行緒安全的,比如兩個執行緒都在使用map.put(k, v)時,可能只有一個執行緒成功,這對於我們程式肯定是無法接受的。為了在多執行緒中我們也能方便的使用這些常用的集合類的功能,JDK提供了幾套不同的解決方案。
方法一:collections.synchronizedMap(new HashMap());、collections.synchronizedList(new LinkedList<String>()),使用collections集合工具包裡的方法將非執行緒安全的集合類包裝成執行緒安全的集合類。該方法的內部實現是mutex狀態鎖,可以滿足執行緒安全要求,但效能不優;
方法二:JDK單獨為實現執行緒安全重新提供了執行緒安全的集合類,ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue、BlockingQueue等,這些集合類對多執行緒的支援良好,效能優異,唯一不好的就是實現複雜。
關於併發集合的具體內容和使用也會在附錄中有一篇單獨介紹。
4、鎖的優化:
鎖是最常用的同步方法之一,在高併發的環境下,激烈的鎖競爭會導致程式效能下降。這裡解釋下高併發下,鎖與效能的關係。通過上面論述我們知道,高併發其實就是通過多執行緒,讓多個CPU核同時執行,來加快執行速度;但是由於鎖的存在,執行緒需要獲取鎖資源後才能執行,如果有多個執行緒同時需要鎖資源,那麼同一時刻只能有獲取鎖的那個執行緒執行,其它沒能獲取鎖資源的執行緒則需要等待;這樣就不能完全發揮出多執行緒的優勢,系統性能也就降低了。所以為了提高高併發的效能,需要我們對鎖進行合理優化。
下面介紹幾條鎖優化的建議。
減小持鎖時間:比如,fun(){ A(); B(); C(); },如果只有B()方法是需要同步的,那就只對B()加鎖就可以了,而不需要對整個fun()加鎖,這樣執行緒對鎖的持有時間明顯變短。
減小鎖粒度:比如,ConcurrentHashMap,我們HashMap內部是分段,當我們get()、put()時,一次只會操作其中的一段,這個時候顯然我們只需要對需要操作的這一小段加鎖就可以保證執行緒安全了;相對於對HashMap加鎖,對段的加鎖顆粒更小,鎖的競爭也就降低了。
讀寫分離鎖:比如,i是臨界資源,但相對於讀取,i並不是臨界資源(如果不修改i,n個執行緒同時讀取i,讀到的都是i,不會有執行緒安全問題);但是寫i就是臨界資源了,需要鎖的控制(因為存在一個寫的覆蓋問題,以及因為寫,造成的髒讀的問題);所以將讀和寫分離,能減少鎖的競爭。
鎖分離:比如,LinkedBlockingQueue,take()和put()方法,雖說都修改了佇列,但它們修改的位置不同,理論上兩者不存在衝突,所以用兩個鎖分別控制take()和put(),這樣削弱了鎖的競爭。
鎖粗化:因為申請鎖、釋放鎖都是要花時間的,比如,一次操作裡確實是一直需要鎖A,那麼就沒必要一會申請鎖A,一會兒又釋放鎖A,就一直拿著用完就得了。
關於鎖的優化,就兩個方向,一個是怎麼樣讓鎖更小(鎖的時間、鎖的物件、鎖的功能),讓鎖更小的目的就是降低系統對鎖的競爭;另一個方向就是讓鎖更大,讓鎖更大的目的是減少不必要的加鎖、釋放鎖的時間。效能優化需要我們根據執行的實際情況來改進,那個地方阻礙了系統性能的發揮,我們就應該做相應的優化。
一種很好的新思路ThreadLocal。
之前我們一直講鎖的優化,但想想如果我們直接都不用鎖了,那不就不存在競爭,效能最好了。這要從鎖的起源說起,加鎖我們是為了解決臨界資源的問題,如果我增加臨界資源的數量,一直增加到臨界資源的數量和執行緒數量一樣多,這樣不就不存在臨界資源競爭的問題了,於是就可以不需要鎖,自然就不需要解決鎖的問題了。關於ThreadLocal的方式,說出來很容易懂,只是有時在實際中往往會忽視掉。還有一點要注意的是,ThreadLocal只是提供一個簡單的容器,為每個執行緒都分配資源需要業務層程式碼來完成。這裡也體現了計算機裡的一個重要思想,即時間和空間是可以互換的,只是使用者需要控制好轉換的方向和量。
5、無鎖策略:
無鎖策略實質上是一種樂觀鎖思想,即認為執行緒是不會衝突的,既然不會發生衝突,那自然就不需要等待、加鎖來保證執行緒安全了,所有的執行緒都直接執行就完了。如果真的發生衝突了,那發生衝突的執行緒重新執行,直到沒發生衝突執行完成。
那問題是我們怎麼實現沒衝突就執行完成執行緒,有衝突就重新執行執行緒呢?這裡就使用到一個重要技術:比較交換技術(CAS Compare And Swap)。
CAS演算法:引數CAS(V,E,N),V要更新的變數,E變數的預測值,N新值;當V的值和E的值相同時,則將V的值更新為N的值,不同則放棄更新。通俗的說就是隻有當變數當前的值和我預測是一致時,我才修改。而且現在大多數處理器已經支援原子化CAS指令,效能優異,而且執行緒安全。
有了CAS技術後,就可以來實現我們的無鎖策略了:當我線上程中需要修改一個共享變數時,可以先讀出變數,快取到執行緒獨有的記憶體中,再對變數運算後;把變數、快取值、運算後的結果作為引數傳給CAS;如果CAS執行成功,那我們執行緒就完成了,如果CAS執行失敗,那麼我們就放棄本次操作,重新執行執行緒。
從無鎖策略原理我們知道,無鎖策略既沒有鎖的開銷,也不會切換執行緒,並且還有處理器的底層支援,顯然在高併發效能上是優於有鎖的程式;而且CAS策略也保證了每次一定會有一個執行緒執行成功,這樣也不存在死鎖、阻塞的問題。
寫了一大堆無鎖策略的優點,確實在併發效能上很有優勢;缺點呢就是我沒寫,並且也不打算寫的,實現複雜。JDK提供了無鎖策略實現的併發包,在java.util.concurrent.atomic工具包裡有相關方法,可以直接使用;關於無鎖策略的具體實現,參照書上Vector吧。
執行緒的協同執行(第5章)
在上節我們詳細講解了怎麼解決執行緒間的競爭問題,保證了多執行緒的安全,使多個執行緒能夠同時穩定執行。那麼怎麼讓這些執行緒更好的協同執行來完成我們大併發的任務呢?這節我們將講解一些常見的並行模式的設計方法,通過對這些經典的設計模式的學習,來提高我們並行程式的設計能力,讓我們設計出來的多執行緒執行的更好,最大的發揮出多執行緒的效能;最後通過介紹Java重要的NIO和AIO讓我們對多執行緒的使用有個更深入的認識。
1、常見的並行模式
單例模式:作為最常見的、最簡單的模式之一。核心思想就是將構造方法私有化,保證無法建立該類物件;然後對外提供一個獲取靜態例項的靜態方法,將靜態例項的建立隱藏到類裡面完成;這樣就確保了系統只有一個該靜態例項。
不變模式:不變模式的核心是物件一旦建立,就不允許修改。實現的關鍵字是final,用來修飾class時表示該類不能被繼承,防止通過繼承子類的修改;用來修飾變數時,表示該變數只許在物件構造時被賦值一次,不允許再修改。不變模式天生對多執行緒友好,比如說Java的基礎資料型別和String使用的都是不變模式,使用不變模式後,所有的例項方法均是不需要進行同步操作(例如:String.subString(),在擷取字串時就不用擔心別的執行緒此時修改了該字串;Long l=0L,也不用擔心受別的執行緒影響,但long l=0L就沒那麼幸運了),保證了多執行緒下的效能。
生產者-消費者模式:生產者-消費者的核心思想就是通過共享記憶體緩衝區,避免了兩者的直接通訊,實現生產者、消費者解耦。這也是並行模式的核心設計思想之一,實現執行緒的解耦。通常用來充當共享緩衝區資料結構有BlockQueue(BlockQueue是阻塞佇列,ConCurrentLinkedQueue為非執行緒阻塞佇列),即生產者負責將資料寫到BlockQueue裡,消費者直接到BlockQueue裡讀取資料,BlockQueue本身是JDK提供的一款多執行緒安全的阻塞佇列,能較好的實現執行緒安全和同步。但BlockQueue底層還是基於鎖的實現機制,完全使用鎖和阻塞等待來實現執行緒的同步,高併發效能並不是十分優異。我們知道ConCurrentLinkedQueue底層是用CAS策略實現的,效能優異,但這裡並不適合用來做共享緩衝區,主要是因為ConCurrentLinkedQueue是非執行緒阻塞,如果用來做緩衝區,需要業務層程式碼控制迴圈監控佇列,使用不方便,也影響了效能。那麼有沒有一種使用簡便,底層也是基於CAS策略實現的共享資料通道呢?Disrupt無快取框架提供了我們需要的共享資料通道,具體實現太複雜,就不研究了,使用十分方便,參加下官方API和樣例即可。
Future模式:Future模式核心思想是非同步呼叫,說到非同步呼叫大家最熟悉的肯定是$.ajax();Future模式原理與之相同,唯一不同的是Future會立即返回一個偽造的資料,等實際業務處理完成後再返回真實的資料。這種非同步呼叫,完全無阻塞,在大併發下能更好的提高系統性能。關於非同步與同步的好處,本身就是一個很大的話題,這裡我們知道它帶來的好處,以及提高系統性能的原因就好了。
2、網路NIO和AIO
IO:IO是計算機經典的問題之一,我們知道計算機執行需要連線不同的裝置,但這些裝置的速度完全不在一個等級,其中IO是最突出的例子。例如CPU一個時鐘週期0.3ns,cache(1-10ns),記憶體100ns,SSD固態硬碟50*1000ns,普通磁碟500*1000ns,網路IO速度和普通磁碟一個等級。上面是整理的當前各種電腦器件的大致速度。CPU的速度與磁碟速度相差都達到了6個數量級,在Java標準IO中,一個執行緒負責網路資料的讀取、資料的運算、結果的返回,這樣如果程式中有大量的IO操作,那麼CPU基本上就長時間處於無效的等待狀態了(等待IO裝置讀取資料),極大的降低了系統的效能。
NIO:NIO是替代Java IO的一套新機制,核心思想就是通過實現IO準備(即資料寫到緩衝區,但不一定全部讀寫完成)和執行緒處理的分離,來解決IO速度與CPU速度不匹配的問題。具體實現策略就是,用Channel作為資料快取工具,單獨建立一個或者少量的執行緒專門負責Channel的管理,當Channel裡的資料準備好了後會通知對應的執行緒進行資料處理。這樣就避免了每個執行緒都去等待IO的問題,把IO等待操作交給少量的執行緒去處理,實際的執行緒只是等到IO資料準備好了後再開始執行,這裡很大的提高了系統的效能。
AIO:NIO在網路操作中提供了非阻塞方法,但NIO的IO行為還是同步的,只是這個非阻塞是由少數的“管理執行緒”來實現的。AIO則在NIO的基礎上更進一步,它不是IO準備好時通知執行緒,而是在IO操作已經完成後(完成了整個IO操作),在給執行緒發通知。此時執行緒是完全不是阻塞,我們的業務邏輯將變成一個回撥函式,等待IO完成後,由系統自動觸發。很明顯AIO設計模式更符合IO業務場景的需求,實現起來也更簡單,是我們應該重點理解的。插說一句,實現AIO回撥很明顯的用到了Future模式,這也是對理論的很好的一個應用。
哪裡用到Java高併發?
這裡借用下Linus Torvalds關於並行模式的觀點,Linus認為並行模式只有在伺服器端開發和圖片處理這兩個領域有廣泛的應用。而Java作為最廣泛的伺服器端開發語言之一,Java高併發在伺服器端的開發自然是大有用處。當前網際網路系統主流採用的都是《伺服器--客戶端》模式,這時候往往是一個伺服器對應成千上萬的客戶端,此時對伺服器的效能要求極高,要求伺服器在短時間內處理大量的客戶端的請求,於是也就有了我們高併發的要求,這也是Java高併發當前的主要應用。
怎麼使用Java高併發?
即使我們學習並掌握了上面關於Java高併發這麼多的知識,但寫出一個正確的、高效能並且可擴充套件的高併發程式依然是很困難的,在實際開發中更多的還是使用優質的框架來做開發,這樣可以大大的降低我們開發難度和提高程式的穩定性、安全性、效能等等,在附錄中會講解Akka這個高併發框架。可能初學者會好奇,既然不用,那還大篇幅的去將高併發的實現原理、實現方法幹嘛,直接上Akka框架不就得了。其實這裡是對程式設計的一個誤解,以為程式設計就是寫程式碼,就是調API,其實不然,核心還是設計、分析、綜合處理等,程式碼只是具體的一種實現方法。比如如果對多執行緒沒有一個實質的瞭解,可能在業務的拆分上就很不合理,這樣是用什麼先進的框架也彌補不了的,這也是花大篇幅去解釋基本原理的原因。
書中重要知識附錄
這部分是單獨講解某個內容的,以講清核心原理為主,中間的部分方法、以及知識點有很多的忽略,詳情請引數書上的對應章節。
執行緒池
Executor框架
Executor採用工廠模式提供了各種型別的執行緒池,是實際使用中我們就直接從Executor中獲取我們想要的執行緒池,拿來直接使用即可。下面簡單的介紹下Executor提供的五大類執行緒池。
newFixedThreadPool()方法:該方法返回一個固定數量的執行緒池;
newSingleThreadExecutor()方法:該方法返回只有一個執行緒的執行緒池;
newCachedThreadPool()方法:該方法返回一個可根據實際情況調整執行緒數量的執行緒池;
newScheduledThreadPool()方法:該方法返回一個ScheduleExecutorService物件,可以指定執行緒數量;
newSingleThreadScheduledExecutor()方法:該方法返回一個ScheduleExecutorService物件,執行緒數量為1;
上面執行緒池分兩大類,一類是無計劃的任務,提交就會執行,這類業務應用很廣泛;另一類是有計劃的任務,提交後會按照設定的規則去執行,這種應用的場景相對少一些,不過對有些業務是必須的,比如:我們系統晚上需要清空使用者的狀態、優惠券到期了自動提醒等等,用到的就是這類計劃任務,常見的有spring task。下面分別舉例演示兩種執行緒池的使用。
1、newFixedThreadPool()固定大小的執行緒池
package concurrent.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadDemo {
public static void main(String args[]) {
// 建立任務物件
MyTask task = new MyTask();
// 建立固定數量執行緒池
ExecutorService es = Executors.newFixedThreadPool(5);
for(int i=0; i<10; i++) {
// 向執行緒池裡提交任務
es.submit(task);
}
}
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
2、newScheduledThreadPool()計劃任務
package concurrent.threadpool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduleThreadDemo {
public static void main(String args[]) {
// 建立任務物件
MyTask task = new MyTask();
// 建立任務排程計劃物件
ScheduledExecutorService ses = Executors.newScheduledThreadPool(100);
// 設定任務與執行計劃
ses.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
}
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis()/1000 + ": Thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
大家可以看的出來這兩段程式碼都非常簡單,都是建立任務物件、獲取Executors提供的執行緒池物件、將任務物件繫結到執行緒池物件上。通過Executors提供的不同策略的物件,就能快速實現我們對執行緒的控制。執行緒池的內部實現
Executor為我們提供了功能各異的執行緒池,其實其內部均是由ThreadPoolExecutor實現的,我們詳細瞭解下ThreadPoolExecutor實現原理不但對我們使用理解Executor提供的執行緒池大有幫助,也讓我們能根據實際情況自定義特定的執行緒池。
首先介紹下ThreadPoolExecutor類最核心的構造方法,
public ThreadPoolExecutor(
int corePoolSize, // 指定執行緒池中執行緒的數量(匯流排程量可大於等於這個值)
int maximumPoolSize, // 指定執行緒池中最大執行緒數量(匯流排程量不可能超越這個數值)
long keepAliveTime, // 超過corePoolSize數量的空閒執行緒,存活的時間
TimeUtil unit, // keepAliveTime的單位
BlockingQueue<Runnable> workQueue, // 任務佇列,被提交但為執行的任務
ThreadFactory threadFactory, // 執行緒工廠,用於建立執行緒
RejectedExecutionHandler handler // 當workQueue佇列滿的時候的拒絕策略
)
看到corePoolSize和maximumPoolSize的含義,應該很容易通過設定引數的不同,得到Executors提供的執行緒池物件。該方法一共七個引數,前四個很簡單,我們都會使用,第六個一般使用的是JDK預設提供的,剩下的就只有workQueue和handler了。workQueue:存放的是提交的任務,例如:es.submit(task);在樣例中提交了10次task,但執行緒只有5個,於是就有5個提交但沒開始執行的任務存到了workQueue裡啦。既然是一個存放任務的佇列,我們知道實現佇列的方式有多種,比如:ArrayBlockQueue、LinkedBlockQueue、PriorityBlockQueue等等,選擇不同的佇列就會帶來不同的問題。ArrayBlockQueue,存在一個任務過多超出佇列長度;LinkedBlockQueue,接受過多的任務可能會佔用太多記憶體,造成記憶體崩潰等等。這裡介紹下,newFixedThreadPool和newSingleFixedThreadPool使用的都是LinkedBlockQueue,newCacheThreadExecutor使用的SynchronousQueue佇列。關於佇列的選擇是要根據實際情況來確定,這也是自定義執行緒池的核心。
handler:拒絕策略,實際上是一種補救措施,就是當超出了workQueue臨界值了,我們怎麼讓我們的系統不至於崩潰。JDK內建的處理方法有AbortPolicy,丟擲異常阻止程式(除非是安全性要求極高,否則在大併發情況下使用這種做法不是很明智);DiscardPolicy,丟棄無法處理的任務(如果允許丟棄,這是不錯方案);DiscardOledesPolicy:也是丟棄任務,只不過丟棄的是佇列最前的一個任務。由於上面策略都是實現RejectExecutionHandler介面,我們也可以實現該介面自定義拒絕策略。
自定義執行緒建立
package concurrent.threadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo {
public static void main(String args[]) {
// 建立任務物件
MyTask task = new MyTask();
// 獲取自定義執行緒池
ExecutorService es = getMyThreadPool();
for(int i=0; i<20; i++) {
// 向執行緒池提交任務
es.submit(task);
}
}
// 自定義執行緒池,我們建立一個執行緒數固定的執行緒池
public static ExecutorService getMyThreadPool() {
ExecutorService es = new ThreadPoolExecutor(
// 設定執行緒池大小
5, 5, 0L, TimeUnit.MILLISECONDS,
// 設定快取佇列
new LinkedBlockingQueue<Runnable>(5),
// 設定執行緒工廠
Executors.defaultThreadFactory(),
// 設定拒絕策略
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is discard! "); // 輸出日誌後直接丟棄任務
}
});
return es;
}
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ": Thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
從上面程式的執行結果我們可以看到,10任務被執行(因為執行緒池有5個執行緒,快取佇列也能快取5個),10任務被丟棄,符合我們預期。這個樣例十分簡單,只是通過這個樣例展示怎麼去自定義一個執行緒池,具體的執行緒池定義,我們要根據實際情況,設定傳入的引數即可。Fork/Join框架
上面我們詳細介紹了執行緒池的原理,還是那句話,學底層原理是拿來做設計,並不是讓直接去使用。Fork/Join線上程池的基礎上,做了更近一步的封裝,對執行緒的開啟、分發做了優化,使系統更穩定。另外補充下,Fork/Join還涉及到關於多執行緒的一個重要思想:“分而治之”,通俗的將就是將複雜的問題拆分成多個簡單問題,分開處理。下面通過一段樣例瞭解下Fork/Join。
package concurrent.threadpool;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000; // 門閥值,當大於這個值才進行拆分處理
private long start; // 數列的起始值
private long end; // 數列的結束值
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
// 對數列進行求和
@Override
protected Long compute() {
// 定義求和物件
long sum = 0;
if((end - start) < THRESHOLD) {
// 當求和數列的數量小於門閥值,則直接計算不需要分拆
for(long i=start; i<=end; i++) {
sum += i;
}
}
else {
// 當求和數列的數量大於門閥值,則拆分成100個小任務
ArrayList<ForkJoinDemo> subTasks = new ArrayList<ForkJoinDemo>();
long step = (start + end) / 100; // 計算每個小任務的數列的數量
long firstOne = start; // 動態記錄小任務數列的起始值
// 將任務拆分,並提交給框架
for(int i=0; i<100; i++) {
long lastOne = firstOne + step; // 動態記錄小任務數列的結束值
if(lastOne > end) {
// 當佇列結束值大於end,需要將lastOne設定為end
lastOne = end;
}
ForkJoinDemo subTask = new ForkJoinDemo(firstOne, lastOne);
firstOne = firstOne + step + 1;
// 將子任務新增到陣列中
subTasks.add(subTask);
// 執行子任務,這裡是將子任務交個Fork/Join框架,什麼時候開始執行由框架自己決定
subTask.fork();
}
// 將子任務的計算結果彙總
for(ForkJoinDemo st : subTasks) {
sum += st.join();
}
}
return sum;
}
public static void main(String args[]) {
// 建立任務物件
ForkJoinDemo task = new ForkJoinDemo(0, 500000L);
// 建立Fork/Join執行緒池
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 將任務提交給執行緒池
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
// 取出運算結果
long sum = result.get();
System.out.println("sum: " + sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
這段多執行緒求和程式碼本身沒啥用,主要是通過這段程式碼理解分拆的思想,和熟悉框架的使用。併發集合:
有一種說法,程式就是“演算法+資料結構”,而這類併發集合就是JDK為我們提供的資料結構,這些執行緒安全的併發集合主要有連結串列、HashMap、佇列等。
1、concurrentHashMap
concurrentHashMap主要是使用了“鎖的優化方案--減小鎖顆粒度方案”對mutex互斥變數進行了優化,提高了併發性。
我們知道為了讓HashMap執行緒安全,JDK採用的策略是,對HashMap新增mutex變數,所有的對HashMap的操作均要先獲得鎖,這樣就實現的HashMap多執行緒的同步;concurrentHashMap也是使用相同的策略,也是新增互斥變數mutex來實現執行緒的同步,不同的是concurrentHashMap的資料結構和HashMap有略微的差別。
我們可以理解為concurrentHashMap是由16(預設為16個)個小HashMap來組成的,然後對每個小HashMap新增互斥變數來實現小HashMap的多執行緒安全,這樣concurrentHashMap也就是執行緒安全的了。因為Map的重要方法get()和put()方法一次只會操作一個元素,也就是隻會修改16箇中的某一個HashMap,這樣我們只需對當前被修改的HashMap加鎖就能保證執行緒安全了。
但是Map也有操作全域性的方法,比如size()方法,要獲取當前Map裡元素的總數,這時候就要對全域性加鎖,相當於是16個HashMap的鎖都獲取到了才能計算(當然在實際中JDK並沒有這麼去做,而是使用了優化方案,但concurrentHashMap的size()方法也比HashMap的size()方法慢)。這裡也揭示了一個問題,就是使用減小鎖顆粒的優化方案時,過多的使用獲取全域性資訊的方法會降低效能。
2、concurrentLinkedQueue
concurrentLinkedQueue是在高併發環境中效能最好的佇列。之所以有很好的效能,是因為採用了CAS無鎖策略來保證執行緒安全的。關於CAS實現原理、效能好的原因在鎖的優化裡面有詳細論述,這裡就不再重複了。
3、CopyOnWriteArrayList
CopyOnWriteArrayList是適合用於讀操作遠遠大於寫操作場景的連結串列。首先我們來了解下CopyOnWriteArrayList的實現原理,其實在名字上已經展現了大半;連結串列採用無鎖機制,不過這裡使用的不是CAS策略,而是不變模式,當有讀操作,直接讀連結串列就可以了;當有寫操作,不去直接修改連結串列,而是將連結串列複製一份,然後修改複製的連結串列,修改完成後直接替換掉之前的連結串列,這類似於Java中String的處理。
這樣確實保證多執行緒安全,不過我們也看的出來,這樣做代價很大;每修改一次,就要對連結串列全部複製,這是非常佔記憶體和浪費CPU的;只有當寫操作很少時,這種方案才有價值,當寫操作較多時,這種方案就會因為頻繁複制物件而大大降低系統性能。
4、BlockingQueue
BlockingQueue是一個專門用來做資料共享通道的介面。上面提到concurrentLinkedQueue是效能最好的佇列,但它並不適用用來做資料共享通道;因為資料共享通道並不僅僅提供資料儲存的功能,還要做為執行緒的橋樑,實現執行緒間的協同執行等功能,concurrentLinkedQueue顯然沒有這樣的功能。
BlockingQueue之所以適合用來做資料共享通道,其核心還是在Blocking(阻塞)上。BlockingQueue提供了阻塞的put()和take()方法,當向通道寫、讀資料時,如果資料通道為空或者已滿,它們會等待滿足條件了後再執行操作,這樣就較方便的實現了讀和寫執行緒的協同執行。生產者-消費者就是BlockingQueue使用的經典案例。
5、SkipList
SkipList是一種可以用來快速查詢的資料結構,結構比較奇特,總的來講採用的是分層的連結串列,可以看做是為適用多執行緒改良的平衡二叉樹吧。在功能上concurrentShipListMap和concurrentHashMap類似,但用ShipList實現的Map是有序的,在有順序要求的業務中,當然首推適用concurrentShipListMap。
在效能上,當併發量不高、資料量不大時,concurrentHashMap總體效能略好些;當併發量很高、資料量很大時,concurrentShipListMap效能更好;也就是說concurrentShipListMap更適合大併發。concurrentShipListMap一個缺點就是佔記憶體較多,因為其實現需要冗餘記錄一些節點,這裡也是使用計算機裡總要的思想:用空間換時間。
總的來說這些常見的集合都有自己的優勢和缺陷,在使用時我們要根據實際業務場景選擇合適的集合,我們需要的是熟知其中的原理,這樣才能讓我們做出合理的選擇。
Akka介紹:
還是回到高併發這個起點上來,通過上面對Java多執行緒的瞭解,我們知道怎麼去合理使用伺服器上的多核CPU;但是實際高併發環境中,往往一臺伺服器是不夠的,需要多臺伺服器的聯合使用才能完成任務,這個時候或許你會想到分散式、叢集等。而Akka就提供了分散式這樣的功能,通過Akka不僅可以在單機上構建高併發程式,也可以在網路中構建分散式程式,並提供位置透明的Actor定位服務。總的來說Akka採用的是Actor和訊息系統來實現分散式的,下面就簡單的介紹下Actor模型、訊息投遞、訊息接受、訊息路由等知識,最後用一個簡單樣例演示下Akka的使用方式。
插說一段:多執行緒、分散式、叢集,作為解決高併發問題的方案(注意:並行模式是設計模式,並不是解決方案),經常一起出現,其實給初學者帶來了很大的困惑,很多初學者並不瞭解它們間的關係,很長一段時間我也是把它們當同一個東西來看的,其實不然。既然上面提到了這些概念,那我還是對它們分別做一個簡單說明吧。
多執行緒:一臺伺服器上執行多個執行緒,解決的重點是怎樣提高單個伺服器記憶體、CPU等資源的使用率問題;
分散式:一個業務拆分成多個子業務,部署在不同的伺服器上,解決的重點是業務在多臺伺服器高效協同執行的問題;
叢集:同一個業務,部署在多個伺服器上,指的是系統對多硬體的組合使用方式;
多執行緒我們可以看做是對程式的優化,提高程式本身的效能;分散式可以看做是一個架構,因為只有是分散式架構的系統才支援利用叢集的方式來對系統擴充套件;叢集就是對分散式系統橫向擴充套件;所以一般叢集與分散式是一起出現的,而兩者與多執行緒的關係則不大。因為這三者都提高了系統的效能,而且解決的問題的方向也不同,所以實際生產中都是將它們一起使用,來解決大高發問題。
Actor模型:A Model of Concurrent Computing in Distributed Systems(分散式系統中的平行計算模型)。Actor模型我們可以看做是一個函式,只不過比函式要複雜,因為還包含內建狀態,但它具有函式最重要的特徵,就是無論怎麼執行,也無論在哪裡執行,只有引數、環境一定,執行結果就是確定的,這也是實現分散式的基礎。
訊息系統:整個Akka應用是由訊息來驅動的,我們可以簡單的理解,Actor是具體的執行函式,訊息則是去呼叫這些函式;但這與一般的函式呼叫又不同,不是通過方法名來呼叫,而是給需要呼叫的Actor發一個訊息就可以了,Actor收到訊息後,就會自己去執行業務。這裡就涉及到訊息投遞、訊息接受、訊息路由等。
訊息投遞:函式我們知道,引數確定了執行結果也就確定了,Actor模型類似於函式,我們想要一個確定的結果,那麼我們投遞的訊息就應該是確定的,所以這裡強烈建議訊息使用不變模式,防止訊息在傳遞的過程中被修改。另外一個重要問題就是訊息的投遞策略,Akka採用的是至多一次投遞策略,這種策略中,每條訊息最多會被投遞一次。在這種情況下,可能偶爾會出現訊息的投遞失敗,而導致訊息丟失,這就需要我們在業務層維護訊息的可靠性,採用這種策略主要是因為效能最高、成本最低。
訊息收件箱:Akka框架為我們提供了一個叫做“收件箱”的元件,使用收件箱,可以很方便地對Actor進行訊息的傳送和接受。
訊息路由:其實就是用來群發訊息的,通過收件箱我們可以方便的給某個Actor傳送訊息,但是在高併發環境中,往往存在大量的功能相同的Actor在並行處理業務,這個時候我們需要把訊息合理的傳送給這些Actor,首選就是群發了,而訊息路由就是用來幫我們實現群發訊息功能的。訊息路由提供了不同的傳送規則,在建立訊息路由時可以指定合適的規則。
1、粒子群演算法
粒子群演算法與Actor模型有著天生的切合度,下面就正好用這經典的演算法來演示Akka的使用。當然Akka應用場景非常多,粒子群演算法只是某一個應用而已,介於我對Akka的瞭解也不深入,就不去標新立異,還是老老實實的先模仿吧。
粒子群最典型的應用就是用來尋找最優化方案。本質就是就是窮舉法,比如解決一個問題的方案有1億種,如果我們把這1億種方案的結果都算出來,然後對結果進行比較,自然是能找出最優方案的;但由於計算機的運算能力有限,只能運算出一萬種方案的結果;這個時候我們就只能隨機計算一萬種方案,然後找出其中最好的結果,做為我們的次優解,雖說這並不是最好的方案,但這是我們能獲取的最接近最優解的方案,在很多應用中,這種方案也是能被接受的(很典型的就是天氣預報,雖說不能100%準確,但也有很大的價值)。
隨機方案的選取其實是這種最優化計算的核心,Akka框架提供的運算支援只是基礎,但我們這裡主要是為了演示Akka的使用方法,就不去討論優化演算法了。
比如有這麼一個問題:
假設有400萬資金,要求4年內使用完。若在任意一年使用x萬元,則可以獲取√x萬元的收益(本金和收益均不能再使用);當年不用的資金可以存入銀行,年利率為10%,這部分的本金和收益均能拿去使用。數學好的使用拉格朗日公式對方程組求解很快就能算出第一年使用86.19萬元,第二年使用104.29萬元,第三年使用126.29萬元,第四年使用152.69萬元,能獲得43.09萬元的最優收益。數學不好的也沒事,現在不有了計算機麼,下面讓我們的計算機試試,看看能得到什麼樣的結果。
package concurrent.akka;
import java.util.Collections;
import java.util.List;
/**
* 可行的解決方案類,記錄每年投資的錢,和該種方案獲得的收益;
* 因為每種方案都是固定的,不會改變,所以這裡都設定成不變模式;
*/
public final class PsoValue {
// 該方案的收益
final double value;
// 該方案每年投資的錢
final List<Double> x;
public PsoValue(double v, List<Double> x2) {
this.value = v;
this.x = Collections.unmodifiableList(x2);
}
public double getValue() {
return value;
}
public List<Double> getX() {
return x;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("value: ").append("-->").append(x.toString());
return sb.toString();
}
}
package concurrent.akka;
/**
* 全域性最優方案類,記錄全域性最優的方案;
*/
public final class GBestMsg {
final PsoValue value;
public GBestMsg(PsoValue v) {
value = v;
}
public PsoValue getValue() {
return value;
}
}
package concurrent.akka;
/**
* 個體最優方案類,記錄個體最優的方案;
*/
public final class PBestMsg {
final PsoValue value;
public PBestMsg(PsoValue v) {
value = v;
}
public PsoValue getValue() {
return value;
}
}
package concurrent.akka;
import java.util.List;
/**
* 投資方案適應度類,計算出投資方案的收益,收益越大,我們認為適應度越高
*/
public class Fitness {
public static double fitness(List<Double> x) {
double sum = 0;
for(int i=1; i<x.size(); i++) {
// 獲取對於年投資的收益
sum += Math.sqrt(x.get(i));
}
return sum;
}
}
package concurrent.akka;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import akka.actor.ActorSelection;
import akka.actor.UntypedActor;
/**
* 粒子類,也是粒子群演算法中最核心的類;
* 在粒子群演算法中,為了提高效率,選擇執行方案時並不是完全隨機的;
* 而是讓粒子先隨機分佈在整個區域內單獨查詢,看誰查詢的方案適配度最優,
* 並將這最優方案發送給大家,大家再以這個最優方案為方向,
* 按一定演算法選擇執行方案,繼續查詢,以此類推,直到結束。
*
* 如果不明白的,就直接把Bird看做是一個執行了多種方案的類,
* 重點理解怎麼使用Akka讓這些Bird傳遞訊息就可以了。
*/
public class Bird extends UntypedActor {
// Bird個體最優方案
private PsoValue pBest = null;
// 全域性最優方案
private PsoValue gBest = null;
// 粒子在各個維度上的移動速度;每一年的投資認為是一個維度,一共4個維度(這裡陣列長度為5,是為從1-4的角標,方便程式碼閱讀)
private List<Double> velocity = new ArrayList<Double>(5);
// 粒子的初始化位置
private List<Double> x = new ArrayList<Double>(5);
// 建立生成隨機數物件
private Random r = new Random();
// 建立粒子時,初始化粒子的位置,每一個位置可以看做是一種方案
@Override
public void preStart() {
// 初始化velocity和x
for(int i=0; i<5; i++) {
velocity.add(Double.NEGATIVE_INFINITY);
x.add(Double.NEGATIVE_INFINITY);
}
// 第一年投資資金 x1<=400
x.set(1, (double)r.nextInt(401));
// 第二年投資資金 x2<=440-1.1*x1(x1:第一年投資資金)
double max = 440 - 1.1 * x.get(1);
if(max < 0){
max = 0;
}
x.set(2, r.nextDouble() * max);
// 第三年投資資金 x3<=484-1.21*x1-1.1*x2
max = 484 - 1.21 * x.get(1) - 1.1 * x.get(2);
if(max < 0) {
max = 0;
}
x.set(3, r.nextDouble() * max);
// 第四年投資資金 x4<=532.4-1.331*x1-1.21*x2-1.1*x3
max = 532.4 - 1.331 * x.get(1) - 1.21 * x.get(2) - 1.1 * x.get(3);
if(max < 0) {
max = 0;
}
x.set(4, r.nextDouble() * max);
// 計算出該方案的適應度(收益)
double newFit = Fitness.fitness(x);
// 得到區域性最優方案(因為是第一個方案,肯定是當前最優方案)
pBest = new PsoValue(newFit, x);
// 建立區域性最優方案訊息
PBestMsg pBestMsg = new PBestMsg(pBest);
// 通過工廠獲取訊息傳送物件
ActorSelection selection = getContext().actorSelection("/user/masterbird");
// 將區域性最優方案訊息傳送給Master
selection.tell(pBestMsg, getSelf());
}
@Override
public void onReceive(Object msg) throws Exception {
// 如果接受到的是全域性最優方案訊息,則記錄最優方案,並根據全域性最優方案更新自己的執行速度
if(msg instanceof GBestMsg) {
gBest = ((GBestMsg) msg).getValue();
// 更新速度
for(int i=1; i<velocity.size(); i++) {
updateVelocity(i);
}
// 更新位置
for(int i=1; i<x.size(); i++) {
updateX(i);
}
// 有效性檢測,防止粒子超出了邊界
validateX();
// 重新計算適應度,如果產生了新的個體最優,就傳送給Master
double newFit = Fitness.fitness(x);
if(newFit > pBest.value) {
pBest = new PsoValue(newFit, x);
PBestMsg pBestMsg = new PBestMsg(pBest);
getSender().tell(pBestMsg, getSelf());
}
}
else {
unhandled(msg);
}
}
// 更新速度
public double updateVelocity(int i) {
double v = Math.random() * velocity.get(i)
+ 2 * Math.random() * (pBest.getX().get(i) - x.get(i))
+ 2 * Math.random() * (gBest.getX().get(i) - x.get(i));
v = v > 0 ? Math.min(v, 5) : Math.max(v, -5);
velocity.set(i, v);
return v;
}
// 更新位置
public double updateX(int i) {
double newX = x.get(i) + velocity.get(i);
x.set(i, newX);
return newX;
}
// 有效性檢測,防止粒子超出了邊界
public void validateX() {
// x1
if(x.get(1) > 400) {
x.set(1, (double) r.nextInt(401));
}
// x2
double max = 440 - 1.1 * x.get(1);
if((x.get(2) > max) || (x.get(2) < 0)) {
x.set(2, r.nextDouble() * max);
}
// x3
max = 484 - 1.21 * x.get(1) - 1.1 * x.get(2);;
if((x.get(3) > max) || (x.get(3) < 0)) {
x.set(3, r.nextDouble() * max);
}
// x4
max = 532.4 - 1.331 * x.get(1) - 1.21 * x.get(2) - 1.1 * x.get(3);
if((x.get(4) > max) || (x.get(4) < 0)) {
x.set(4, r.nextDouble() * max);
}
}
}
package concurrent.akka;
import akka.actor.ActorSelection;
import akka.actor.UntypedActor;
/**
* 主粒子類,使用者管理和通知全域性最優方案
*/
public class MasterBird extends UntypedActor {
// 全域性最優方案
private PsoValue gBest = null;
@Override
public void onReceive(Object msg) throws Exception {
if(msg instanceof PBestMsg) {
PsoValue pBest = ((PBestMsg) msg).getValue();
if((gBest == null) || (gBest.getValue() < pBest.getValue())) {
// 更新全域性最優方案,並通知所有粒子
gBest = pBest;
ActorSelection selection = getContext().actorSelection("/user/bird_*");
selection.tell(new GBestMsg(gBest), getSelf());
// 列印最優方案
System.out.println(gBest.getValue());
}
}
}
}
package concurrent.akka;
import akka.actor.ActorSystem;
import akka.actor.Props;
public class PSOMain {
public static final int BIRD_COUNT = 1000000;
public static void main(String args[]) {
// 建立Actor的管理和維護系統
ActorSystem system = ActorSystem.create("psoSystem");
// 建立Master粒子
system.actorOf(Props.create(MasterBird.class), "masterbird");
// 建立Bird粒子群
for(int i=0; i<BIRD_COUNT; i++) {
system.actorOf(Props.create(Bird.class), "bird_" + i);
}
}
}
當粒子群的數量達到100w時,基本每次的最大收益都能達到43.05萬之上了,與我們計算的理論值43.09非常接近,基本能滿足我們的要求。還有一個值得注意的問題,這裡建立了100w個粒子,如果是建立這麼多執行緒,我這ThinkPad的筆記本估計早卡死了,但在測試的時候,電腦完全不卡,並且一分鐘不到也就運算完了,可見在系統中是可以啟用大量的Actor,提高了我們對併發的支援。