漫談併發程式設計:Actor模型
0x00 前言
一般來說有兩種策略用來在併發執行緒中進行通訊:共享資料和訊息傳遞。熟悉c和java併發程式設計的都會比較熟悉共享資料的策略,比如java程式設計師就會常用到java.util.concurrent
包中同步、鎖相關的資料結構。
使用共享資料方式的併發程式設計面臨的最大的一個問題就是資料條件競爭(data race)。處理各種鎖的問題是讓人十分頭痛的一件事。
和共享資料方式相比,訊息傳遞機制最大的優點就是不會產生資料競爭狀態(data race)。實現訊息傳遞有兩種常見的型別:基於channel的訊息傳遞和基於Actor的訊息傳遞。本文主要是來分享Scala的Actor模型。
文章結構
本篇部落格嘗試講解Actor模型。由於目前Scala的使用頻率較高,因此主要語言為Scala
主要分為下面幾個部分:
- Actor模型的基本概念使用
- 講一下akka框架中scala的基本使用,主要提幾個重要的api
- 寫幾個例子幫助理解
1). HelloWorld 簡單版:通過這個例子來簡單看一下Akka中Actor的使用
2). HelloWordl 進階版:稍微進化了一點點,多了preStart和PoisonPill的使用。
3). WordCount 偽分散式:一個單機版的wordcount,一個map,多個reduce。後續再補充完全分散式的程式。
0x01 基本概念
Actor是電腦科學領域中的一個平行計算模型,它把actors當做通用的平行計算原語:一個actor對接收到的訊息做出響應,進行本地決策,可以建立更多的actor,或者傳送更多的訊息;同時準備接收下一條訊息。
在Actor理論中,一切都被認為是actor,這和麵向物件語言裡一切都被看成物件很類似。但包括面嚮物件語言在內的軟體通常是順序執行的,而Actor模型本質上則是併發的。
什麼是Actor模型
Actor的概念來自於Erlang,在AKKA中,可以認為一個Actor就是一個容器,用以儲存狀態、行為、Mailbox以及子Actor與Supervisor策略。Actor之間並不直接通訊,而是通過Mail來互通有無。
每個Actor都有一個(恰好一個)Mailbox。Mailbox相當於是一個小型的佇列,一旦Sender傳送訊息,就是將該訊息入隊到Mailbox中。入隊的順序按照訊息傳送的時間順序。Mailbox有多種實現,預設為FIFO。但也可以根據優先順序考慮出隊順序,實現演算法則不相同。
訊息和信箱
非同步地傳送訊息是用actor模型程式設計的重要特性之一。訊息並不是直接傳送到一個actor,而是傳送到一個信箱(mailbox)。如下圖。
這樣的設計解耦了actor之間的關係——actor都以自己的步調執行,且傳送訊息時不會被阻塞。雖然所有actor可以同時執行,但它們都按照信箱接收訊息的順序來依次處理訊息,且僅在當前訊息處理完成後才會處理下一個訊息,因此我們只需要關心傳送訊息時的併發問題即可。
0x02 Akka中的Actor
我們會用到Akka框架提供的Actor,因此在這裡先大致介紹一下Akka中的Actor使用方式。
Actor System
Actor System是進入AKKA世界中的一個入口,也可以看做是Actor的系統工廠或管理者,掌控者Actor的生命週期,包括建立、停止Actor,當然也可以關閉整個ActorSystem。
比如我們後面會展示出來的程式碼:
object BetterHelloWorld extends App{
val system = ActorSystem("HelloActors")
system.actorOf(Props[BetterMaster], "master")
}
Actor的層級
Actor的整個體系就像是一家層級森嚴的企業組織,層次越高,管理許可權與職責就更大。在AKKA中,parent actor就是child actor的supervisior,這意味著parent actor能夠掌控child actor的整個生命週期。而這種分級的模式也能夠更好地支援系統的容錯。
如果要建立child actor,就不再呼叫ActorSystem的actorOf()方法。
如下BetterMaster
就是一個parent actor,而BetterTalker就是一個child actor,完整程式碼請看後面的例子。
parent actor:
class BetterMaster extends Actor {
val talker = context.actorOf(Props[BetterTalker], "talker")
override def preStart { ... }
def receive = { ... }
}
child actor:
class BetterTalker extends Actor {
def receive() = {
...
}
}
補充: AKKA提供的方法是在Parent Actor內部,通過呼叫ActorContext的actorOf()方法來建立它自身的child actor。
Actor的生命週期
AKKA為Actor生命週期的每個階段都提供了一個鉤子(hook)方法,我們可以通過觀察自定義Actor需要重寫的方法來理解Actor的生命週期。
Actor被定義為trait,其中一個典型的方法對是preStart()與postStop(),顧名思義,兩個方法分別在啟動和停止時被呼叫。 這兩個方法可以看我們在後文中的例子。
Akka官方文件提供了說明Actor生命週期的圖片,如下所示:
官網的圖不好看,盜一張感覺不錯的。大致能明白就好。
除了正常的start和stop操作,我們也會在例子中用到preStart()、postStop()、PoisonPill等操作。
0x03 例子
下面會放三個例子,分別有不同的知識點。
- HelloWorld 簡單版:通過這個例子來簡單看一下Akka中Actor的使用
- HelloWordl 進階版:稍微進化了一點點,多了preStart和PoisonPill的使用。
- WordCount 偽分散式:一個單機版的wordcount,一個map,多個reduce。後續再補充完全分散式的程式。
1. HelloWorld 簡單版
簡單的Hello,程式碼可以看註釋。
package com.dantezhao.helloworld.simple
import akka.actor._
/**
* Created by Dante on 2017/6/17.
*/
object SimpleHelloWorld extends App{
val system = ActorSystem("HelloActors")
val talker = system.actorOf(Props[SimpleTalker], "talker")
//傳送三條訊息
talker ! SimpleGreet("Dante")
talker ! SimplePraise("Winston")
talker ! SimpleCelebrate("clare", 18)
}
//這裡用三個case class來宣告三種訊息型別
// case class有一個好處就是可以用在case語句中
case class SimpleGreet(name: String)
case class SimplePraise(name: String)
case class SimpleCelebrate(name: String, age: Int)
//這是我們第一個actor
//它會接收三種訊息並列印相應的輸出
class SimpleTalker extends Actor {
def receive = {
case SimpleGreet(name) => println(s"Hello $name")
case SimplePraise(name) => println(s"$name, you're amazing")
case SimpleCelebrate(name, age) => println(s"Here's to another $age years, $name")
}
}
執行看一下結果:
Hello Dante
Winston, you're amazing
Here's to another 18 years, clare
2. HelloWorld 升級版
在此版本加上了控制Actor結束的PoisonPill。
可以看到當接收到PoisonPill,Actor將不再接收資料。
package com.dantezhao.helloworld.better
import akka.actor.{Actor, ActorSystem, PoisonPill, Props, Terminated}
/**
* Created by Dante on 2017/6/18.
*/
object BetterHelloWorld extends App{
val system = ActorSystem("HelloActors")
system.actorOf(Props[BetterMaster], "master")
}
//這裡用三個case class來宣告三種訊息型別
// case class有一個好處就是可以用在case語句中
case class BetterGreet(name: String)
case class BetterPraise(name: String)
case class BetterCelebrate(name: String, age: Int)
class BetterTalker extends Actor {
def receive() = {
case BetterGreet(name) => println(s"Hello $name")
case BetterPraise(name) => println(s"$name, you're amazing")
case BetterCelebrate(name, age) => println(s"Here's to another $age years, $name")
}
}
class BetterMaster extends Actor {
val talker = context.actorOf(Props[BetterTalker], "talker")
override def preStart {
context.watch(talker)
talker ! BetterGreet("Dante")
talker ! BetterPraise("Winston")
talker ! BetterCelebrate("Clare", 16)
//傳送一個毒丸,告訴actor已經結束了。因此後面傳送的訊息將不會被傳遞
talker ! PoisonPill
talker ! BetterGreet("Dante")
}
def receive = {
case Terminated(`talker`) => context.system.terminate()
}
}
執行結果。
Hello Dante
Winston, you're amazing
Here's to another 16 years, Clare
[INFO] [06/18/2017 11:32:12.309] [HelloActors-akka.actor.default-dispatcher-3] [akka://HelloActors/user/master/talker] Message [com.dantezhao.helloworld.better.BetterGreet] from Actor[akka://HelloActors/user/master#-1817351260] to Actor[akka://HelloActors/user/master/talker#-385284367] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
3. 偽分散式 WordCount
寫一個模擬WordCount的程式,工作流程如下:
- MasterActor負責管理其餘的Actor,併發送資料給MapActor。這裡的傳送資料暫時隨意發幾條。
- MapActor接收到一行行的資料後,將資料處理成
(word:1)
的形式,併發送到所有的ReduceActor中。 - ReduceActor接收到資料後,將資料處理成
(word:count_num)
的形式,傳送給AggregateActor。 - 最後,AggregateActor彙集ReduceActor的資料,並打印出top10。
如圖,是Actor的結構。
這裡不再貼完整的程式碼,只放出來幾個關鍵的程式碼段,其餘的請參考github。
MapActor的處理邏輯:
// 把一句話切割,返回(word:1)
def splitLine(line: String): MapData = {
var dataList = new ArrayBuffer[Word]
var parser: StringTokenizer = new StringTokenizer(line)
while (parser.hasMoreTokens()) {
var word: String = parser.nextToken().toLowerCase()
dataList.append(new Word(word, 1))
}
return new MapData(dataList)
}
ReduceActor的處理邏輯:
def reduce(dataList: ArrayBuffer[Word]): ReduceData = {
val reducedMap = new HashMap[String, Integer]
for (wc: Word <- dataList) {
val word: String = wc.word
if (reducedMap.contains(word)) {
reducedMap.put(word, reducedMap(word)+1 )
} else {
reducedMap.put(word, 1)
}
}
return new ReduceData(reducedMap)
}
AggregateActor的處理邏輯:
def aggregateInMemoryReduce(reducedList: HashMap[String, Integer]) {
var count: Integer = 0
for (key <- reducedList.keySet) {
if (wordCounts.contains(key)) {
count = reducedList(key) + wordCounts(key)
wordCounts.put(key, count)
}
else {
wordCounts.put(key, reducedList(key))
}
}
}
最後的求top10的邏輯:
override def postStop() {
wordCounts.toList.sortBy(_._2).takeRight(10).map(println)
}
執行看一下結果:
(best,1)
(brown,1)
(family,5)
(belong,5)
(same,5)
(to,6)
(and,6)
(fox,6)
(dog,8)
(the,8)
0x04 總結
本來是想搞Go的併發程式設計的,但是CSP和Actor模型有很多接近的地方,想搞CSP模型,總是要看Actor的,因此就先花了點時間看一下Actor模型。
目前來講,也是學著寫著,比較遺憾的是沒有實際的專案支撐,理解程度還是不深。 程式碼能正常執行,但是不是很理想,最後沒搞出來分散式的程式。 只能留待以後有興趣了再搞了。