1. 程式人生 > >【Akka】Akka入門程式設計例項

【Akka】Akka入門程式設計例項

引言

這篇文章主要是第一次學習Akka程式設計,先試試水,探探坑,對Akka和SBT的使用有一個直觀的瞭解,以幾個簡單的akka程式設計例項來說明akka的使用。希望在日後的學習和程式設計中,能有更多自己的體會和經驗總結來分享。

Actor模型

Actor例項可以想象成是伺服器上的Web服務,你無法控制,只能通過傳送訊息去請求執行任務或查詢資訊,而不能直接在Web服務中修改狀態或者處理資源。通過傳送不可改變的訊息,雖然看上去有些限制,但是可以很簡單安全的編寫併發程式。

Actor系統的形象理解

一個actor是基於Actor系統的最小單元,就像面向物件系統中的物件例項一樣,它也封裝了狀態和行為。我們無法窺探actor內部的資訊,只能通過傳送訊息來請求狀態資訊(就像是問一個人,他感覺如何)。actor中有一個存放不可變狀態資訊的信箱。我們通過傳送資訊和actor進行通訊,當actor收到資訊之後,它會運用相關演算法來處理具體的資訊。
在一個應用程式中,多個actor構成了一套層級系統,像是一個家族或者一個商業組織。一個actor可以認為是一個商業組織的個人。一個actor有一個父親,稱為監督者(supervisor),還有好多孩子,可以認為,在一個商業組織中,主席(actor)下面有多個副主席,副主席也有很多下屬隨從。
Actor系統的最佳實踐是“委派任務

”,尤其是當actor的行為被阻塞的時候。可以想象,在實際商業活動中,主席將要做的工作分配給下面的幾個副主席去分別執行,而副主席也會將子任務分配給自己的隨從,直到該任務被下屬們執行完畢。

處理故障

Actor模型的一個重要內容是處理故障。在工作工程中,如果出現錯誤或者丟擲異常,actor和其子actor都將暫停,然後傳送一條資訊給監督者(supervisor)actor,報告出現故障的訊號。
根據工作任務和故障的性質,監督者actor將會作出幾種選擇:

  • 恢復下屬actor,保留內部狀態
  • 重啟下屬actor,清空狀態
  • 終止下屬actor
  • 上報故障

Hello,Actor例項

現在我用一個最簡單的actor程式設計例項來介紹akka程式設計,先給出程式碼:

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

class HelloActor extends Actor{
  def receive = {
    case "hello"  => println("hello back to you.")
    case _        => println("huh?")
  }
}

object Test1_HelloActor extends
App {
// actor need an ActorSystem val system = ActorSystem("HelloSystem") // create and start the actor val helloActor = system.actorOf(Props[HelloActor], name="helloActor") // send two messages helloActor ! "hello" helloActor ! "what" // shutdown the actor system system.shutdown }

程式碼註解:

  • Actor由HelloActor定義
  • HelloActor的行為有receive方法定義實現,其中使用了模式匹配表示式
  • HelloActor接收字串hello作為訊息,做出相應列印動作
  • Test1_HelloActor的object用來測試actor
  • ActorSystem接收一個name引數,並且通過system.actorOf建立actor例項
  • 建立Actor例項名為helloActor,其建構函式沒有引數
  • Actor建立後自動執行,不需呼叫start或者run方法
  • 通過!方法來發送訊息

ActorSystem

一個actor system是actors的層級集團,分享公共配置資訊(比如分發器dispatchers,部署deployments,遠端功能remote capabilities,地址addresses)。它同時也是建立和查詢actors的入口。ActorSystem是為你的應用程式分配執行緒資源的結構。

ActorRef

當你呼叫ActorSystemactorOf方法時,將建立並返回一個ActorRef的例項:
def actorOf(props: Props, name: String): ActorRef

這個引用用來處理actor,你可以將其看做是處理實際actor的代理人(broker)或包裝外觀(facade)。ActorRef防止你破壞Actor模型,比如直接處理Actor例項,或直接修改Actor例項中的變數。所以只能通過給actor傳送訊息方式來執行任務,這種“袖手旁觀(不干涉,hands-off)”的方法幫助鞏固適宜的程式設計實踐。

ActorRef有以下特點:

  • 它是不可變的
  • 它與actor實體是一對一的關係
  • 它是可序列化的,網路可感知的。這使得你可以在網路環境中傳送一個ActorRef

Actor之間的通訊例項

下面給出的是兩個actor例項相互發送訊息進行通訊的PingPong示例:

import akka.actor._

case object PingMessage
case object PongMessage
case object StartMessage
case object StopMessage

class Ping(pong: ActorRef) extends Actor{
  var count = 0
  def incrementAndPrint {count += 1; println(s"$count:ping")}
  def receive = {
    case StartMessage =>
      incrementAndPrint
      pong ! PongMessage
    case PingMessage =>
      incrementAndPrint
      if(count > 99) {
        sender ! StopMessage
        println("ping stopped")
        context.stop(self)
      }
      else
        sender ! PongMessage
    case _ => println("Ping got unexpected information")
  }
}

class Pong extends Actor {
  var count = 0
  def receive = {
    case StopMessage =>
      println("pong stopped")
      context.stop(self)
    case PongMessage =>
      count += 1
      println(s"$count:pong")
      sender ! PingMessage
    case _ => println("Pong got unexpected information")
  }
}

object PingPangTest extends App{
  val system = ActorSystem("PingPongTest")
  val pongActor = system.actorOf(Props[Pong], name="pong")
  val pingActor = system.actorOf(Props(new Ping(pongActor)),
                                  name = "ping")
  pingActor ! StartMessage
}

程式碼註釋:

  • 建立ActorSystem之後;
  • 建立Pong的actor例項(pongActor物件其實是ActorRef的例項);
  • 之後建立Ping的actor例項,其建構函式接受ActorRef引數;
  • 通過給pingActor傳送一個StartMessage訊息來啟動pingActor和pongActor的具體動作;
  • Ping Actor和Pong Actor通過PingMessage和PongMessage相互發送訊息,sender用來引用訊息傳送源Actor;
  • Ping通過計數,知道進行了100次訊息的傳送之後,傳送StopMessage來終止actor。分別呼叫自己的context.stop方法來結束

啟動Actor

在ActorSystem層面,通過呼叫system.actorOf方法來建立actors;在actor內部,通過呼叫context.actorOf方法來建立子actor。
下面給出一個ParentChild示例:

import akka.actor._

case class CreateChild (name: String)
case class Name (name: String)

class Child extends Actor {
  var name = "No name"
  override def postStop: Unit = {
    println(s"D'oh! They killed me ($name): ${self.path}")
  }
  def receive = {
    case Name(name) => this.name = name
    case _ => println(s"Child $name got message.")
  }
}

class Parent extends Actor {
  def receive = {
    case CreateChild(name) =>
      // Parent creates a new Child here
      println(s"Parent about to create Child ($name) ...")
      val child = context.actorOf(Props[Child], name=s"$name")
      child ! Name(name)
    case _ => println(s"Parent got some other message.")
  }
}

object ParentChildDemo extends App{
  val actorSystem = ActorSystem("ParentChildTest")
  val parent = actorSystem.actorOf(Props[Parent], name="Parent")

  // send messages to Parent to create to child actors
  parent ! CreateChild("XiaoMing")
  parent ! CreateChild("XiaoLiang")
  Thread.sleep(500)

  // lookup XiaoMing, the kill it
  println("Sending XiaoMing a PoisonPill ... ")
  val xiaoming = actorSystem.actorSelection("/user/Parent/XiaoMing")
  xiaoming ! PoisonPill
  println("XiaoMing was killed")

  Thread.sleep(5000)
  actorSystem.shutdown
}

列印結果:

Parent about to create Child (XiaoMing) ...
Parent about to create Child (XiaoLiang) ...
Sending XiaoMing a PoisonPill ...
XiaoMing was killed
D'oh! They killed me (XiaoMing): akka://ParentChildTest/user/Parent/XiaoMing
D'oh! They killed me (XiaoLiang): akka://ParentChildTest/user/Parent/XiaoLiang

終止Actor

在ActorSystem層面,通過system.stop(actorRef)來終止一個actor;在actor內部,使用context.stop(actorRef)來結束一個actor。
如果當前有正在處理的訊息,對該訊息的處理將在actor被終止之前完成,但是郵箱中的後續訊息將不會被處理。預設情況下這些訊息會被送到 ActorSystem的dead letter mailbox, 但是這取決於郵箱的實現。

actor終止的相關處理

actor的終止分兩步: 第一步actor將停止對郵箱的處理,向所有子actor傳送終止命令,然後處理來自子actor的終止訊息直到所有的子actor都完成終止, 最後終止自己(呼叫postStop,銷燬郵箱,向DeathWatch釋出Terminated,通知其監管者)。這個過程保證actor系統中的子樹以一種有序的方式終止,將終止命令傳播到葉子結點並收集它們回送的確認訊息給被終止的監管者。如果其中某個actor沒有響應(i.e.由於處理訊息用了太長時間以至於沒有收到終止命令), 整個過程將會被阻塞。

在 ActorSystem.shutdown被呼叫時, 系統根監管actor會被終止,以上的過程將保證整個系統的正確終止。

postStop() hook是在actor被完全終止以後呼叫的。這是為了清理資源:

override def postStop() = {
  // 關閉檔案或資料庫連線
}

PoisonPill和gracefulStop

還有其他兩種方式,傳送PoisonPill訊息或者使用gracefulStop終止。
你也可以向actor傳送akka.actor.PoisonPill訊息,這個訊息處理完成後actor會被終止。PoisonPill與普通訊息一樣被放進佇列,因此會在已經入佇列的其它訊息之後被執行。

如果你想等待終止過程的結束,或者組合若干actor的終止次序,可以使用gracefulStop。
下面給出gracefulStop的程式碼示例:

import akka.actor._
import akka.pattern.gracefulStop
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

case object TestActorStop

class TestActor extends Actor {
  def receive = {
    case TestActorStop =>
      context.stop(self)
    case _ => println("TestActor got message")
  }
  override def postStop {println("TestActor: postStop")}
}

object GracefulStopTest extends App{
  val system = ActorSystem("GracefulStopTest")
  val testActor = system.actorOf(Props[TestActor], name="TestActor")
   // try to stop the actor graceful
  try {
    val stopped: Future[Boolean] = gracefulStop(testActor, 2 seconds, TestActorStop)
    Await.result(stopped, 3 seconds)
    println("testActor was stopped")
  } catch {
    case e: akka.pattern.AskTimeoutException => e.printStackTrace
  } finally {
    system.shutdown
  }
}

gracefulStop(actorRef, timeout)將返回一個Future例項,當目標actor有處理相關終止動作的訊息時,會執行成功。
上面示例中,通過傳送TestActorStop訊息來終止actor;如果沒有處理終止的工作,當超過2s後,Future丟擲akka.pattern.AskTimeoutException異常。預設情況下,gracefulStop將傳送PoisonPill訊息。

Kill訊息

當深入Akka actors,我們將認識監督者策略(supervisor strategies)概念。當實現了監督者策略,向actor傳送一個Kill訊息,這可以用來重新啟動actor。如果使用預設的監督者策略,Kill訊息將終止目標actor。
下面是示例程式碼:

import akka.actor._

class Number5 extends Actor {
  def receive = {
    case _ => println("Number 5 got a message")
  }
  override def preStart { println("Number 5 is alive")}
  override def postStop { println("Number 5::postStop called")}
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println("Number 5::preRestart called")
  }
  override def postRestart(reason: Throwable): Unit = {
    println("Number 5::postRestart called")
  }
}

object KillTest extends App{
  val system = ActorSystem("KillTestSystem")
  val number5 = system.actorOf(Props[Number5], name="Number5")
  number5 ! "hello"
  number5 ! Kill
  system.shutdown
}

列印的資訊:

Number 5 is alive
Number 5 got a message
[ERROR] [01/17/2016 19:20:09.342] [KillTestSystem-akka.actor.default-dispatcher-3] [akka://KillTestSystem/user/Number5] Kill (akka.actor.ActorKilledException)
Number 5::postStop called