1. 程式人生 > >Akka併發程式設計——第六節:Actor模型(五)

Akka併發程式設計——第六節:Actor模型(五)

本將主要內容:
1. !訊息傳送,Fire-and-Forget訊息模型
2. ?訊息傳送,Send-And-Receive-Future訊息模型

Akka提供了兩種訊息模型:fire-and-forget和Send-And-Receive-Future。fire-and-forget是一種單向訊息傳送模型,指的是非同步傳送訊息,通過非同步傳送訊息且訊息傳送後可以立即返回,Akka中使用?方法進行fire-and-forget訊息傳送,如stringActor!”Creating Actors with implicit val context”,它的意思是當前傳送方Aactor向stringActor傳送字串訊息”Creating Actors with implicit val context”,傳送完該訊息後立即返回,而無需等待stringActor的返回,!還有個過載的方法tell;Send-And-Receive-Future指的是非同步傳送訊息則是一種雙向訊息傳送模型,向目標Actor傳送完訊息後,然後返回一個Future作為後期可能的返回,當前傳送方Actor將等待目標Actor的返回,Akka中使用?方法進行Send-And-Receive-Future訊息的傳送,它也同樣有一個過載的方法ask

1. !訊息傳送,Fire-and-Forget訊息模型

/**
  * 訊息處理:!(Fire-Forget)
  */
object Example12 extends  App{
  import akka.actor.Actor
  import akka.actor.Props
  import akka.event.Logging
  import akka.actor.ActorSystem

 //定義幾種不同的訊息
  case class Start(var msg:String)
  case class Run(var msg:String)
  case class
Stop(var msg:String)
class ExampleActor extends Actor { val other = context.actorOf(Props[OtherActor], "OtherActor") val log = Logging(context.system, this) def receive={ //使用fire-and-forget訊息模型向OtherActor傳送訊息,隱式地傳遞sender case Start(msg) => other ! msg //使用fire-and-forget訊息模型向OtherActor傳送訊息,直接呼叫tell方法,顯式指定sender
case Run(msg) => other.tell(msg, sender) } } class OtherActor extends Actor{ val log = Logging(context.system, this) def receive ={ case s:String=>log.info("received message:\n"+s) case _ ⇒ log.info("received unknown message") } } //建立ActorSystem,ActorSystem為建立和查詢Actor的入口 //ActorSystem管理的Actor共享配置資訊如分發器(dispatchers)、部署(deployments)等 val system = ActorSystem("MessageProcessingSystem") //建立ContextActor val exampleActor = system.actorOf(Props[ExampleActor],name="ExampleActor") //使用fire-and-forget訊息模型向exampleActor傳送訊息 exampleActor!Run("Running") exampleActor!Start("Starting") //關閉ActorSystem system.shutdown() }

程式碼執行結果如下:

[INFO] [03/20/2016 20:57:43.665] [MessageProcessingSystem-akka.actor.default-dispatcher-5] [akka://MessageProcessingSystem/user/ExampleActor/OtherActor] received message:
Running
[INFO] [03/20/2016 20:57:43.672] [MessageProcessingSystem-akka.actor.default-dispatcher-5] [akka://MessageProcessingSystem/user/ExampleActor/OtherActor] received message:
Starting

在ExampleActor中,通過隱式變數context建立了OtherActor例項:val other = context.actorOf(Props[OtherActor], “OtherActor”),在ExampleActor的receive方法中,處理兩種不同型別的訊息例如:

  //使用fire-and-forget訊息模型向OtherActor傳送訊息,隱式地傳遞sender
  case Start(msg) => other ! msg
  //使用fire-and-forget訊息模型向OtherActor傳送訊息,直接呼叫tell方法,顯式指定sender
  case Run(msg) => other.tell(msg, sender)

處理Start型別的訊息時,直接使用!進行訊息傳送,而處理Run型別的訊息時,使用的是tell方法,可以看到使用tell方法需要顯式地指定其sender,而使用!進行訊息傳送則不需要,事實上!方法通過隱式值傳入需要的Sender,對比!與tell方法的定義便很容易理解

//!方法的定義
def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
//tell方法的定義
final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)

可以看到,tell方法的實現依賴於!方法。如果在一個Actor當中使用!方法時,例如ExampleActor中使用的other ! msg向OtherActor傳送訊息,則sender隱式為ExampleActor,如果不是在Actor中使用則預設為Actor.noSender,即sender為null。

2. ?訊息傳送,Send-And-Receive-Future訊息模型

理解了Fire-And-Forget訊息模型後,接著對Send-And-Receive-Future訊息模型進行介紹,下面的程式碼給出了其使用示例。

/**
  * 訊息處理:?(Send-And-Receive-Future)
  */
object Example13 extends  App{
  import akka.actor.Actor
  import akka.actor.Props
  import akka.event.Logging
  import akka.actor.ActorSystem
  import scala.concurrent.Future
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.duration._
  import akka.pattern.pipe
  import scala.concurrent.ExecutionContext.Implicits.global

  //訊息:個人基礎資訊
  case class BasicInfo(id:Int,val name:String, age:Int)
  //訊息:個人興趣資訊
  case class InterestInfo(id:Int,val interest:String)
  //訊息: 完整個人資訊
  case class Person(basicInfo: BasicInfo,interestInfo: InterestInfo)


  //基礎資訊對應Actor
  class BasicInfoActor extends Actor{
    val log = Logging(context.system, this)
    def receive = {
      //處理送而來的使用者ID,然後將結果傳送給sender(本例中對應CombineActor)
      case id:Int ⇒log.info("id="+id);sender!new BasicInfo(id,"John",19)
      case _      ⇒ log.info("received unknown message")
    }
  }

  //興趣愛好對應Actor
  class InterestInfoActor extends Actor{
    val log = Logging(context.system, this)
    def receive = {
      //處理髮送而來的使用者ID,然後將結果傳送給sender(本例中對應CombineActor)
      case id:Int ⇒log.info("id="+id);sender!new InterestInfo(id,"足球")
      case _      ⇒ log.info("received unknown message")
    }
  }

  //Person完整資訊對應Actor
  class PersonActor extends Actor{
    val log = Logging(context.system, this)
    def receive = {
      case person: Person =>log.info("Person="+person)
      case _      ⇒ log.info("received unknown message")
    }
  }


  class CombineActor extends Actor{
    implicit val timeout = Timeout(5 seconds)
    val basicInfoActor = context.actorOf(Props[BasicInfoActor],name="BasicInfoActor")
    val interestInfoActor = context.actorOf(Props[InterestInfoActor],name="InterestInfoActor")
    val personActor = context.actorOf(Props[PersonActor],name="PersonActor")
    def receive = {
      case id: Int =>
        val combineResult: Future[Person] =
          for {
            //向basicInfoActor傳送Send-And-Receive-Future訊息,mapTo方法將返回結果對映為BasicInfo型別
            basicInfo <- ask(basicInfoActor, id).mapTo[BasicInfo]
            //向interestInfoActor傳送Send-And-Receive-Future訊息,mapTo方法將返回結果對映為InterestInfo型別
            interestInfo <- ask(interestInfoActor, id).mapTo[InterestInfo]
          } yield Person(basicInfo, interestInfo)

        //將Future結果傳送給PersonActor
       pipe(combineResult).to(personActor)
    }
  }

  val _system = ActorSystem("Send-And-Receive-Future")
  val combineActor = _system.actorOf(Props[CombineActor],name="CombineActor")
  combineActor ! 12345
  Thread.sleep(5000)
  _system.shutdown

}

程式碼執行結果如下:

[INFO][03/20/2016 22:55:11.208][Send-And-Receive-Future-akka.actor.default-dispatcher-3][akka://Send-And-Receive-Future/user/CombineActor/BasicInfoActor] id=12345
[INFO][03/20/2016 22:55:11.220][Send-And-Receive-Future-akka.actor.default-dispatcher-2][akka://Send-And-Receive-Future/user/CombineActor/InterestInfoActor] id=12345
[INFO][03/20/2016 22:55:11.223][Send-And-Receive-Future-akka.actor.default-dispatcher-4][akka://Send-And-Receive-Future/user/CombineActor/PersonActor] Person=Person(BasicInfo(12345,John,19),InterestInfo(12345,足球))

程式碼中定義了3種類型的訊息,分別是個人基礎資訊case class BasicInfo(id:Int,val name:String, age:Int)、個人興趣資訊case class InterestInfo(id:Int,val interest:String)以及完整個人資訊case class Person(basicInfo: BasicInfo,interestInfo: InterestInfo),然後為這3種類型的訊息定義了相應的Actor即BasicInfoActor、InterestInfoActor和PersonActor,在CombineActor分別建立相應Actor的例項,receive方法中使用ask向BasicInfoActor、InterestInfoActor傳送Send-And-Receive-Future模型訊息,BasicInfoActor、InterestInfoActor中的receive方法接收到傳送來的Int型別訊息並分別使用!向CombineActor傳送BasicInfo、InterestInfo訊息,將結果儲存在Future[Person]中,然後通過程式碼pipe(combineResult).to(personActor)將結果傳送給PersonActor。

Scala學習(公眾微訊號:ScalaLearning)每天為大家帶來一點Scala語言、Spark、Kafka、Flink、AKKA等大資料技術乾貨及相關技術資訊。技術永無止境,勇攀高峰,一往直前!
覺得文章不錯?掃描關注

這裡寫圖片描述