Scala(四)-- Actor程式設計
Actor程式設計
1.Actor模型
1.1 什麼是Actor 一個Actor指的是一個最基本的計算單元。它能接收一個訊息並且基於其執行計算。 這個理念很像面嚮物件語言,一個物件接收一條訊息(方法呼叫),然後根據接收的訊息做事(呼叫了哪個方法)。 Actors一大重要特徵在於actors之間相互隔離,它們並不互相共享記憶體。這點區別於上述的物件。也就是說,一個actor能維持一個私有的狀態,並且這個狀態不可能被另一個actor所改變。
聚沙成塔
One ant is no ant, one actor is no actor. 光有一個actor是不夠的,多個actors才能組成系統。在actor模型裡每個actor都有地址,所以它們才能夠相互發送訊息。 Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。 Actor是電腦科學領域中的一個平行計算模型,它把actors當做通用的平行計算原語:一個actor對接收到的訊息做出響應,進行本地決策,可以建立更多的actor,或者傳送更多的訊息;同時準備接收下一條訊息。 在Actor理論中,一切都被認為是actor,這和麵向物件語言裡一切都被看成物件很類似。但包括面嚮物件語言在內的軟體通常是順序執行的,而Actor模型本質上則是併發的。 每個Actor都有一個(恰好一個)Mailbox。Mailbox相當於是一個小型的佇列,一旦Sender傳送訊息,就是將該訊息入隊到Mailbox中。入隊的順序按照訊息傳送的時間順序。Mailbox有多種實現,預設為FIFO。但也可以根據優先順序考慮出隊順序,實現演算法則不相同。
訊息和信箱
非同步地傳送訊息是用actor模型程式設計的重要特性之一。訊息並不是直接傳送到一個actor,而是傳送到一個信箱(mailbox) 這樣的設計解耦了actor之間的關係——actor都以自己的步調執行,且傳送訊息時不會被阻塞。雖然所有actor可以同時執行,但它們都按照信箱接收訊息的順序來依次處理訊息,且僅在當前訊息處理完成後才會處理下一個訊息,因此我們只需要關心傳送訊息時的併發問題即可。 Actor之間通過傳送訊息來通訊,訊息的傳送是非同步的,通過一個郵件佇列(mail queue)來處理訊息。每個Actor是完全獨立的,可以同時執行它們的操作。每一個Actor是一個計算實體,對映接收到的訊息到以下動作:
- 傳送有限個訊息給其它Actor
- 建立有限個新的Actor
- 為下一個接收的訊息指定行為 以上三種動作並沒有固定的順序,可以併發地執行。Actor會根據接收到的訊息進行不同的處理。
Actor模型有兩種任務排程方式:基於執行緒的排程以及基於事件的排程
基於執行緒的排程:為每個Actor分配一個執行緒,在接收一個訊息時,如果當前Actor的郵箱(mail box)為空,則會阻塞當前執行緒。 基於執行緒的排程實現較為簡單,但執行緒數量受到操作的限制,現在的Actor模型一般不採用這種方式; 基於事件的除錯:事件可以理解為上述任務或訊息的到來,而此時才會為Actor的任務分配執行緒並執行。
綜上,我們知道可以把系統中的所有事物都抽象成一個Actor:
- Actor的輸入是接收到的訊息。
- Actor接收到訊息後處理訊息中定義的任務。
- Actor處理完成任務後可以傳送訊息給其它的Actor。
1.2 Actors有郵箱 只得指明的一點是,儘管許多actors同時執行,但是一個actor只能順序地處理訊息。也就是說其它actors傳送了三條訊息給一個actor,這個actor只能一次處理一條。所以如果你要並行處理3條訊息,你需要把這條訊息發給3個actors。 訊息非同步地傳送到actor,所以當actor正在處理訊息時,新來的訊息應該儲存到別的地方。Mailbox就是這些訊息儲存的地方。 Actors通過非同步訊息溝通,在處理訊息之前訊息被存放在Mailbox中。
1.3 Actors做什麼 當一個actor接收到訊息後,它能做如下三件事中的一件: Create more actors; 建立其他actors Send messages to other actors; 向其他actors傳送訊息 Designates what to do with the next message. 指定下一條訊息到來的行為 前兩件事比較直觀,第三件卻很有意思。我之前說過一個actor能維持一個私有狀態。「指定下一條訊息來到做什麼」意味著可以定義下條訊息來到時的狀態。更清楚地說,就是actors如何修改狀態。 設想有一個actor像計算器,它的初始狀態是數字0。當這個actor接收到add(1)訊息時,它並不改變它原本的狀態,而是指定當它接收到下一個訊息時,狀態會變為1。
1.4 容錯 Fault tolerance
Erlang 引入了「隨它崩潰」的哲學理念,這部分關鍵程式碼被監控著,監控者的唯一職責是知道程式碼崩潰後幹什麼(如將這個單元程式碼重置為正常狀態),讓這種理念成為可能的正是actor模型。 每段程式碼都執行在process中,process是erlang稱呼actor的方式。這個process完全獨立,意味著它的狀態不會影響其他process。我們有個supervisor,實際上它只是另一個process(所有東西都是actor),當被監控的process掛了,supervisor這個process會被通知並對此進行處理。這就讓我們能建立「自愈」系統了。如果一個actor到達異常狀態並崩潰,無論如何,supervisor都可以做出反應並嘗試把它變成一致狀態,這裡有很多策略,最常見的是根據初始狀態重啟actor。
1.5 分散式 Distribution 另一個關於actor模型的有趣方面是它並不在意訊息傳送到的actor是本地的或者是另外節點上的。 轉念一想,如果actor只是一些程式碼,包含了一個mailbox和一個內部狀態,actor只對訊息做出響應,誰會關注它執行在哪個機器上呢?只要我們能讓訊息到達就行了。這允許我們基於許多計算機上構建系統,並且恢復其中任何一臺.
1.6 對比傳統java併發程式設計與Scala Actor程式設計
Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的
java 內建執行緒模型 | Scala actor模型 |
---|---|
共享資料鎖,每個object有一個monitor,監視多執行緒對共享資料的訪問 | 不共享資料,actor之間通過message通訊加鎖的程式碼段用syncronized標識 |
加鎖的程式碼段用syncronized標識 | |
死鎖問題:兩個物件在沒有釋放資源的狀況下又去申請新的資源,造成互相等待,形成死鎖 | |
每個執行緒內部都是順序執行的 | 每個actor內部都是順序執行的 |
1.7 Actor方法執行順序
1.首先呼叫start()方法啟動Actor 2.呼叫start()方法後其act()方法會被執行 3.向Actor傳送訊息
1.8 傳送訊息的方式
! | 傳送非同步訊息,沒有返回值。 |
---|---|
!! | 傳送非同步訊息,返回值是 Future[Any] |
!? | 傳送同步訊息,等待返回值。 |
Actor例項 -1-
import scala.actors.Actor
object actor1 extends Actor{
def act():Unit = {
for(i<-1 to 10){
println("actor1---"+i)
Thread.sleep(1000)
}
}
}
object actor2 extends Actor{
def act():Unit = {
for(i<-1 to 10){
println("actor2---"+i)
Thread.sleep(1000)
}
}
}
object ActorDemo {
def main(args: Array[String]): Unit = {
//啟動actor
actor1.start()
actor2.start()
}
}
說明:上面分別呼叫了兩個單例物件的start()方法,他們的act()方法會被執行,相同與在java中開啟了兩個執行緒,執行緒的run()方法會被執行。 注意:這兩個Actor是並行執行的,act()方法中的for迴圈執行完成後actor程式就退出了
-2- receive相當於是建立執行緒和銷燬執行緒的過程,可以不斷地接收訊息 接收訊息方式1:receive 特點:要反覆處理訊息,receive外層用while(…), 不用的話只處理一次。
class actor3 extends Actor{
override def act(): Unit = {
while (true) {
//receive引數是一個匿名函式
receive {
case "start" => {
println("Starting")
Thread.sleep(2000)
println("started")
}
case "stop" => {
println("Stopping")
Thread.sleep(2000)
println("stoped")
}
}
}
}
}
object ActorDemo2 {
def main(args: Array[String]): Unit = {
val myactor = new actor3()
myactor.start()
//傳送非同步無返回值的訊息
myactor !"start"
myactor !"stop"
println("have finished")
}
}
說明:在act()方法中加入了while (true) 迴圈,就可以不停的接收訊息 注意:傳送start訊息和stop的訊息是非同步的,但是Actor接收到訊息執行的過程是同步的按順序執行
-3- react類似執行緒池機制,可以複用執行緒,不斷地接收訊息,使用react 特點: (1) 從不返回 (2) 要反覆執行訊息處理,react外層用loop,不能用while(…) (3) 通過複用執行緒,比receive更高效 應儘可能使用react,react方式會複用執行緒,比receive更高效
import scala.actors.Actor
class actor4 extends Actor{
override def act(): Unit = {
loop {
//react可以進行執行緒複用,不用重新建立和銷燬執行緒,減少在資源佔用
react {
case "start" => {
println("Starting")
Thread.sleep(2000)
println("started")
}
case "stop" => {
println("Stopping")
Thread.sleep(2000)
println("stoped")
}
}
}
}
}
object ActorDemo3 {
def main(args: Array[String]): Unit = {
val myactor = new actor4()
myactor.start()
//傳送非同步無返回值的訊息
myactor !"start"
myactor !"stop"
println("have finished")
}
}
-4-Scala寫wordCount
object ScalaWordCount {
def main(args: Array[String]): Unit = {
//模擬從檔案讀取字串
val lines = List("hello scala spark","hello scala java","hello hadoop java")
//資料切分,生成一個一個的單詞,並壓平
val words:List[String] = lines.flatMap(_.split(" "))
//過濾資料中多餘的空格
val filterwords:List[String] = words.filter(_ != " ")
//把一個一個的單詞程式設計一個一個的元組(word,1)
val tuples:List[(String,Int)] = filterwords.map((_,1))
//按照key進行分組
val grouped:Map[String,List[(String,Int)]] = tuples.groupBy(_._1)
//聚合,相同的key的value
val sumed:Map[String,Int] = grouped.mapValues(_.size)
//對結果進行排序
val sorted:List[(String,Int)] = sumed.toList.sortWith(_._2 > _._2)
//一句話實現
//val result:List[(String,Int)] = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortWith(_._2 > _._2)
//列印
println(sorted)
}
}
-5-ActorWordCount
import scala.actors.{Actor, Future}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
//使用actor同時處理多個檔案,wordcount,最後將結果彙總
object ActorWordCount {
def main(args: Array[String]): Unit = {
//儲存所有actor返回的處理結果
val replys = ArrayBuffer[Future[Any]]()
//獲取任務
val files = Array("C:\\Users\\brz\\Desktop\\hbaseMR\\hbaseMR\\hbase2hdfs.java","C:\\Users\\brz\\Desktop\\hbaseMR\\hbaseMR\\hdfs2hbase.java")
//分發任務
for(file <- files){
val task = new Task
task.start()
//傳送處理任務 (wordcount處理結果 )
val reply:Future[Any] = task !! submitTask(file) //回撥
replys += reply
}
//彙總處理結果
val processedReplys = new ArrayBuffer[Map[String,Int]]()
while(replys.size>0){
//遍歷future
//有返回結果的future陣列
val done = replys.filter(_.isSet)
for(res <- done){
//一個future的返回結果
val mapped = res.apply().asInstanceOf[Map[String,Int]]
//把每一個處理結果放到彙總的數組裡
processedReplys += mapped
replys -= res
}
val res = processedReplys.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
println(res)
}
class Task extends Actor{
override def act(): Unit = {
while (true){
receive({
//拿到任務
case submitTask(file)=>{
val line = Source.fromFile(file).getLines()
val listline = line.toList
val words = listline.flatMap(_.split(" "))
val tuples = words.map((_,1))
val grouped = tuples.groupBy(_._1)
val sumed = grouped.mapValues(_.size)
//將結果返回
sender ! sumed
}
})
}
}
}
case class submitTask(file:String)
}