Akka併發程式設計——第四節:Actor模型(三)
本將主要內容:
1. Actor引用、Actor路徑
1. Actor引用、Actor路徑
下圖是Akka官方文件中給出的一張圖
該圖清晰地說明了ActorPath,ActorRef,Actor及ActorSystem之間的關係,並說明了Actor整體的層次結構。前面我們提到,Akka應用程式會持有一個名稱為user的Actor,該Actor被稱為guardian supervisor(守衛監督器),無論是ActorSystem建立的Actor還是通過ActorContext建立的Actor都為user的子類,它是最頂級的Actor。
(一)ActorRef
對於ActorRef,我們已經很熟悉了,通過呼叫ActorSystem.actorOf方法可以建立Actor,返回的便是ActorRef,例如程式碼
//建立FirstActor物件
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
返回的便是FirstActor的ActorRef物件,ActorRef最重要的作用便是向Actor傳送訊息,例如
//向myactor傳送訊息
myactor!"test"
myactor! 123
另外,還可以通過context隱式物件獲取父Actor和子Actor的ActorRef,示例程式碼如下:
/*
*Actor API:成員變數self及sender()方法的使用
*/
object Example_07 extends App{
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class FirstActor extends Actor with ActorLogging{
//通過context.actorOf方法建立Actor
var child:ActorRef = _
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通過context上下文建立Actor
child = context.actorOf(Props[MyActor], name = "myActor")
}
def receive = {
//向MyActor傳送訊息
case x => child ! x;log.info("received "+x)
}
}
class MyActor extends Actor with ActorLogging{
var parentActorRef:ActorRef=_
override def preStart(): Unit ={
//通過context.parent獲取其父Actor的ActorRef
parentActorRef=context.parent
}
def receive = {
case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
case _ => log.info("received unknown message");
}
}
val system = ActorSystem("MyActorSystem")
val systemLog=system.log
//建立FirstActor物件
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
//獲取ActorPath
val myActorPath=system.child("firstActor")
//通過system.actorSelection方法獲取ActorRef
val myActor1=system.actorSelection(myActorPath)
systemLog.info("準備向myactor傳送訊息")
//向myActor1傳送訊息
myActor1!"test"
myActor1! 123
Thread.sleep(5000)
//關閉ActorSystem,停止程式的執行
system.shutdown()
}
程式碼執行結果
[INFO] [04/02/2016 20:28:08.941] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 20:28:08.942] [main] [ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from ParentActorRef
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
程式碼
class MyActor extends Actor with ActorLogging{
var parentActorRef:ActorRef=_
override def preStart(): Unit ={
//通過context.parent獲取其父Actor的ActorRef
parentActorRef=context.parent
}
def receive = {
case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
case _ => log.info("received unknown message");
}
}
中,使用
//通過context.parent獲取其父Actor的ActorRef
parentActorRef=context.parent
獲取MyActor 的直接父Actor的ActorRef,在本例中為FirstActor,因為在FirstActor中,我們使用
//通過context上下文建立Actor
child = context.actorOf(Props[MyActor], name = "myActor")
建立了MyActor,FirstActor便自動成為MyActor的直接Supervisor。如此便可以通過程式碼
parentActorRef!"message from ParentActorRef"
傳送訊息。
另外,還可以通過
//建立FirstActor物件
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
//獲取ActorPath
val myActorPath=system.child("firstActor")
//通過system.actorSelection方法獲取ActorRef
val myActor1=system.actorSelection(myActorPath)
system.actorSelection(myActorPath)方法獲取相應的ActorRef。然後再使用獲取到的ActorRef向Acto傳送訊息
//向myActor1傳送訊息
myActor1!"test"
myActor1! 123
現在,讓我們來總結一下獲取ActorRef的方法:
(1)通過ActorSystem的actorOf方法,不過這種方式是通過建立Actor,然後返回其ActorRef
(2)通過context.actorOf方法,這種方式也是通過建立Actor,然後返回其ActorRef
(3)通過context.parent、context.self、context.children方法獲取當前Actor的父Actor、當前Actor及子Actor的ActorRef,這種方式是獲取已經建立好的Actor的ActorRef
(4)通過val myActor1=system.actorSelection(myActorPath)方法來獲取ActorRef,這種方式也是獲取已經建立好的Actor的ActorRef
(二)ActorPath
在前面的例子中,我們通過
val myActorPath=system.child("firstActor")
已經使用到了ActorPath。在Akka中,ActorPath採用統一資源定位符的方式進行組織,例如
//本地ActorPath
"akka://my-sys/user/service-a/worker1"
//遠端ActorPath
"akka.tcp://[email protected]:5678/user/service-b"
本地ActorPath是Akka併發程式設計中的ActorPath表示方式,而遠端ActorPath是Akka分散式程式設計中常用的ActorPath表示方式,”akka.tcp://[email protected]:5678/user/service-b”中的TCP表示使用的是TCP協議,也可以使用UPD協議,使用UDP協議的遠端ActorPath表示方式有如下形式
"akka.udp://my-sys@host.example.com:5678/user/service-b"
ActorPath當中,akka.udp表示的是使用的協議,my-sys表示的是ActorSytem名稱,host.example.com:5678為遠端機器地址及埠,user為guardian supervisor,service-b為當前應用程式的頂級Actor(通過system.actorOf方法建立的)
/*
*ActorPath
*/
object Example_08 extends App{
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class FirstActor extends Actor with ActorLogging{
//通過context.actorOf方法建立Actor
var child:ActorRef = _
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通過context上下文建立Actor
child = context.actorOf(Props[MyActor], name = "myActor")
}
def receive = {
//向MyActor傳送訊息
case x => child ! x;log.info("received "+x)
}
}
class MyActor extends Actor with ActorLogging{
def receive = {
case "test" => log.info("received test");
case _ => log.info("received unknown message");
}
}
val system = ActorSystem("MyActorSystem")
val systemLog=system.log
//建立FirstActor物件
val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")
//獲取ActorPath
val firstActorPath=system.child("firstActor")
systemLog.info("firstActorPath--->{}",firstActorPath)
//通過system.actorSelection方法獲取ActorRef
val myActor1=system.actorSelection(firstActorPath)
//直接指定其路徑
val myActor2=system.actorSelection("/user/firstActor")
//使用相對路徑
val myActor3=system.actorSelection("../firstActor")
systemLog.info("準備向myactor傳送訊息")
//向myActor1傳送訊息
myActor2!"test"
myActor2! 123
Thread.sleep(5000)
//關閉ActorSystem,停止程式的執行
system.shutdown()
}
程式碼執行結果:
[INFO][04/02/2016 21:04:59.612][main][ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor
[INFO][04/02/2016 21:04:59.612][MyActorSystem-akka.actor.default-dispatcher-2][akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO][04/02/2016 21:04:59.615][main][ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor] received test
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor] received 123
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-3][akka://MyActorSystem/user/firstActor/myActor] received test
[INFO][04/02/2016 21:04:59.616][MyActorSystem-akka.actor.default-dispatcher-3][akka://MyActorSystem/user/firstActor/myActor] received unknown message
本例中的重點程式碼如下
//獲取ActorPath
val firstActorPath=system.child("firstActor")
systemLog.info("myActorPath--->{}",firstActorPath)
//通過system.actorSelection方法獲取ActorRef
val myActor1=system.actorSelection(firstActorPath)
//直接指定其路徑
val myActor2=system.actorSelection("/user/firstActor")
//使用相對路徑
val myActor3=system.actorSelection("../firstActor")
通過 val firstActorPath=system.child(“firstActor”)構造一個ActorPath,然後使用
val myActor1=system.actorSelection(firstActorPath)
獲取路徑下的ActorRef,除這種方式外,還可以通過絕對路徑 val myActor2=system.actorSelection(“/user/firstActor”)及相對路徑 val myActor3=system.actorSelection(“../firstActor”)的方式獲取ActorRef
,需要注意的是絕對路徑使用的是guardian supevisor,即/user/firstActor的這種方式。
如果要獲取myActor,則基方法是型別的,例如
//獲取ActorPath
val myActorPath=system.child("firstActor").child("myActor")
systemLog.info("firstActorPath--->{}",myActorPath)
//通過system.actorSelection方法獲取ActorRef
val myActor1=system.actorSelection(myActorPath)
//直接指定其路徑
val myActor2=system.actorSelection("/user/firstActor/myActor")
//使用相對路徑
val myActor3=system.actorSelection("../firstActor/myActor")
完整程式碼如下:
/*
*ActorPath,獲取myActor
*/
object Example_09 extends App{
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class FirstActor extends Actor with ActorLogging{
//通過context.actorOf方法建立Actor
var child:ActorRef = _
override def preStart(): Unit ={
log.info("preStart() in FirstActor")
//通過context上下文建立Actor
child = context.actorOf(Props[MyActor], name = "myActor")
}
def receive = {
//向MyActor傳送訊息
case x => child ! x;log.info("received "+x)
}
}
class MyActor extends Actor with ActorLogging{
def receive = {
case "test" => log.info("received test");
case _ => log.info("received unknown message");
}
}
val system = ActorSystem("MyActorSystem")
val systemLog=system.log
//建立FirstActor物件
val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")
//獲取ActorPath
val myActorPath=system.child("firstActor").child("myActor")
systemLog.info("firstActorPath--->{}",myActorPath)
//通過system.actorSelection方法獲取ActorRef
val myActor1=system.actorSelection(myActorPath)
//直接指定其路徑
val myActor2=system.actorSelection("/user/firstActor/myActor")
//使用相對路徑
val myActor3=system.actorSelection("../firstActor/myActor")
systemLog.info("準備向myactor傳送訊息")
//向myActor1傳送訊息
myActor1!"test"
myActor1! 123
Thread.sleep(5000)
//關閉ActorSystem,停止程式的執行
system.shutdown()
}
程式碼執行結果:
[INFO][04/02/2016 21:21:14.377][main][ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor/myActor
[INFO][04/02/2016 21:21:14.377][MyActorSystem-akka.actor.default-dispatcher-2][akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO][04/02/2016 21:21:14.381][main][ActorSystem(MyActorSystem)] 準備向myactor傳送訊息
[INFO][04/02/2016 21:21:14.382][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor/myActor] received test
[INFO][04/02/2016 21:21:14.382][MyActorSystem-akka.actor.default-dispatcher-4][akka://MyActorSystem/user/firstActor/myActor] received unknown message
關於遠端ActorRef的獲取,我們將在後續章節中進行講述。
Scala學習(公眾微訊號:ScalaLearning)每天為大家帶來一點Scala語言、Spark、Kafka、Flink、AKKA等大資料技術乾貨及相關技術資訊。技術永無止境,勇攀高峰,一往直前!
覺得文章不錯?掃描關注