Akka源碼分析-Cluster-Distributed Publish Subscribe in Cluster
在ClusterClient源碼分析中,我們知道,他是依托於“Distributed Publish Subscribe in Cluster”來實現消息的轉發的,那本文就來分析一下Pub/Sub是如何實現的。
還記得之前分析Cluster源碼的文章嗎?其實Cluster只是把集群內各個節點的信息通過gossip協議公布出來,並把節點的信息分發出來。但各個actor的地址還是需要開發者自行獲取或設計的,比如我要跟worker通信,那就需要知道這個actor在哪個節點,通過actorPath或actorRef通信。
“Distributed Publish Subscribe”就是用來屏蔽Actor位置的一個組件,通過它你可以給actor發消息而不需要知道actor的網咯位置。其實就是提供了一個類似kafka的消息發布、訂閱的機制,其實吧,如果這個功能讓你實現,你準備怎麽做?肯定是在集群層面提供一個proxy,來屏蔽目標actor的網絡位置啊。簡單來說,就是提供一個通用的actor,來對消息進行轉發,發送者只需要提供目標actor的路徑就好了(比如/user/serviceA)。不過還是那句話,akka的都是對的,akka的都是好的。akka幫你實現這個事兒,就不用你自己考慮通用、穩定的問題啦。
消息訂閱發布模式提供了一個中繼actor:akka.cluster.pubsub.DistributedPubSubMediator。它管理actor的註冊引用、分發實例引用給端actor,而且必須在所有的節點或一組節點內啟動。它可以通過DistributedPubSub擴展啟動,也可以像普通actor那樣啟動。
服務actor的註冊是最終一致的,也就是說服務信息在變化時並不能立即通知給其他節點,過一段時間參會分發給所有節點。當然了每次都是以增量的信息分發這些信息。
消息的發送有兩種模式:Send和Publish。簡單來說就是點對點、廣播。
Publish模式下,只有註冊到命名的topic的actor才會收到消息,topic是啥?。其實這才是真正的訂閱、發布模式。
class Subscriber extends Actor with ActorLogging { import DistributedPubSubMediator.{ Subscribe, SubscribeAck } val mediator = DistributedPubSub(context.system).mediator // subscribe to the topic named "content" mediator ! Subscribe("content", self) def receive = { case s: String ? log.info("Got {}", s) case SubscribeAck(Subscribe("content", None, `self`)) ? log.info("subscribing") } }
class Publisher extends Actor { import DistributedPubSubMediator.Publish // activate the extension val mediator = DistributedPubSub(context.system).mediator def receive = { case in: String ? val out = in.toUpperCase mediator ! Publish("content", out) } }
上面是官方的demo,可以看出,訂閱者actor訂閱了名為“content”的topic,在發布者actor發送指定topic的消息時,會自動收到對應的消息。怎麽樣,是不是很簡單。其實吧,mediator只需要維護一個topic到訂閱者的映射列表就好了,當收到對應topic的消息時,取出對應的訂閱者(也就是ActorRef或actorSelection)把消息轉發給他就好了。
Send模式就是一個點對點模式,每個消息被發送給一個目標,而不用知道這個目標actors的位置。既然之前我們說了,這是通過ActorPath發送的,那如果集群中同時有多個節點命中了這個ActorPath怎麽辦呢?那就路由唄,提供一個RoutingLogic
路由策略。默認策略是隨機發送,當然了我們是可以修改這個策略的。與Publish模式不同,這裏註冊服務actor是通過Put消息實現的。不過實現原理都差不多,反正都要維護列表。
class Destination extends Actor with ActorLogging { import DistributedPubSubMediator.Put val mediator = DistributedPubSub(context.system).mediator // register to the path mediator ! Put(self) def receive = { case s: String ? log.info("Got {}", s) } }
class Sender extends Actor { import DistributedPubSubMediator.Send // activate the extension val mediator = DistributedPubSub(context.system).mediator def receive = { case in: String ? val out = in.toUpperCase mediator ! Send(path = "/user/destination", msg = out, localAffinity = true) } }
當然了,我們還是可以通過SendToAll把消息發送給所有命中指定path的actor的。
這裏需要註意的是,官方的訂閱發布組件只能保證至少一次投遞,想想都是這樣的,哈哈。廢話不多說了,上代碼。
object DistributedPubSub extends ExtensionId[DistributedPubSub] with ExtensionIdProvider { override def get(system: ActorSystem): DistributedPubSub = super.get(system) override def lookup = DistributedPubSub override def createExtension(system: ExtendedActorSystem): DistributedPubSub = new DistributedPubSub(system) }
很顯然DistributedPubSub這個擴展也是可以通過配置直接實例化的,不需要我們自行寫代碼實例化。由於其源碼非常簡單就是定義並創建了mediator這個actor(DistributedPubSubMediator),下面直接轉到DistributedPubSubMediator源碼的分析。
/** * This actor manages a registry of actor references and replicates * the entries to peer actors among all cluster nodes or a group of nodes * tagged with a specific role. * * The `DistributedPubSubMediator` actor is supposed to be started on all nodes, * or all nodes with specified role, in the cluster. The mediator can be * started with the [[DistributedPubSub]] extension or as an ordinary actor. * * Changes are only performed in the own part of the registry and those changes * are versioned. Deltas are disseminated in a scalable way to other nodes with * a gossip protocol. The registry is eventually consistent, i.e. changes are not * immediately visible at other nodes, but typically they will be fully replicated * to all other nodes after a few seconds. * * You can send messages via the mediator on any node to registered actors on * any other node. There is three modes of message delivery. * * You register actors to the local mediator with [[DistributedPubSubMediator.Put]] or * [[DistributedPubSubMediator.Subscribe]]. `Put` is used together with `Send` and * `SendToAll` message delivery modes. The `ActorRef` in `Put` must belong to the same * local actor system as the mediator. `Subscribe` is used together with `Publish`. * Actors are automatically removed from the registry when they are terminated, or you * can explicitly remove entries with [[DistributedPubSubMediator.Remove]] or * [[DistributedPubSubMediator.Unsubscribe]]. * * Successful `Subscribe` and `Unsubscribe` is acknowledged with * [[DistributedPubSubMediator.SubscribeAck]] and [[DistributedPubSubMediator.UnsubscribeAck]] * replies. * * Not intended for subclassing by user code. */ @DoNotInherit class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor with ActorLogging with PerGroupingBuffer
PerGroupingBuffer這個trait不再分析源碼,從代碼和命名來看,就是給每個group提供一個消息緩存的列表。其實這個actor最重要的功能是要能夠感知集群節點的變化和對應服務actor的變化,並及時的把這些信息分發給其他DistributedPubSubMediator,還有就是能夠把消息路由給指定的訂閱者。為了簡化分析,我們忽略第一個功能點,只分析是如何路由消息的。分析這點需要關註幾個消息的處理邏輯:Put、Subscribe、Publish、Send、SendToAll。
先來看Subscribe 。
case msg @ Subscribe(topic, _, _) ? // each topic is managed by a child actor with the same name as the topic val encTopic = encName(topic) bufferOr(mkKey(self.path / encTopic), msg, sender()) { context.child(encTopic) match { case Some(t) ? t forward msg case None ? newTopicActor(encTopic) forward msg } }
Subscribe消息表明某個actor需要訂閱某個topic的消息,簡單來說就是先判斷是否需要緩存,不需要的話就執行{}代碼塊。很顯然,剛開始的時候是不需要緩存的。上面的邏輯就是從當前的children中查找encTopic的一個actor,然後把消息轉發給它;不存在則創建之後再轉發給它。那猜一下這個子actor的功能?其實吧,它應該是一個actor負責維護某個topic與所有訂閱者的關系,所有發給這個topic的消息都會轉發給所有的訂閱者。
def newTopicActor(encTopic: String): ActorRef = { val t = context.actorOf(Props(classOf[Topic], removedTimeToLive, routingLogic), name = encTopic) registerTopic(t) t }
很顯然newTopicActor創建了Topic這個actor,名字就是topic的值,並傳入了兩個參數:removedTimeToLive、routingLogic。第二個是路由策略。
def registerTopic(ref: ActorRef): Unit = { put(mkKey(ref), Some(ref)) context.watch(ref) }
put這個函數的功能我們先略過,其功能大概是把這個actor註冊到系統內,把它與當前地址、版本號做關聯並保存,在適當的時機分發出去。
Topic這個actor只有兩個方法,所以還需要去看下TopicLike的代碼。
可以看到TopicLike中有一個subscribers列表,這也是預期之中的。這個actor的消息會被business和defaultReceive處理,business在Topic中重新實現了,且會優先處理。
case msg @ Subscribe(_, Some(group), _) ? val encGroup = encName(group) bufferOr(mkKey(self.path / encGroup), msg, sender()) { context.child(encGroup) match { case Some(g) ? g forward msg case None ? newGroupActor(encGroup) forward msg } } pruneDeadline = None
收到Subscribe消息後,做了跟DistributedPubSubMediator類似的邏輯,又創建了一個子actor(Group),並把消息轉發給了它。其實這一點在官方也有說過,也就是說,topic也是可以分組的,一個消息並不一定會發給所有訂閱者,可以發給一組訂閱者,其實吧,這一點我不太喜歡,感覺功能有點過了,如果要對topic劃分子topic,用戶自定義實現好了啊,搞得現在源碼這麽復雜。
class Group(val emptyTimeToLive: FiniteDuration, routingLogic: RoutingLogic) extends TopicLike { def business = { case SendToOneSubscriber(msg) ? if (subscribers.nonEmpty) Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(wrapIfNeeded(msg), sender()) } }
Group的代碼還這麽簡單,它又把Subscribe發給了TopicLike的defaultReceive
def defaultReceive: Receive = { case msg @ Subscribe(_, _, ref) ? context watch ref subscribers += ref pruneDeadline = None context.parent ! Subscribed(SubscribeAck(msg), sender())
上面是defaultReceive對Subscribe消息的處理,就是watch,然後把訂閱者添加到subscribers列表中,再告訴父actor(就是Topic這個actor)訂閱成功了。
聰明的讀者可能會問了,為啥topic還需要弄個消息緩存呢?其實吧,如果是我實現,肯定不搞這麽麻煩啊。消息丟了就丟了啊,沒有訂閱者的時候,消息緩存起來等有訂閱者的時候再發送出去?哈哈,有點浪費內存啊。不過為了穩定性、功能性、完善性,akka還是做了很多額外努力的。不過吧,建議還是把這個隊列的大小調小一點,要不然太浪費內存了。不過很不幸的告訴你,目前沒有這個開關。
既然訂閱topic的邏輯跟我們的猜測差不多,那麽發布消息的邏輯就應該也符合我們的猜測嘍。其實就是獲取某個topic對應的訂閱者,然後foreach把消息發出去。
case Publish(topic, msg, sendOneMessageToEachGroup) ? if (sendOneMessageToEachGroup) publishToEachGroup(mkKey(self.path / encName(topic)), msg) else publish(mkKey(self.path / encName(topic)), msg)
簡單起見,我們只分析消息不分組的情況
def publish(path: String, msg: Any, allButSelf: Boolean = false): Unit = { val refs = for { (address, bucket) ← registry if !(allButSelf && address == selfAddress) // if we should skip sender() node and current address == self address => skip valueHolder ← bucket.content.get(path) ref ← valueHolder.ref } yield ref if (refs.isEmpty) ignoreOrSendToDeadLetters(msg) else refs.foreach(_.forward(msg)) }
還記得registry什麽時候賦值的嘛?如果忘了,可以翻翻registerTopic的代碼,因為我沒有分析,哈哈。不過不重要了,其實就是獲取當前的Topic的Group的ActorRef,然後把消息轉發給它。
case msg ? subscribers foreach { _ forward msg }
Group繼承的TopicLike中的defaultReceive方法處理了消息,其實就是把消息轉發給所有的subscribers。
pub/sub的邏輯就分析到這裏了,其實這裏面的邏輯還是有點復雜的,當然了有一部分是因為topic分組帶來的,其他的都是gossip協議分發訂閱者、發布者的相關信息帶來的。
下面分析Send模式。從Put消息的處理入手。
case Put(ref: ActorRef) ? if (ref.path.address.hasGlobalScope) log.warning("Registered actor must be local: [{}]", ref) else { put(mkKey(ref), Some(ref)) context.watch(ref) }
這就有點簡單了,就是把ref註冊一下,然後watch。這個ref的key是ActorRef值,其實就是ActorPath.toString
case Send(path, msg, localAffinity) ? val routees = registry(selfAddress).content.get(path) match { case Some(valueHolder) if localAffinity ? (for { routee ← valueHolder.routee } yield routee).toVector case _ ? (for { (_, bucket) ← registry valueHolder ← bucket.content.get(path) routee ← valueHolder.routee } yield routee).toVector } if (routees.isEmpty) ignoreOrSendToDeadLetters(msg) else Router(routingLogic, routees).route(wrapIfNeeded(msg), sender())
其實就是從registry中優先找當前節點的訂閱者,然後通過Router和指定的策略把消息發送出去,這個比pub/sub模式稍微簡單點。wrapIfNeeded的功能不再分析,其實就是為了防止與用戶本身的路由消息發生沖突。
關於節點信息同步,感興趣的讀者可以自行閱讀源碼,不過我看下來還是有幾個問題的。比如當前註冊信息的版本是通過時間戳來標誌的,如果節點間時間不同步,會發生意外的結果啊;另外所謂的gossip協議,其實就是隨機把註冊信息發送給其他節點,也就是說集群內的節點都會把消息按照心跳時間,把註冊信息隨機發送給本身節點以外的節點,達到最終註冊信息的同步。如果是我來實現,直接就是粗暴的廣播註冊信息,哈哈,不過這在集群規模比較大的時候比較耗時,啊哈哈。
Distributed Publish Subscribe in Cluster
Akka源碼分析-Cluster-Distributed Publish Subscribe in Cluster