Akka源碼分析-Remote-發消息
上一篇博客我們介紹了remote模式下Actor的創建,其實與local的創建並沒有太大區別,一般情況下還是使用LocalActorRef創建了Actor。那麽發消息是否意味著也是相同的呢?
既然actorOf還是委托給了LocalActorRef,那麽在本地創建的Actor發消息還是跟以前一樣的,那麽如果如何給遠程的Actor發消息呢?我們一般是通過actorSelection或者給遠程Actor發送一個Identify消息,來接收對應的ActorRef,然後再發消息。我們來分析一下這兩者的區別。
首先來看actorSelection,不管是用ActorSystem或者ActorContext的actorSelection方法,最終都是調用了ActorRefFactory對應的方法。
/** * Construct an [[akka.actor.ActorSelection]] from the given path, which is * parsed for wildcards (these are replaced by regular expressions * internally). No attempt is made to verify the existence of any part of * the supplied path, it is recommended to send a message and gather the * replies in order to resolve the matching set of actors. */ def actorSelection(path: String): ActorSelection = path match { case RelativeActorPath(elems) ? if (elems.isEmpty) ActorSelection(provider.deadLetters, "") else if (elems.head.isEmpty) ActorSelection(provider.rootGuardian, elems.tail) else ActorSelection(lookupRoot, elems) case ActorPathExtractor(address, elems) ? ActorSelection(provider.rootGuardianAt(address), elems) case _ ? ActorSelection(provider.deadLetters, "") }
我們發現它支持兩種類型的path:RelativeActorPath、ActorPathExtractor。
/** * Extractor for so-called “relative actor paths” as in “relative URI”, not in * “relative to some actor”. Examples: * * * "grand/child" * * "/user/hello/world" */ object RelativeActorPath extends PathUtils { def unapply(addr: String): Option[immutable.Seq[String]] = { try { val uri = new URI(addr) if (uri.isAbsolute) None else Some(split(uri.getRawPath, uri.getRawFragment)) } catch { case _: URISyntaxException ? None } } }
RelativeActorPath提取器比較簡單,就是創建了一個URI對象,然後判斷其是否為Absolute,如果是就返回None,如果不是就返回對應的elemes。對於遠程Actor,我們一般會指定主機名、端口號,例如akka.tcp://[email protected]:2552/user/actorName,根據URI的定義,這個URI的schema是akka.tcp,很顯然是Absolute,那就會返回None。
/** * Given an ActorPath it returns the Address and the path elements if the path is well-formed */ object ActorPathExtractor extends PathUtils { def unapply(addr: String): Option[(Address, immutable.Iterable[String])] = try { val uri = new URI(addr) uri.getRawPath match { case null ? None case path ? AddressFromURIString.unapply(uri).map((_, split(path, uri.getRawFragment).drop(1))) } } catch { case _: URISyntaxException ? None } }
ActorPathExtractor這個提取器的名稱定義的是有問題的,既然actorSelection只支持兩種類型的路徑選擇:本地和遠程。第一個解析器定義成相對路徑,那麽後面一個就直接是絕對路徑好了啊,為啥用ActorPathExtractor這樣蹩腳的命名?難道本地模式下,就不是ActorPath提取器了?我們來看看對於akka.tcp://[email protected]:2552/user/actorName提取出了什麽。經調試,address是akka.tcp://[email protected]:2552,elems就是後面的user、actorName了。
也就是說remote模式下,如果有host、prot等信息就會返回ActorSelection(provider.rootGuardianAt(address), elems)這個類。不過好像無論哪種情況都返回這個類,好尷尬啊,但傳入的第一個參數是不同的:provider.rootGuardianAt(address)。也就是說actorSelection這個函數是不區分當前的模式的,只要含有host/port就會傳入provider.rootGuardianAt(address),否則就傳入provider.rootGuardian。如果在local模式下,也強制用actorSelection查找遠程Actor會發生什麽呢?我們來看看LocalActorRefProvider。
override def rootGuardianAt(address: Address): ActorRef = if (address == rootPath.address) rootGuardian else deadLetters
local模式下,如果待查詢actor的地址就是本地地址,則直接在本地返回查找;否則就返回deadLetters。其實是無法查找遠程actor的。那麽RemoteActorRefProvider呢?
def rootGuardianAt(address: Address): ActorRef = { if (hasAddress(address)) rootGuardian else try { new RemoteActorRef(transport, transport.localAddressForRemote(address), RootActorPath(address), Nobody, props = None, deploy = None) } catch { case NonFatal(e) ? log.error(e, "No root guardian at [{}]", address) new EmptyLocalActorRef(this, RootActorPath(address), eventStream) } }
當然了,它也會判斷一下本地地址是否包含待查詢地址(防止多網卡或其他特殊情況),如果包含,則意味著是本地Actor交給rootGuardian;否則就創建RemoteActorRef。
分析到這裏我們知道了,其實在remote模式下,actorSelection返回了一個RemoteActorRef,還記得這個類的作用嘛?我們之前簡單分析過,它其實是對遠程Acotor的一個本地網絡代理,也就是說所有通過actorSelection發送給遠程actor的消息,都會經過他中轉。
我們繼續分析ActorSelection的源碼
/** * Construct an ActorSelection from the given string representing a path * relative to the given target. This operation has to create all the * matching magic, so it is preferable to cache its result if the * intention is to send messages frequently. */ def apply(anchorRef: ActorRef, elements: Iterable[String]): ActorSelection = { val compiled: immutable.IndexedSeq[SelectionPathElement] = elements.collect({ case x if !x.isEmpty ? if ((x.indexOf(‘?‘) != -1) || (x.indexOf(‘*‘) != -1)) SelectChildPattern(x) else if (x == "..") SelectParent else SelectChildName(x) })(scala.collection.breakOut) new ActorSelection with ScalaActorSelection { override val anchor = anchorRef override val path = compiled } }
很顯然這裏的anchorRef是上面創建的RemoteActorRef實例,其中ActorSelection的anchor(錨定)是anchorRef。至此,一個ActorSelection創建完畢。那麽如何發消息呢?這就需要分析tell或者!方法了。
def tell(msg: Any, sender: ActorRef): Unit = ActorSelection.deliverSelection(anchor.asInstanceOf[InternalActorRef], sender, ActorSelectionMessage(msg, path, wildcardFanOut = false))
其實乍一看,我們應該明白,這就是在deliverSelection函數內部,把消息封裝成ActorSelectionMessage發送給了anchor。
該函數首先判斷sel的elements是否為空,很顯然不為空,進入rec函數。該函數比較復雜而且還是一個尾遞歸函數,但我們知道此處的ref就是RemoteActorRef,那麽RemoteActorRef是不是一個ActorRefWithCell呢?
private[akka] class RemoteActorRef private[akka] ( remote: RemoteTransport, val localAddressToUse: Address, val path: ActorPath, val getParent: InternalActorRef, props: Option[Props], deploy: Option[Deploy]) extends InternalActorRef with RemoteRef
那麽rec就會走到case _的邏輯,也就是把消息轉發給了前面創建的RemoteActorRef,我們來看看這個示例是如何實現tell的。
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { if (message == null) throw InvalidMessageException("Message is null") try remote.send(message, OptionVal(sender), this) catch handleException(message, sender) }
RemoteActorRef這個類,通過remote把消息發送出去了,那麽remote是什麽呢?RemoteTransport是不是很熟悉?在ActorSystem啟動的時候我們分析過這個對象,它是Remoting類的實例,Remoting裏面send方法是怎樣的呢?
override def send(message: Any, senderOption: OptionVal[ActorRef], recipient: RemoteActorRef): Unit = endpointManager match { case Some(manager) ? manager.tell(Send(message, senderOption, recipient), sender = senderOption getOrElse Actor.noSender) case None ? throw new RemoteTransportExceptionNoStackTrace("Attempted to send remote message but Remoting is not running.", null) }
它又把消息轉發給了manager,而manager就是endpointManager。endpointManager是不是也比較眼熟呢?前面文章中我們也見到過,這是一個EndpointManager實例,而EndpointManager是一個Actor。請註意這裏用Send又對message進行了封裝。EndpointManager是如何對Send消息進行反應的呢?
case s @ Send(message, senderOption, recipientRef, _) ? val recipientAddress = recipientRef.path.address def createAndRegisterWritingEndpoint(): ActorRef = { endpoints.registerWritableEndpoint( recipientAddress, uid = None, createEndpoint( recipientAddress, recipientRef.localAddressToUse, transportMapping(recipientRef.localAddressToUse), settings, handleOption = None, writing = true)) } endpoints.writableEndpointWithPolicyFor(recipientAddress) match { case Some(Pass(endpoint, _)) ? endpoint ! s case Some(Gated(timeOfRelease)) ? if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint() ! s else extendedSystem.deadLetters ! s case Some(Quarantined(uid, _)) ? // timeOfRelease is only used for garbage collection reasons, therefore it is ignored here. We still have // the Quarantined tombstone and we know what UID we don‘t want to accept, so use it. createAndRegisterWritingEndpoint() ! s case None ? createAndRegisterWritingEndpoint() ! s }
分析以上邏輯,簡單來看,會先判斷是不是存在一個endpoint,如果存在說明鏈接已經建立,可以直接發送,否則出於其他狀態,就重新創建endpoint,然後把消息轉發給該endpoint。
def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { case Some(Pass(e, _)) ? throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") case _ ? // note that this overwrites Quarantine marker, // but that is ok since we keep the quarantined uid in addressToRefuseUid addressToWritable += address → Pass(endpoint, uid) writableToAddress += endpoint → address endpoint }
registerWritableEndpoint沒有太復雜的邏輯,就是查詢addressToWritable這個HashMap,如果不存在則把對應的endpoint加入緩存,並返回endpoint。而endpoint是通過createEndpoint創建的。
private def createEndpoint( remoteAddress: Address, localAddress: Address, transport: AkkaProtocolTransport, endpointSettings: RemoteSettings, handleOption: Option[AkkaProtocolHandle], writing: Boolean): ActorRef = { require(transportMapping contains localAddress, "Transport mapping is not defined for the address") // refuseUid is ignored for read-only endpoints since the UID of the remote system is already known and has passed // quarantine checks val refuseUid = endpoints.refuseUid(remoteAddress) if (writing) context.watch(context.actorOf( RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props( handleOption, localAddress, remoteAddress, refuseUid, transport, endpointSettings, AkkaPduProtobufCodec, receiveBuffers)).withDeploy(Deploy.local), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) else context.watch(context.actorOf( RARP(extendedSystem).configureDispatcher(EndpointWriter.props( handleOption, localAddress, remoteAddress, refuseUid, transport, endpointSettings, AkkaPduProtobufCodec, receiveBuffers, reliableDeliverySupervisor = None)).withDeploy(Deploy.local), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) }
createEndpoint最終創建了ReliableDeliverySupervisor這個Actor,也就是說RemoteActorRef最終又把消息發送給了ReliableDeliverySupervisor,ReliableDeliverySupervisor收到消息去調用handleSend方法。
private def handleSend(send: Send): Unit = if (send.message.isInstanceOf[SystemMessage]) { val sequencedSend = send.copy(seqOpt = Some(nextSeq())) tryBuffer(sequencedSend) // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it. // GotUid will kick resendAll() causing the messages to be properly written. // Flow control by not sending more when we already have many outstanding. if (uidConfirmed && resendBuffer.nonAcked.size <= settings.SysResendLimit) writer ! sequencedSend } else writer ! send
除去特殊情況,用戶發的普通消息又發送給了writer,艾瑪我去,真是繞啊。writer是什麽呢?
var writer: ActorRef = createWriter()
private def createWriter(): ActorRef = { context.watch(context.actorOf(RARP(context.system).configureDispatcher(EndpointWriter.props( handleOrActive = currentHandle, localAddress = localAddress, remoteAddress = remoteAddress, refuseUid, transport = transport, settings = settings, AkkaPduProtobufCodec, receiveBuffers = receiveBuffers, reliableDeliverySupervisor = Some(self))).withDeploy(Deploy.local), "endpointWriter")) }
很顯然這又是一個ACor!!!哎,繼續查找EndpointWriter這個Actor嘍
def receive = if (handle.isEmpty) initializing else writing
val writing: Receive = { case s: Send ? if (!writeSend(s)) { enqueueInBuffer(s) scheduleBackoffTimer() context.become(buffering) } // We are in Writing state, so buffer is empty, safe to stop here case FlushAndStop ? flushAndStop() case AckIdleCheckTimer if ackDeadline.isOverdue() ? trySendPureAck() }
這個Actor會先判斷是否已經初始化,這裏就假設初始化吧,初始化之後就會進入writing這個偏函數,對send類型的消息,又調用了writeSend函數。
這個函數簡單來看,就是調用codec對消息進行序列化,然後創建了一個pdu,最終把pdu通過handle的write發送出去。handle又是什麽呢?
var handle: Option[AkkaProtocolHandle] = handleOrActive
private[remote] class AkkaProtocolHandle( _localAddress: Address, _remoteAddress: Address, val readHandlerPromise: Promise[HandleEventListener], _wrappedHandle: AssociationHandle, val handshakeInfo: HandshakeInfo, private val stateActor: ActorRef, private val codec: AkkaPduCodec) extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, AkkaScheme) { override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) override def disassociate(): Unit = disassociate(Unknown) def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info) }
handle最終是一個AkkaProtocolHandle,這個對象我們不再具體分析,我們可以認為這是一個本地與遠程地址鏈接的通道,通過這個通道就可以與遠程actor發送消息了。
分析到這個地方,actorSelection與遠程通信的過程大概就梳理清楚了。為了方便理解,作者特意辛苦的畫了一個流程圖,以供參考。細心的讀者一定會問,那我的消息通過handle發送出去了,對方怎麽接收呢?接收之後怎麽發送到指定actor的郵箱呢?這一點我們後面再分析。
Akka源碼分析-Remote-發消息