Akka筆記之訊息傳遞
英文原文連結,譯文連結,原文作者:Arun Manivannan ,譯者:有孚
在Akka筆記第一篇的介紹中,我們大致介紹了下Akka工具箱中的Actor。在第二篇當中,我們來看一下Actor訊息傳遞的功能。這裡還是延用之前使用的那個學生-老師的例子。
在Actor訊息的第一部分中,我們會建立一個老師的Actor,但學生Actor則先不建立,而是使用一個叫做StudentSimulatorApp的主程式。
仔細回顧下學生-老師模型
我們現在只考慮StudentSimulatorApp傳送給TeacherActor的訊息。這裡我所說的StudentSimulatorApp指的是一個正常的主程式。
從圖中可以看到:
(如果有陌生的術語,沒關係,後面我們會詳細解釋的)
1. 學生建立了一個叫ActorSystem的東西。
2. 他通過ActorSystem來建立了一個叫ActorRef的物件。QuoteRequest訊息就是傳送給ActorRef的(它是TeacherActor的一個代理)
3. ActorRef將訊息傳送給Dispatcher
4. Dispatcher將訊息投遞到目標Actor的郵箱中。
5. 隨後Dispatcher將Mailbox扔給一個執行緒去執行(這點下節會重點講到)
6. MailBox將訊息出隊並最終將其委託給真實的Teacher Actor的接收方法去處理。
正如我所說的,看不懂也別擔心。現在我們來一步步地詳細地分析下。全部講完後你可以再回過頭來看下這五個步驟。
STUDENTSIMULATORAPP程式
我們用這個STUDENTSIMULATORAPP來啟動JVM並初始化ActorSystem。
從圖中可以看到,StudentSimulatorApp
1. 建立了一個ActorSystem
2. 通過ActorSystem建立了一個Teacher Actor的代理(ActorRef)
3. 將QuoteRequest訊息傳送給代理
我們現在只關注這三點。
1. 建立ActorSystem
ActorSystem是進入到Actor的世界的一扇大門。通過它你可以建立或中止Actor。甚至還可以把整個Actor環境給關閉掉。
另一方面來說,Actor是一個分層的結構,ActorSystem之於Actor有點類似於java.lang.Object或者scala.Any的角色——也就是說,它是所有Actor的根物件。當你通過ActorSystem的actorOf方法建立了一個Actor時,你其實建立的是ActorSystem下面的一個Actor。
初始化ActorSystem的程式碼是這樣的:
val system=ActorSystem("UniversityMessageSystem")
UniversityMessageSystem只是你給ActorSystem起的一個可愛的名字而已。
2. 建立一個TeacherActor的代理?
我們來看下下面這段程式碼:
val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])
actorOf是ActorSystem中建立Actor的方法。但是正如你所看到的,它並不會返回我們所需要的TeacherActor。它返回的是一個ActorRef。
這個ActorRef扮演了真實的Actor的一個代理的角色。客戶端並不會直接和Actor通訊。這也正是Actor模型中避免直接訪問TeacherActor中任何的自定義/私有方法或者變數的一種方式。
再重複一遍,訊息只會傳送給ActorRef,最終才會到達真正的Actor。你是絕對無法直接和Actor進行通訊的。如果你真的找到了什麼拙劣的方式來直接通訊,大家會恨你入骨的。
將訊息傳送給代理
還是隻有一行程式碼。你只需告訴說把QuoteRequest訊息傳送到ActorRef就好了。Actor中的這個告訴的方式就是一個!號。(ActorRef中確實也有一個tell方法,不過它只是把這個呼叫委託給了!號)
//send a message to the Teacher Actor teacherActorRef!QuoteRequest
這就可以了!
如果你認為我在騙你的話,看一下下面StudentSimulatorApp的完整程式碼:
STUDENTSIMULATORAPP.SCALA
package me.rerun.akkanotes.messaging.actormsg1 import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._ object StudentSimulatorApp extends App{ //Initialize the ActorSystem val actorSystem=ActorSystem("UniversityMessageSystem") //construct the Teacher Actor Ref val teacherActorRef=actorSystem.actorOf(Props[TeacherActor]) //send a message to the Teacher Actor teacherActorRef!QuoteRequest //Let's wait for a couple of seconds before we shut down the system Thread.sleep (2000) //Shut down the ActorSystem. actorSystem.shutdown() }
好吧,我承認我撒了點小謊。你還得關掉ActorSystem,不然JVM會一直執行下去的。我還讓主執行緒睡眠了一小會兒,以便給點時間讓TeacherActor去完成它的任務。我知道這聽起來很愚蠢。別擔心。後面我們會通過些優雅的測試用例來替換掉這種取巧的方式。
訊息
我們剛傳送了一個QuoteMessage給ActorRef,但是,還壓根兒沒看著過這個訊息類呢!
說曹操,曹操到:
(實踐中推薦你把訊息封裝成一個好點的物件,這樣維護起來容易些)
TeacherProtocol
package me.rerun.akkanotes.messaging.protocols object TeacherProtocol{ case class QuoteRequest() case class QuoteResponse(quoteString:String) }
正如你所想的那樣,QuoteRequest就是傳送給TeacherActor的那個訊息。Actor會回覆一個QuoteResponse。
分發器及郵箱
ActorRef把訊息處理功能委託給了Dispatcher。實際上,當我們建立ActorSystem和ActorRef的時候,就已經建立了一個Dispatcher和MailBox了。我們來看下它們是幹什麼的。
郵箱
每個Actor都有一個MailBox(後面會介紹一種特殊的情況)。在我們這個比喻當中,每個老師也有一個郵箱。老師得去檢查郵箱並處理訊息。在Actor的世界中,則是另一種形式——郵箱一有機會就會要求Actor去完成自己的任務。
同樣的,郵箱裡也有一個佇列來以FIFO的方式來儲存並處理訊息——它和實際的郵箱還有點不同,真實的郵箱新的信總是在最上面的。
現在講到分發器了
Dispatcher會完成一些很酷的事。從它的角度來看,它只是從ActorRef中取出一條訊息然後將它傳給了MailBox。但是,在這後面發生了一件不可意義的事情:
Dispatcher會封裝一個ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去執行。
看下Dispatcher裡面的一段程式碼:
protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = { ... try { executorService execute mbox ... }
什麼,你是說要執行一下郵箱?
是的。我們看到MailBox中包含了佇列裡面的訊息。由於Executor得去執行MailBox,所以它得是一個Thread型別。是的沒錯。MailBox的宣告及構造器就是這樣的。
下面是MailBox的簽名信息。
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable
TeacherActor
當MailBox的run方法執行的時候,它會從佇列中取出一條訊息,然後將它傳給Actor去處理。
當你把訊息傳給ActorRef的時候,最終呼叫的實際是目標Actor裡面的一個receive方法。
TeacherActor只是一個很簡單的類,它有一個名言的列表,而receive方法很明顯就是用來處理訊息的。
來看下程式碼:
TeacherActor.scala
package me.rerun.akkanotes.messaging.actormsg1 import scala.util.Random import akka.actor.Actor import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._ /* * Your Teacher Actor class. * * The class could use refinement by way of * using ActorLogging which uses the EventBus of the Actor framework * instead of the plain old System out * */ class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything worth doing is worth overdoing", "The trouble is you think you have time", "You never gonna know if you never even try") def receive = { case QuoteRequest => { import util.Random //Get a random Quote from the list and construct a response val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size))) println (quoteResponse) } } }
TeacherActor的receive方法的模式匹配只會匹配一種訊息——QuoteRequest (事實上,模式匹配中最好匹配下預設的情況,不過這個就說來話長了)
receive方法做的就是
1. 匹配QuoteRequest的模式
2. 從名言列表中隨機選取一條
3. 構造出一個QuoteResponse
4. 將QuoteResponse列印到控制檯上
程式碼
整個專案的程式碼可以從Github中下載到。
本文最早釋出於我的個人部落格: Java譯站