Akka: 讓併發和容錯更容易:Akka示例教程
目錄
轉載原文:左洪斌
摘要
Akka用Scala語言寫成,為開發高併發、分散式和容錯式應用提供了便利,對開發者隱藏了很大程度的複雜性。把Akka用好肯定需要了解比這個教程更多的內容,但是希望這裡的介紹和示例能夠引起你的注意並繼續瞭解Akka。
寫併發程式很難。程式設計師不得不處理執行緒、鎖和競態條件等等,這個過程很容易出錯,而且會導致程式程式碼難以閱讀、測試和維護。
所以,很多人不傾向於使用多執行緒程式設計。取而代之的是,他們使用單執行緒程序(譯者注:只含有一個執行緒的程序),依賴外部服務(如資料庫、佇列等)處理所需的併發或非同步操作。雖然這種方法在有些情況下是可行的,但還有很多其他情況不能奏效。很多實時系統——例如交易或銀行業務應用,或實時遊戲——等待一個單執行緒程序完成就太奢侈了(他們需要立即應答!)。其他的一些對於計算或資源要求非常高的系統,如果在程式中不引入並行機制就會耗時很久(有些情況下可以達到幾個小時或數天)。
常用的一種單執行緒方法(例如,在 Node.js裡廣泛應用)是使用基於事件的、非阻塞模式(Event-Based, NON-Blocking Paradigm,其中Paradigm也有譯作成例)。雖然這種方法可以避免上下文切換、鎖和阻塞,的確能提高效能,但還是不能解決併發使用多個處理器(需要啟動和協調多個獨立的處理器)的問題。
那麼,這是不是意味著為了構建一個併發程式,除了深入到執行緒、鎖和競態條件之外沒有別的選擇呢?
感謝Akka框架,它為我們提供了一種選擇。本教程介紹了Akka的示例,並仔細研究它如何幫助並簡化分散式併發應用的實現。
Akka框架是什麼
這篇文章介紹了Akka並仔細研究它如何幫助並簡化分散式併發應用的實現。
Akka是JVM(JAVA虛擬機器,下同)平臺上構建高併發、分散式和容錯應用的工具包和執行時。Akka用 Scala語言寫成,同時提供了Scala和JAVA的開發介面。
Akka處理併發的方法基於 Actor(沒有慣用譯法,文中使用原詞)模型。在基於Actor的系統裡,所有的事物都是Actor,就好像在面向物件設計裡面所有的事物都是物件一樣。但是有一個重要區別——特別是和我們的討論相關的——那就是Actor模型是作為一個併發模型設計和架構的,而面向物件模式則不是。更具體一點,在Scala的Actor系統裡,Actor互相互動並共享資訊但並不對互動順序作出預設。Actor之間共享資訊和發起任務的機制是訊息傳遞。
建立和排程執行緒、接收和分發訊息以及處理競態條件和同步的所有複雜性,都委託給框架,框架的處理對應用來說是透明的。
Akka在多個Actor和下面的系統之間建立了一個層次(Layer),這樣一來,Actor只需要處理訊息就可以了。建立和排程執行緒、接收和分發訊息以及處理競態條件和同步的所有複雜性,都委託給框架,框架的處理對應用來說是透明的。
Actor嚴格遵守 響應式宣告。響應式應用的目標是通過滿足以下一個或多個條件來代替傳統的多執行緒應用:
- 事件驅動。使用Actor,程式碼可以非同步處理請求並用獨佔的方式執行非阻塞操作。
- 可伸縮性。在Akka裡,不修改程式碼就增加節點是可能的,感謝訊息傳遞和本地透明性(Location Transparency)。
- 高彈性。任何應用都會碰到錯誤並在某個時間點失敗。Akka的“監管”(容錯)策略為實現自愈系統提供了便利。
- 響應式。今天的高效能和快速響應應用需要對使用者快速反饋,因此對於事件的響應需要非常及時。Akka的非阻塞、基於訊息的策略可以幫助達成這個目標。
Akka中的Actor是什麼
Actor本質上就是接收訊息並採取行動處理訊息的物件。它從訊息源中解耦出來,只負責正確識別接收到的訊息型別,並採取相應的行動。
收到一條訊息之後,一個Actor可能會採取以下一個或多個行動:
- 執行一些本身的操作(例如進行計算、持久化資料、呼叫外部的Web服務等)
- 把訊息或衍生訊息轉發給另外一個Actor
- 例項化一個新的Actor並把訊息轉發給它
或者,如果這個Actor認為合適的話,可能會完全忽略這條訊息(也就是說,它可能選擇不響應)。
為了實現一個Actor,需要繼承Akka.Actor.Actor這個Trait(一般譯為“特徵”,譯法有一定爭議,文中保留原詞)並實現Receive方法。當一個訊息傳送給Actor時,它的Receive方法會被(Akka)呼叫。典型的實現包括使用模式匹配(Pattern Matching)來識別訊息型別並作出響應,參見下面的Akka示例:
import akka.actor.Actor import akka.actor.Props import akka.event.Logging class MyActor extends Actor { def receive = { case value: String => doSomething(value) case _ => println("received unknown message") } }
模式匹配是一種相對優雅的處理訊息的技術,相比基於回撥的實現,更傾向於產生“更整潔”以及更容易瀏覽的程式碼。例如,考慮一個簡化版的HTTP請求/響應實現。
首先,我們使用JavaScript中基於回撥的方式實現:
route(url, function(request){ var query = buildQuery(request); dbCall(query, function(dbResponse){ var wsRequest = buildWebServiceRequest(dbResponse); wsCall(wsRequest, function(wsResponse) { sendReply(wsResponse); }); }); });
現在,我們把它和基於模式匹配的實現做個比較:
msg match { case HttpRequest(request) => { val query = buildQuery(request) dbCall(query) } case DbResponse(dbResponse) => { var wsRequest = buildWebServiceRequest(dbResponse); wsCall(dbResponse) } case WsResponse(wsResponse) => sendReply(wsResponse) }
雖然基於回撥的JavaScript程式碼更緊湊,但確實更難以閱讀和瀏覽。相比而言,基於模式匹配的程式碼對於需要考慮哪些情況、每種情況都是怎麼處理的寫法更加清晰。
Actor系統
把一個複雜的問題不斷分解成更小規模的子問題通常是一種可靠的解決問題的技術。這個方法對於電腦科學特別有效(和 單一職責原則一致),因為這樣容易產生整潔的、模組化的程式碼,產生的冗餘很少甚至沒有,而且維護起來相對容易。
在基於Actor的設計裡,使用這種技術有助於把Actor的邏輯組織變成一個層級結構,也就是所謂的Actor系統。Actor系統提供了一個基礎框架,通過這個系統Actor之間可以進行互動。
在Akka裡面,和Actor通訊的唯一方式就是通過ActorRef
。ActorRef
代表Actor的一個引用,可以阻止其他物件直接訪問或操作這個Actor的內部資訊和狀態。訊息可以通過一個ActorRef
以下面的語法協議中的一種傳送到一個Actor:
-!
(“告知”) —— 傳送訊息並立即返回
-?
(“請求”) —— 傳送訊息並返回一個Future物件,代表一個可能的應答
每個Actor都有一個收件箱,用來接收發送過來的訊息。收件箱有多種實現方式可以選擇,預設的實現是先進先出(FIFO)佇列。
在處理多條訊息時,一個Actor包含多個例項變數來保持狀態。Akka確保Actor的每個例項都執行在自己的輕量級執行緒裡,並保證每次只處理一條訊息。這樣一來,開發者不必擔心同步或競態條件,而每個Actor的狀態都可以被可靠地保持。
Akka的Actor API中提供了每個Actor執行任務所需要的有用資訊:
sender
:當前處理訊息的傳送者的一個ActorRef
引用context
:Actor執行上下文相關的資訊和方法(例如,包括例項化一個新Actor的方法ActorOf
)supervisionStrategy
:定義用來從錯誤中恢復的策略self
:Actor本身的ActorRef
引用
Akka確保Actor的每個例項都執行在自己的輕量級執行緒裡,並保證每次只處理一條訊息。這樣一來,開發者不必擔心同步或競態條件,而每個Actor的狀態都可以被可靠地保持。
為了把這些教程組織起來,讓我們來考慮一個簡單的例子:統計一個文字檔案中單詞的數量。
為了達到演示Akka示例的目的,我們把這個問題分解為兩個子任務;即(1)統計每行單詞數量的“孩子”任務和(2)彙總這些單行單詞數量、得到檔案裡單詞總數的“父親”任務。
父Actor會從檔案中裝載每一行,然後委託一個子Actor來計算某一行的單詞數量。當子Actor完成之後,它會把結果用訊息發回給父Actor。父Actor會收到(每一行的)單詞數量的訊息並維持一個整個檔案單詞總數的計數器,這個計數器會在完成後返回給呼叫者。
(注意以下提供的Akka教程的例子只是為了教學目的,所以沒有顧及所有的邊界條件、效能優化等。同時,完整可編譯版本的程式碼示例可以在這個GIST中找到)
讓我們首先看一個子類StringCounterActor
的示例實現:
case class ProcessStringMsg(string: String) case class StringProcessedMsg(words: Integer) class StringCounterActor extends Actor { def receive = { case ProcessStringMsg(string) => { val wordsInLine = string.split(" ").length sender ! StringProcessedMsg(wordsInLine) } case _ => println("Error: message not recognized") } }
這個Actor有一個非常簡單的任務:接收ProcessStringMsg
訊息(包含一行文字),計算這行文字中單詞的數量,並把結果通過一個StringProcessedMsg
訊息返回給傳送者。請注意我們已經實現了我們的類,使用!
(“告知”)方法發出StringProcessedMsg
訊息(發出訊息並立即返回)。
好了,現在我們來關注父WordCounterActor
類:
case class StartProcessFileMsg() class WordCounterActor(filename: String) extends Actor { private var running = false private var totalLines = 0 private var linesProcessed = 0 private var result = 0 private var fileSender: Option[ActorRef] = None def receive = { case StartProcessFileMsg() => { if (running) { // println just used for example purposes; // Akka logger should be used instead println("Warning: duplicate start message received") } else { running = true fileSender = Some(sender) // save reference to process invoker import scala.io.Source._ fromFile(filename).getLines.foreach { line => context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line) totalLines += 1 } } } case StringProcessedMsg(words) => { result += words linesProcessed += 1 if (linesProcessed == totalLines) { fileSender.map(_ ! result) // provide result to process invoker } } case _ => println("message not recognized!") } }
這裡面有很多細節,我們來逐一考察(注意討論中所引用的行號基於以上程式碼示例)。
首先,請注意要處理的檔名被傳給了WordCounterActor
的構造方法(第3行)。這意味著這個Actor只會用來處理一個單獨的檔案。這樣通過避免重置狀態變數(running
,totalLines
,linesProcessed
和result
)也簡化了開發者的編碼工作,因為這個例項只使用一次(也就是說處理一個單獨的檔案),然後就丟棄了。
接下來,我們看到WordCounterActor
處理了兩種型別的訊息:
StartProcessFileMsg
(第12行)- 從最初啟動
WordCounterActor
的外部Actor接收到的訊息 - 收到這個訊息之後,
WordCounterActor
首先檢查它收到的是不是一個重複的請求 - 如果這個請求是重複的,那麼
WordCounterActor
生成一個警告,然後就不做別的事了(第16行) - 如果這不是一個重複的請求:
WordCounterActor
在FileSender
例項變數(注意這是一個Option[ActorRef]
而不是一個Option[Actor]
)中儲存傳送者的一個引用。當處理最終的StringProcessedMsg
(從一個StringCounterActor
子類中接收,如下文所述)時,為了以後的訪問和響應,這個ActorRef
是必需的。- 然後
WordCounterActor
讀取檔案,當檔案中每行都裝載之後,就會建立一個StringCounterActor
,需要處理的包含行文字的訊息就會傳遞給它(第21-24行)。
- 從最初啟動
StringProcessedMsg
(第27行)- 當處理完成分配給它的行之後,從
StringCounterActor
處接收到的訊息 - 收到此訊息之後,
WordCounterActor
會把檔案的行計數器增加,如果所有的行都處理完畢(也就是說,當totalLines
和linesProcessed
相等),它會把最終結果發給原來的FileSender
(第28-31行)。
- 當處理完成分配給它的行之後,從
再次需要注意的是,在Akka裡,Actor之間通訊的唯一機制就是訊息傳遞。訊息是Actor之間唯一共享的東西,而且因為多個Actor可能會併發訪問同樣的訊息,所以為了避免競態條件和不可預期的行為,訊息的不可變性非常重要。
因為Case Class預設是不可變的並且可以和模式匹配無縫整合,所以用Case Class的形式來傳遞訊息是很常見的。(Scala中的Case Class就是正常的類,唯一不同的是通過模式匹配提供了可以遞迴分解的機制)。
讓我們通過執行整個應用的示例程式碼來結束這個例子。
object Sample extends App { import akka.util.Timeout import scala.concurrent.duration._ import akka.pattern.ask import akka.dispatch.ExecutionContexts._ implicit val ec = global override def main(args: Array[String]) { val system = ActorSystem("System") val actor = system.actorOf(Props(new WordCounterActor(args(0)))) implicit val timeout = Timeout(25 seconds) val future = actor ? StartProcessFileMsg() future.map { result => println("Total number of words " + result) system.shutdown } } }
請注意這裡的?
方法是怎樣傳送一條訊息的。用這種方法,呼叫者可以使用返回的 Future物件,當完成之後可以打印出最後結果並最終通過停掉Actor系統退出程式。
Akka的容錯和監管者策略
在Actor系統裡,每個Actor都是其子孫的監管者。如果Actor處理訊息時失敗,它就會暫停自己及其子孫併發送一個訊息給它的監管者,通常是以異常的形式。
在Akka裡面,監管者策略是定義你的系統容錯行為的主要並且直接的機制。
在Akka裡面,一個監管者對於從子孫傳遞上來的異常的響應和處理方式稱作監管者策略。 監管者策略是定義你的系統容錯行為的主要並且直接的機制。
當一條訊息指示有一個錯誤到達了一個監管者,它會採取如下行動之一:
- 恢復孩子(及其子孫),保持內部狀態。 當孩子的狀態沒有被錯誤破壞,還可以繼續正常工作的時候,可以使用這種策略。
- 重啟孩子(及其子孫),清除內部狀態。 這種策略應用的場景和第一種正好相反。如果孩子的狀態已經被錯誤破壞,在它可以被用到Future之前有必須要重置其內部狀態。
- 永久地停掉孩子(及其子孫)。 這種策略可以用在下面的場景中:錯誤條件不能被修正,但是並不影響後面執行的操作,這些操作可以在失敗的孩子不存在的情況下完成。
- 停掉自己並向上傳播錯誤。 適用場景:當監管者不知道如何處理錯誤,就把錯誤傳遞給自己的監管者。
而且,一個Actor可以決定是否把行動應用在失敗的子孫上抑或是應用到它的兄弟上。有兩種預定義的策略:
OneForOneStrategy
:只把指定行動應用到失敗的孩子上AllForOneStrategy
:把指定行動應用到所有子孫上
下面是一個使用OneForOneStrategy
的簡單例子:
import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy._ import scala.concurrent.duration._ override val supervisorStrategy = OneForOneStrategy() { case _: ArithmeticException => Resume case _: NullPointerException => Restart case _: IllegalArgumentException => Stop case _: Exception => Escalate }
如果沒有指定策略,那麼就使用如下預設的策略:
- 如果在初始化Actor時出錯,或者Actor被結束(Killed),那麼Actor就會停止(Stopped)
- 如果有任何型別的異常出現,Actor就會重啟
Akka提供的預設策略的實現如下:
final val defaultStrategy: SupervisorStrategy = { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) }
Akka也考慮到對 定製化監管者策略的實現,但正如Akka文件也提出了警告,這麼做要小心,因為錯誤的實現會產生諸如Actor系統被阻塞的問題(也就是說,其中的多個Actor被永久掛起了)。
本地透明性
Akka架構支援 本地透明性,使得Actor完全不知道他們接受的訊息是從哪裡發出來的。訊息的傳送者可能駐留在同一個JVM,也有可能是存在於其他的JVM(或者執行在同一個節點,或者執行在不同的節點)。Akka處理這些情況對於Actor(也即對於開發者)來說是完全透明的。唯一需要說明的是跨越節點的訊息必須要被序列化。
Akka架構支援本地透明性,使得Actor完全不知道他們接受的訊息是從哪裡發出來的。
Actor系統設計的初衷,就是不需要任何專門的程式碼就可以執行在分散式環境中。Akka只需要一個配置檔案(Application.Conf),用以說明發送訊息到哪些節點。下面是配置檔案的一個例子:
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "127.0.0.1" port = 2552 } } }
最後的一些提示
我們已經瞭解了Akka框架幫助完成併發和高效能的方法。然而,正如這篇教程指出的,為了充分發揮Akka的能力,在設計和實現系統時,有些要點值得考慮:
- 我們應盡最大可能為每個Actor都分配最小的任務(如上面討論的,遵守單一職責原則)
- Actor應該非同步處理事件(也就是處理訊息),不應該阻塞,否則就會發生上下文切換,影響效能。具體來說,最好是在一個Future物件裡執行阻塞操作(例如IO),這樣就不會阻塞Actor,如:
case evt => blockingCall() // BAD case evt => Future { blockingCall() // GOOD }
- 要確認你的訊息都是不可變的,因為互相傳遞訊息的Actor都在它們自己的執行緒裡併發執行。可變的訊息很有可能導致不可預期的行為。
- 由於在節點之間傳送的訊息必須是可序列化的,所以必須要記住訊息體越大,序列化、傳送和反序列化所花費的時間就越多,這也會降低效能。
結論
Akka用Scala語言寫成,簡化併為開發高併發、分散式和容錯式應用提供了便利,對開發者隱藏了很大程度的複雜性。把Akka用好肯定需要了解比這個教程更多的內容,但是希望這裡的介紹和示例能夠引起你的注意並繼續瞭解Akka。