
Akka Source Code Analysis - Actor & ActorContext & ActorRef & ActorCell


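  While analyzing the source code, we have run into several similar concepts in Akka: Actor, ActorRef, ActorCell, and ActorContext. How exactly do they differ, and how are they related?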

/**
 * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
 * <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
 *
 * An actor has a well-defined (non-cyclic) life-cycle.
 *  - ''RUNNING'' (created and started actor) - can receive messages
 *  - ''SHUTDOWN'' (when 'stop' is invoked) - can't do anything
 *
 * The Actor's own [[akka.actor.ActorRef]] is available as `self`, the current
 * message’s sender as `sender()` and the [[akka.actor.ActorContext]] as
 * `context`. The only abstract method is `receive` which shall return the
 * initial behavior of the actor as a partial function (behavior can be changed
 * using `context.become` and `context.unbecome`).
 *
 * This is the Scala API (hence the Scala code below), for the Java API see [[akka.actor.AbstractActor]].
 *
 * {{{
 * class ExampleActor extends Actor {
 *
 *   override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
 *     case _: ArithmeticException      => Resume
 *     case _: NullPointerException     => Restart
 *     case _: IllegalArgumentException => Stop
 *     case _: Exception                => Escalate
 *   }
 *
 *   def receive = {
 *                                      // directly calculated reply
 *     case Request(r)               => sender() ! calculate(r)
 *
 *                                      // just to demonstrate how to stop yourself
 *     case Shutdown                 => context.stop(self)
 *
 *                                      // error kernel with child replying directly to 'sender()'
 *     case Dangerous(r)             => context.actorOf(Props[ReplyToOriginWorker]).tell(PerformWork(r), sender())
 *
 *                                      // error kernel with reply going through us
 *     case OtherJob(r)              => context.actorOf(Props[ReplyToMeWorker]) ! JobRequest(r, sender())
 *     case JobReply(result, orig_s) => orig_s ! result
 *   }
 * }
 * }}}
 *
 * The last line demonstrates the essence of the error kernel design: spawn
 * one-off actors which terminate after doing their job, pass on `sender()` to
 * allow direct reply if that is what makes sense, or round-trip the sender
 * as shown with the fictitious JobRequest/JobReply message pair.
 *
 * If you don’t like writing `context` you can always `import context._` to get
 * direct access to `actorOf`, `stop` etc. This is not default in order to keep
 * the name-space clean.
 */
trait Actor {

  // to make type Receive known in subclasses without import
  type Receive = Actor.Receive

  /**
   * Scala API: Stores the context for this actor, including self, and sender.
   * It is implicit to support operations such as `forward`.
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   *
   * [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
   * [[akka.actor.AbstractActor.ActorContext]], which is the Java API of the actor
   * context.
   */
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   */
  implicit final val self = context.self //MUST BE A VAL, TRUST ME

  /**
   * The reference sender Actor of the last received message.
   * Is defined if the message was sent from another Actor,
   * else `deadLetters` in [[akka.actor.ActorSystem]].
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   */
  final def sender(): ActorRef = context.sender()

  /**
   * Scala API: This defines the initial actor behavior, it must return a partial function
   * with the actor logic.
   */
  //#receive
  def receive: Actor.Receive
  //#receive

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  @InternalApi
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // optimization: avoid allocation of lambda
    if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
      unhandled(msg)
    }
  }

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
   */
  @InternalApi
  protected[akka] def aroundPreStart(): Unit = preStart()

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
   */
  @InternalApi
  protected[akka] def aroundPostStop(): Unit = postStop()

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
   */
  @InternalApi
  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
   */
  @InternalApi
  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  /**
   * User overridable definition the strategy to use for supervising
   * child actors.
   */
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
  //#lifecycle-hooks
  def preStart(): Unit = ()

  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
  //#lifecycle-hooks
  def postStop(): Unit = ()

  //#lifecycle-hooks

  /**
   * Scala API: User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
   * @param reason the Throwable that caused the restart to happen
   * @param message optionally the current message the actor processed when failing, if applicable
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean
   * up of resources before Actor is terminated.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
  //#lifecycle-hooks
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child =>
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }

  //#lifecycle-hooks

  /**
   * User overridable callback: By default it calls `preStart()`.
   * @param reason the Throwable that caused the restart to happen
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
  //#lifecycle-hooks
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) => throw DeathPactException(dead)
      case _                => context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}

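  The Actor trait is what developers work with directly, so I have pasted the official source here as-is. From the code, trait Actor handles messages, exposes the actor lifecycle hooks, and holds a reference to the current execution context (configuration, parent/child relationships, and so on). In short, it is the developer-facing side: it carries every interface, field, and piece of configuration a developer needs to define and use an actor.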

  The fields and methods of trait Actor fall into four categories:

  1. Execution context: context: ActorContext, self: ActorRef, sender(): ActorRef

  2. Lifecycle management: aroundReceive, aroundPreStart, aroundPostStop, aroundPreRestart, aroundPostRestart, preStart, postStop, preRestart, postRestart

  3. Actor behavior: receive, unhandled

  4. Supervision: supervisorStrategy: SupervisorStrategy
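
  Categories 2 and 3 meet in aroundReceive: the current behavior is tried once with applyOrElse, and a pre-allocated fallback returns a sentinel so that unhandled can be called without allocating a lambda per message. The sketch below reproduces only that dispatch in plain Scala, with no Akka dependency; ReceiveSketch, NotHandled and notHandledFun here are local stand-ins that mirror the names in the listing, not Akka's own objects.

```scala
// Plain-Scala sketch of the aroundReceive/unhandled dispatch from trait Actor.
// ReceiveSketch and its members are hypothetical stand-ins (no Akka needed).
object ReceiveSketch {
  type Receive = PartialFunction[Any, Unit]

  // Sentinel produced by the fallback when the behavior does not match,
  // playing the role of Actor.NotHandled in the listing.
  private val NotHandled = new Object
  private val notHandledFun: Any => Any = _ => NotHandled

  // One applyOrElse call instead of isDefinedAt + apply; the fallback is a
  // shared, pre-allocated function, so no closure is created per message.
  def aroundReceive(receive: Receive, msg: Any)(unhandled: Any => Unit): Unit =
    if (receive.applyOrElse(msg, notHandledFun).asInstanceOf[AnyRef] eq NotHandled)
      unhandled(msg)

  def main(args: Array[String]): Unit = {
    val behavior: Receive = { case n: Int => println(s"handled $n") }
    aroundReceive(behavior, 42)(m => println(s"unhandled $m"))      // prints "handled 42"
    aroundReceive(behavior, "hello")(m => println(s"unhandled $m")) // prints "unhandled hello"
  }
}
```

  In real Akka the unhandled callback is the actor's own unhandled method, which throws DeathPactException for an unhandled Terminated and otherwise publishes UnhandledMessage.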

  trait Actor has a field context: ActorContext. It is an important field, so it is worth analyzing in detail.

implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  Here is the definition of contextStack.

val contextStack = new ThreadLocal[List[ActorContext]] {
    override def initialValue: List[ActorContext] = Nil
  }

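  Putting this together: context takes the head of ActorCell.contextStack for the current thread, pushes a null marker onto the stack, and returns that head. The way context is implemented deserves a closer look. Actor is a trait that defines a field whose value is only known when the instance is initialized, and, as we saw earlier, an Actor is instantiated asynchronously, when its cell processes a Create message. So why is the value handed over through a ThreadLocal? My understanding is this: the instance is constructed on a dispatcher thread rather than on the thread that called actorOf, and the ActorCell has to reach the constructor of a user-defined class that has no parameter for it. A ThreadLocal lets the code that invokes the constructor pass the cell along on that same thread without interfering with actors being created concurrently on other threads. It also prevents users from creating an Actor directly with new: in that case nothing has pushed a cell onto ActorCell.contextStack for the current thread, and user code cannot push one because contextStack is internal to Akka, so a usable context can only come from Akka's own actor-creation path. So where exactly is the value set?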
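
  The handoff can be reproduced without Akka. The toy below assumes the same protocol as the listings in this article: a newActor-style factory pushes a cell onto a per-thread stack and runs the constructor on the same thread, the trait reads the head and pushes a null marker, and the factory pops both afterwards. ToyCell, ToyActor, Greeter and ToyCell.newActor are hypothetical names used only for illustration.

```scala
// Toy model of how Actor.context is populated through a ThreadLocal stack.
// ToyCell, ToyActor, Greeter and ContextDemo are hypothetical stand-ins.
final class ToyCell(val name: String)

object ToyCell {
  // Same shape as ActorCell.contextStack: one stack per thread.
  val contextStack: ThreadLocal[List[ToyCell]] = new ThreadLocal[List[ToyCell]] {
    override def initialValue(): List[ToyCell] = Nil
  }

  // Mirrors newActor(): push this cell, run the constructor on the SAME thread,
  // then pop the null marker plus our cell.
  def newActor[A <: ToyActor](cell: ToyCell)(make: => A): A = {
    contextStack.set(cell :: contextStack.get)
    try make
    finally {
      val after = contextStack.get
      if (after.nonEmpty)
        contextStack.set(if (after.head eq null) after.tail.tail else after.tail)
    }
  }
}

trait ToyActor {
  // Mirrors Actor.context: read the head, then push a null marker.
  val context: ToyCell = {
    val stack = ToyCell.contextStack.get
    if (stack.isEmpty || (stack.head eq null))
      throw new IllegalStateException(s"use ToyCell.newActor, not new ${getClass.getName}")
    ToyCell.contextStack.set(null :: stack)
    stack.head
  }
}

class Greeter extends ToyActor

object ContextDemo {
  def main(args: Array[String]): Unit = {
    val g = ToyCell.newActor(new ToyCell("greeter"))(new Greeter)
    println(g.context.name)                        // prints "greeter"
    println(scala.util.Try(new Greeter).isFailure) // prints "true": plain new is rejected
    println(ToyCell.contextStack.get.isEmpty)      // prints "true": stack restored
  }
}
```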

//This method is in charge of setting up the contextStack and create a new instance of the Actor
  protected def newActor(): Actor = {
    contextStack.set(this :: contextStack.get)
    try {
      behaviorStack = emptyBehaviorStack
      val instance = props.newActor()

      if (instance eq null)
        throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")

      // If no becomes were issued, the actors behavior is its receive method
      behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
      instance
    } finally {
      val stackAfter = contextStack.get
      if (stackAfter.nonEmpty)
        contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
    }
  }

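  One thing looks strange: contextStack is a List. Since it is already a ThreadLocal, each thread's get/set is independent of the others, so it seems that simply setting the current value would work too. I checked the commit history of this code, and it appears to have always been a List or a stack; I also asked the Akka maintainers and did not get a definitive answer.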
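
  One plausible reason, offered as my own reading rather than a confirmed answer: the stack lets an outer context survive a nested creation on the same thread that happens before the outer Actor trait has read its context, for example when a superclass constructor, which runs before the trait's initializer, itself creates an actor synchronously. The null marker additionally makes a second instance in the same constructor fail instead of silently reusing the cell. The toy below compares a single ThreadLocal slot with the stack protocol; Handoff, SingleSlot, StackSlot and createNested are hypothetical.

```scala
import scala.util.Try

// Two hypothetical ways of handing a "context" to a constructor on one thread.
trait Handoff {
  def withContext[A](ctx: String)(body: => A): A // plays newActor()
  def take(): String                              // plays Actor.context
}

// A single slot: a nested handoff overwrites it and then clears it.
object SingleSlot extends Handoff {
  private val slot = new ThreadLocal[String]
  def withContext[A](ctx: String)(body: => A): A = {
    slot.set(ctx)
    try body finally slot.remove()
  }
  def take(): String = {
    val c = slot.get
    if (c == null) throw new IllegalStateException("no context on this thread")
    slot.set(null)
    c
  }
}

// The push / null-marker / pop protocol used by contextStack and newActor().
object StackSlot extends Handoff {
  private val stack = new ThreadLocal[List[String]] {
    override def initialValue(): List[String] = Nil
  }
  def withContext[A](ctx: String)(body: => A): A = {
    stack.set(ctx :: stack.get)
    try body
    finally {
      val after = stack.get
      if (after.nonEmpty) stack.set(if (after.head eq null) after.tail.tail else after.tail)
    }
  }
  def take(): String = {
    val s = stack.get
    if (s.isEmpty || (s.head eq null)) throw new IllegalStateException("no context on this thread")
    stack.set(null :: s)
    s.head
  }
}

object NestedDemo {
  // The outer "constructor" creates a child BEFORE reading its own context.
  def createNested(h: Handoff): Try[(String, String)] = Try {
    h.withContext("outer") {
      val child = h.withContext("child")(h.take())
      val outer = h.take()
      (outer, child)
    }
  }

  def main(args: Array[String]): Unit = {
    println(createNested(SingleSlot).isFailure) // prints "true"
    println(createNested(StackSlot))            // prints "Success((outer,child))"
  }
}
```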

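  The code above shows that the current this is pushed onto contextStack and thereby becomes context, and this is an ActorCell. ActorCell extends AbstractActor.ActorContext, which in turn extends akka.actor.ActorContext, and the context field in trait Actor has type akka.actor.ActorContext, so the assignment type-checks.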

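  Judging from ActorContext's definition and how it is assigned, ActorContext is a view, or a facet, of ActorCell. In other words, a developer extending Actor can use ActorContext to read ActorCell's current state (for example field values) or use its capabilities (for example become, which changes the current actor's behavior). ActorContext also extends ActorRefFactory, so it can create and stop actors as well.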
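
  The "view" idea is plain interface narrowing: the cell implements a small trait, and user code is typed against that trait only. The miniature below is a sketch of that layering, assuming nothing about Akka's real method signatures; ViewRefFactory, ViewContext and ViewCell are hypothetical and only play the roles of ActorRefFactory, ActorContext and ActorCell.

```scala
import scala.collection.mutable

// Plays ActorRefFactory: the ability to create children.
trait ViewRefFactory {
  def actorOf(name: String): String // returns the child's "path" in this toy
}

// The narrow view handed to user code (plays akka.actor.ActorContext).
trait ViewContext extends ViewRefFactory {
  def become(behavior: PartialFunction[Any, String]): Unit
}

// The full implementation (plays ActorCell): it also owns internals that the
// view does not expose, such as the behavior stack and the child list.
final class ViewCell(val path: String, initial: PartialFunction[Any, String]) extends ViewContext {
  private var behaviorStack: List[PartialFunction[Any, String]] = List(initial)
  private val childNames = mutable.ListBuffer.empty[String]

  def actorOf(name: String): String = { childNames += name; s"$path/$name" }
  // Replace the current behavior (like become with discardOld = true).
  def become(behavior: PartialFunction[Any, String]): Unit =
    behaviorStack = behavior :: behaviorStack.tail
  def invoke(msg: Any): String = behaviorStack.head.applyOrElse(msg, (_: Any) => "unhandled")
  def children: List[String] = childNames.toList
}

object ViewDemo {
  def main(args: Array[String]): Unit = {
    val cell = new ViewCell("/user/a", { case "ping" => "pong" })
    val context: ViewContext = cell // user code only sees the view
    context.become { case "ping" => "PONG" }
    println(context.actorOf("child")) // prints "/user/a/child"
    println(cell.invoke("ping"))      // prints "PONG"
    // context.invoke("ping")         // would not compile: invoke is not in the view
  }
}
```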

  That mostly completes our analysis of Actor and ActorContext, and we already covered ActorCell fairly thoroughly, so let us now look at ActorRef.

abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
  scalaRef: InternalActorRef =>

  /**
   * Returns the path for this actor (from this actor up to the root actor).
   */
  def path: ActorPath

  /**
   * Comparison takes path and the unique id of the actor cell into account.
   */
  final def compareTo(other: ActorRef) = {
    val x = this.path compareTo other.path
    if (x == 0) if (this.path.uid < other.path.uid) -1 else if (this.path.uid == other.path.uid) 0 else 1
    else x
  }

  /**
   * Sends the specified message to this ActorRef, i.e. fire-and-forget
   * semantics, including the sender reference if possible.
   *
   * Pass [[akka.actor.ActorRef]] `noSender` or `null` as sender if there is nobody to reply to
   */
  final def tell(msg: Any, sender: ActorRef): Unit = this.!(msg)(sender)

  /**
   * Forwards the message and passes the original sender actor as the sender.
   *
   * Works, no matter whether originally sent with tell/'!' or ask/'?'.
   */
  def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender())

  /**
   * INTERNAL API
   * Is the actor shut down?
   * The contract is that if this method returns true, then it will never be false again.
   * But you cannot rely on that it is alive if it returns false, since this by nature is a racy method.
   */
  @deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
  private[akka] def isTerminated: Boolean

  final override def hashCode: Int = {
    if (path.uid == ActorCell.undefinedUid) path.hashCode
    else path.uid
  }

  /**
   * Equals takes path and the unique id of the actor cell into account.
   */
  final override def equals(that: Any): Boolean = that match {
    case other: ActorRef => path.uid == other.path.uid && path == other.path
    case _               => false
  }

  override def toString: String =
    if (path.uid == ActorCell.undefinedUid) s"Actor[${path}]"
    else s"Actor[${path}#${path.uid}]"
}

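  First, ActorRef is serializable. This is an important property: it means an ActorRef can be sent across JVMs and over the network. Why does that matter? If you know anything about Akka, you know that location transparency is one of its defining features, and a serializable ActorRef is a key technical ingredient of it.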
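
  Why serializability helps location transparency can be seen in a toy: the reference writes itself out as nothing but its path, and the receiving side resolves that path back into a reference. As far as I know, Akka likewise transmits an ActorRef as its path and resolves it on the receiving side through the ActorRefProvider; the code below does not use Akka, and WireRef, SerializedWireRef and WireRegistry are hypothetical. It relies only on standard Java serialization hooks (writeReplace/readResolve).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamException}
import scala.collection.concurrent.TrieMap

// Hypothetical registry standing in for the receiving node's provider:
// it resolves a path string back to a local reference.
object WireRegistry {
  private val refs = TrieMap.empty[String, WireRef]
  def register(ref: WireRef): WireRef = { refs.put(ref.path, ref); ref }
  def resolve(path: String): WireRef = refs.getOrElseUpdate(path, new WireRef(path))
}

// A reference that only ever travels as its path.
final class WireRef(val path: String) extends java.io.Serializable {
  // Java serialization writes this replacement instead of the ref itself.
  @throws(classOf[ObjectStreamException])
  protected def writeReplace(): AnyRef = SerializedWireRef(path)
}

// What actually goes over the wire: just the path string (case classes are Serializable).
final case class SerializedWireRef(path: String) {
  // On deserialization, swap it for a reference resolved on this side.
  @throws(classOf[ObjectStreamException])
  def readResolve(): AnyRef = WireRegistry.resolve(path)
}

object WireDemo {
  def roundTrip(obj: AnyRef): AnyRef = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject() finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val ref = WireRegistry.register(new WireRef("akka://sys@host:2552/user/a"))
    println(roundTrip(ref) eq ref) // prints "true": resolved to the registered ref
  }
}
```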

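  Careful readers will notice that ActorRef has very few fields and methods. It has just one abstract member, path: ActorPath, which is the actor's path. You are probably familiar with ActorPath: it identifies the actor's position in the actor hierarchy and carries other information as well (such as the node address, the protocol, and the actor's uid). Of the few methods, the important ones are tell/forward/isTerminated, and isTerminated is deprecated. This means that what an ActorRef can do is basically limited to sending messages, being serialized for transmission, and reporting its ActorPath.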
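
  The equality rules in the listing (compareTo, equals, hashCode and toString all consult path.uid) matter most when an actor is stopped and a new one is started under the same name: the path is the same, the uid is not. The toy below assumes, as I understand Akka's ActorPath, that path equality itself ignores the uid, and that the "undefined" uid is 0 as with ActorCell.undefinedUid; ToyPath and UidRef are hypothetical.

```scala
// Hypothetical path: its own equality ignores the uid (assumed to match ActorPath).
final class ToyPath(val elements: String, val uid: Int) {
  override def equals(that: Any): Boolean = that match {
    case p: ToyPath => p.elements == elements
    case _          => false
  }
  override def hashCode: Int = elements.hashCode
  override def toString: String = elements
}

object UidRef { val undefinedUid = 0 } // assumed to mirror ActorCell.undefinedUid

// Mirrors ActorRef.equals/hashCode/toString from the listing.
final class UidRef(val path: ToyPath) {
  override def equals(that: Any): Boolean = that match {
    case other: UidRef => path.uid == other.path.uid && path == other.path
    case _             => false
  }
  override def hashCode: Int =
    if (path.uid == UidRef.undefinedUid) path.hashCode else path.uid
  override def toString: String =
    if (path.uid == UidRef.undefinedUid) s"Actor[$path]" else s"Actor[$path#${path.uid}]"
}

object UidDemo {
  def main(args: Array[String]): Unit = {
    val first  = new UidRef(new ToyPath("akka://sys/user/worker", 101))
    val second = new UidRef(new ToyPath("akka://sys/user/worker", 202)) // same name, new incarnation
    println(first.path == second.path) // prints "true"
    println(first == second)           // prints "false"
    println(second)                    // prints "Actor[akka://sys/user/worker#202]"
  }
}
```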

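  One more thing needs explaining: scalaRef: InternalActorRef =>. This self-type annotation means that any concrete class extending ActorRef must also be an InternalActorRef. It is also why tell can call this.!(msg)(sender): the ! operator is declared in ScalaActorRef, which InternalActorRef mixes in. Personally, I am not fond of this Scala feature: if every subclass of ActorRef has to extend InternalActorRef anyway, why not move ActorRef's interface into InternalActorRef, or the other way around? Presumably it is done for encapsulation: ActorRef is the public API, while InternalActorRef is private[akka].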
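
  The same self-type shape in miniature: the public base class demands, via its self-type, an internal subclass that mixes in the operator trait, and can therefore call that operator on this. BangOps, SketchRef, InternalSketchRef and RecordingRef are hypothetical names playing the roles of ScalaActorRef, ActorRef, InternalActorRef and a concrete ref.

```scala
import scala.collection.mutable.ListBuffer

// Plays ScalaActorRef: declares the ! operator.
trait BangOps { def !(msg: Any): Unit }

// Plays ActorRef: the self-type requires every concrete instance to also be
// an InternalSketchRef, even though SketchRef does not extend it.
abstract class SketchRef { self: InternalSketchRef =>
  // Compiles only because the self-type guarantees ! is available on this,
  // the same reason ActorRef.tell can call this.!(msg)(sender).
  final def tell(msg: Any): Unit = this ! msg
}

// Plays InternalActorRef: extends SketchRef and mixes in the operator.
abstract class InternalSketchRef extends SketchRef with BangOps

final class RecordingRef extends InternalSketchRef {
  val received: ListBuffer[Any] = ListBuffer.empty
  def !(msg: Any): Unit = { received += msg; () }
}

// class Rogue extends SketchRef
// ^ rejected by the compiler: Rogue does not conform to SketchRef's self-type.
```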

/**
 * Internal trait for assembling all the functionality needed internally on
 * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
 *
 * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
 */
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope =>
  // ... body omitted in this excerpt ...
}

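  As analyzed earlier, the ref that ActorSystem ultimately gets back from LocalActorRefProvider's actorOf should be a RepointableActorRef, a subclass of InternalActorRef. The source shows that InternalActorRef adds the following kinds of information and capabilities: actor lifecycle management (start/resume/suspend/restart/stop/sendSystemMessage), parent/child information, and the provider. There is also a flag indicating whether the actor is local; a local actor usually means its mailbox can be accessed directly.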

  Next we need to look at ActorRefWithCell, a subclass of InternalActorRef, because it may well be the link to ActorCell.

/**
 * Common trait of all actor refs which actually have a Cell, most notably
 * LocalActorRef and RepointableActorRef. The former specializes the return
 * type of `underlying` so that follow-up calls can use invokevirtual instead
 * of invokeinterface.
 */
private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope =>
  def underlying: Cell
  def children: immutable.Iterable[ActorRef]
  def getSingleChild(name: String): InternalActorRef
}

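  From the official comment and the name, ActorRefWithCell is an abstract class that extends InternalActorRef and also has a Cell (the underlying member). RepointableActorRef extends ActorRefWithCell directly. As analyzed earlier, RepointableActorRef assigns underlying during initialization, which means a RepointableActorRef holds an ActorCell instance.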
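
  What makes the ref "repointable" is that underlying can be swapped. As I understand Akka's source, a RepointableActorRef first points at a placeholder (UnstartedCell) that queues incoming messages, and is later re-pointed at the real ActorCell, which then receives the queued messages. The toy below sketches only that idea, under that assumption; SketchCell, QueueingCell, LiveCell and SwapRef are hypothetical, and the sketch is single-threaded, ignoring the locking and system messages the real implementation has to handle.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical minimal Cell interface: just message delivery.
trait SketchCell { def sendMessage(msg: Any): Unit }

// Placeholder used before the real cell exists: it only queues.
final class QueueingCell extends SketchCell {
  val queue: ListBuffer[Any] = ListBuffer.empty
  def sendMessage(msg: Any): Unit = { queue += msg; () }
}

// The "real" cell: here it just records what it processed.
final class LiveCell extends SketchCell {
  val processed: ListBuffer[Any] = ListBuffer.empty
  def sendMessage(msg: Any): Unit = { processed += msg; () }
}

// Plays RepointableActorRef: senders always go through underlying.
// Single-threaded sketch: no synchronization is attempted.
final class SwapRef {
  private var cell: SketchCell = new QueueingCell
  def underlying: SketchCell = cell
  def !(msg: Any): Unit = cell.sendMessage(msg)

  // Plays point(): deliver queued messages in order, then switch cells.
  def point(live: LiveCell): Unit = cell match {
    case q: QueueingCell =>
      q.queue.foreach(live.sendMessage)
      cell = live
    case _ => () // already pointed
  }
}
```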

[Figure: relationships among Actor, ActorContext, ActorCell, and ActorRef]

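  The figure above shows how these concepts relate. From the analysis: trait Actor holds an ActorContext instance; ActorContext is really a view of ActorCell, a subset of its functionality; ActorCell holds both the Actor instance and the ActorRef instance; and a local ActorRef (an ActorRefWithCell) in turn holds the ActorCell instance.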
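
  The same relationships can be written down as a tiny object graph: the cell owns the actor instance and the self reference, the reference points back at the cell, and the actor sees the cell only through its context type. This is a hypothetical miniature; GraphContext, GraphRef, GraphActor, GraphCell and Echo are not Akka classes, construction is simplified (no ThreadLocal handoff), and delivery calls the behavior directly instead of going through a mailbox.

```scala
// The view the actor sees (plays ActorContext).
trait GraphContext { def self: GraphRef }

// Plays ActorRef: a handle that reaches the cell through underlying.
final class GraphRef(val path: String, val underlying: GraphCell) {
  // Real Akka would enqueue into a mailbox; here we call the behavior directly.
  def !(msg: Any): Unit = underlying.actor.receive.applyOrElse(msg, (_: Any) => ())
}

// Plays trait Actor: sees only the context view.
trait GraphActor {
  def context: GraphContext
  def receive: PartialFunction[Any, Unit]
}

// Plays ActorCell: owns the self ref and the actor, and IS the context.
final class GraphCell(path: String, makeActor: GraphContext => GraphActor) extends GraphContext {
  val self: GraphRef = new GraphRef(path, this) // ActorRef -> ActorCell
  val actor: GraphActor = makeActor(this)       // ActorCell -> Actor, Actor -> context (this cell)
}

final class Echo(val context: GraphContext) extends GraphActor {
  var last: Any = null
  def receive: PartialFunction[Any, Unit] = { case m => last = m }
}

object GraphDemo {
  def main(args: Array[String]): Unit = {
    val cell = new GraphCell("/user/echo", ctx => new Echo(ctx))
    println(cell.self.underlying eq cell)           // prints "true"
    println(cell.actor.context eq cell)             // prints "true"
    println(cell.actor.context.self eq cell.self)   // prints "true"
  }
}
```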

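  Briefly, here is how their responsibilities differ. trait Actor is for developers to define an actor's behavior: by implementing its interface they can customize the actor's lifecycle and behavior. ActorContext offers a view of ActorCell, a subset of its functionality, mainly to keep developers from referencing ActorCell directly; ActorCell exposes far too much, and using it without restriction would compromise thread safety. ActorCell provides the complete functionality of an actor: initialization, lifecycle management, the dispatcher, the mailbox, and so on, which is internal logic that Akka's own developers have to care about. ActorRef exposes the functionality an actor opens to the outside world, chiefly message sending, so that other code and other actors can send it messages.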

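  Having come this far, don't these Akka concepts feel clear and rigorous? Admittedly, this makes the design somewhat complex, but that cannot be helped: which well-designed framework isn't complex?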
