Akka源碼分析-Event Bus
akka中的EventBus其實是不常用,也最容易被忽略的一個組件。
但如果你深入Cluster的實現就會發現,這個東西其實還挺有用的,而且它是ActorSystem系統中所有事件消息的一個橫切面,通過它你可以訂閱特定類型的消息,然後做出相應的動作。那讀者可能會問了,這個訂閱消息也很簡單的啊,我自己實現不就好了。嗯,其實你這個想法是對的,akka所有的功能都是基於actor和Actor模型的,所有復雜的功能實現起來都不是特別麻煩,至少實現的模型不會很復雜。不過你可能用不好這個EventBus,因為你並不一定會用,或者說不知道什麽時候用。
對於Event Bus,也就是事件總線,普通場景下個人建議不要使用。Event Bus會使本來就復雜的消息通信更加復雜, 如果不用,開發過程中你明確知道跟某個actor通信的都有哪些actor,也就是說他們之間的通信協議是明確的。僅僅做到這一點,就會使actor系統很復雜了,再用個Event Bus把事件發送出去,會導致消息更加分散,某種意義上也是一種耦合。比如你把消息A發布出去,但卻不知道誰在訂閱它,如果某個版本升級你不消息忘了發布這個消息,那其他actor還能正常工作嗎?這明顯是給自己找麻煩。
那什麽時候用呢?或者說使用的時候都有哪些限制呢?大概有兩種情況吧:1.發布的都是系統消息,跟業務無關;2.為了考慮系統後期的擴展和升級(當然了需要滿足第一個條件)。第一個規則是啥意思呢?就是你發布的消息不會變化或者不會有大的變化,比如只是發布了某個特定actor啟動、停止、退出的系統消息,這些消息無論格式還是內容都是固定的。如果後期系統功能升級,需要監控這些消息,由於消息固定,所以不會給版本帶來很大的問題。再加上不是業務消息,所以也不會給業務造成什麽影響。
廢話不多說,來看看它的實現。當然EventBus實現比較復雜,簡單起見,我們只分析Event Stream。
// this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(this, DebugEventStream) eventStream.startStdoutLogger(settings)
在ActorSystemImpl中有上面兩行代碼,創建了一個eventStream,官方文檔說,提供了一個基本的日誌功能。其實這句話我覺得不應該說,容易給大家造成誤解。大家肯定想,既然這個是用來做日誌的,就沒啥用了唄。如果有這個認識的話,再對akka做擴展的時候會走很大的彎路。其實akka系統通過eventStream發布了很多重要的系統消息,比如actor生命周期狀態、remote模式下網絡生命周期事件,如果能夠合理的使用好這些系統消息,會給我們帶來極大的方便,偷偷的告訴你,cluster就是訂閱了一些網絡狀態事件實現了許多重要的功能。
/** * An Akka EventStream is a pub-sub stream of events both system and user generated, * where subscribers are ActorRefs and the channels are Classes and Events are any java.lang.Object. * EventStreams employ SubchannelClassification, which means that if you listen to a Class, * you‘ll receive any message that is of that type or a subtype. * * The debug flag in the constructor toggles if operations on this EventStream should also be published * as Debug-Events */ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingBus with SubchannelClassification
Akka EventStream是一個發布-訂閱事件流,包括系統和用戶產生的數據。訂閱某個特定類型的消息,不一定會收到對應的消息,前提是你自己或系統調用EventStream的發布接口把消息發布了出去。
/** * Classification which respects relationships between channels: subscribing * to one channel automatically and idempotently subscribes to all sub-channels. */ trait SubchannelClassification { this: EventBus ?
SubchannelClassification,子頻道分類器,根據官方描述大概知道,它會自動的訂閱所有子頻道的消息。大概是會自動訂閱某個父類所有子類的消息吧。頻道是啥?當然是一個類或者接口了啊。
LoggingBus具體做啥的就不分析了,反正是跟記日誌有關的。不過從它的繼承關系來看,它直接決定了EventStream是一個EventBus的某個子類。這個繼承關系我覺得官方實現的不夠合理,畢竟記日誌只是EventStream一個功能。EventStream首先應該是一個EventBus,只不過混入了Logging的功能而已,現在直接繼承LoggingBus從而繼承EventBus,顯得不夠優化!
class DeadLetterListener extends Actor { def receive = { case d: DeadLetter ? println(d) } } val listener = system.actorOf(Props[DeadLetterListener]) system.eventStream.subscribe(listener, classOf[DeadLetter])
這是官方的一個例子,非常簡單,就是調用subscribe方法,訂閱了DeadLetter類型的消息,把消息發送給DeadLetterListener這個actor。那麽來看看subscribe如何實現,不過在這之前還需要看看它是如何初始化的。在ActorSystem的start方法中調用了eventStream.startUnsubscriber(),對eventStream實現了初始化。
/** * ‘‘Must‘‘ be called after actor system is "ready". * Starts system actor that takes care of unsubscribing subscribers that have terminated. */ def startUnsubscriber(): Unit = // sys may be null for backwards compatibility reasons if (sys ne null) EventStreamUnsubscriber.start(sys, this)
其中sys就是我們傳入的ActorSystem實例。
/** * INTERNAL API * * Provides factory for [[akka.event.EventStreamUnsubscriber]] actors with **unique names**. * This is needed if someone spins up more [[EventStream]]s using the same [[akka.actor.ActorSystem]], * each stream gets it‘s own unsubscriber. */ private[akka] object EventStreamUnsubscriber { private val unsubscribersCount = new AtomicInteger(0) final case class Register(actor: ActorRef) final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef) private def props(eventStream: EventStream, debug: Boolean) = Props(classOf[EventStreamUnsubscriber], eventStream, debug) def start(system: ActorSystem, stream: EventStream) = { val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream") system.asInstanceOf[ExtendedActorSystem] .systemActorOf(props(stream, debug), "eventStreamUnsubscriber-" + unsubscribersCount.incrementAndGet()) } }
官方說EventStreamUnsubscriber是個工廠類,用來給EventStreamUnsubscriber提供一個唯一的名字,如果開發者啟動了多個EventStream不至於會出現沖突。其實吧,個人覺得完全沒必要,多創建一個EventStream,這都屬於高級用法了,akka還沒普及,遠到不了這個地步。
/** * INTERNAL API * * Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated. * * Assumptions note: * We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor, * thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to * needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down * subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes * watching a few actors too much - we opt for the 2nd choice here. */ protected[akka] class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor
從官方註釋來看,EventStreamUnsubscriber是所有訂閱eventStream的監督者,當訂閱者(也就是某個actor)stop的時候,把對應的訂閱消息移除,以便發送不必要的消息。那EventStreamUnsubscriber和EventStream的關系是怎麽樣的呢?其實吧,這裏又做了一個分層,EventStreamUnsubscriber負責監控對應的actor,把消息發送個它,而EventStream負責訂閱相關的狀態維護。
初始化完成後,下面來看subscribe的實現。
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) registerWithUnsubscriber(subscriber) super.subscribe(subscriber, channel) }
@tailrec private def registerWithUnsubscriber(subscriber: ActorRef): Unit = { // sys may be null for backwards compatibility reasons if (sys ne null) initiallySubscribedOrUnsubscriber.get match { case value @ Left(subscribers) ? if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber))) registerWithUnsubscriber(subscriber) case Right(unsubscriber) ? unsubscriber ! EventStreamUnsubscriber.Register(subscriber) } }
/** Either the list of subscribed actors, or a ref to an [[akka.event.EventStreamUnsubscriber]] */ private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))
initiallySubscribedOrUnsubscriber的定義還是很奇怪的,不過根據上下文來分析,registerWithUnsubscriber應該就是給EventStreamUnsubscriber發送EventStreamUnsubscriber.Register(subscriber)消息,然後調用super.subscribe
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.addValue(to, subscriber) addToCache(diff) diff.nonEmpty }
super.subscribe是在SubchannelClassification中實現的。
// must be lazy to avoid initialization order problem with subclassification private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()
第一行的addVelue,應該就是把類型和對應的Subscriber做索引,當然了同一個Classifier是可以有多個訂閱者的。Subscriber是啥?當然是一個ActorRef了。這個在EventStream繼承的ActorEventBus中定義。
@volatile private var cache = Map.empty[Classifier, Set[Subscriber]]
cache其實就是一個map,保存類型與訂閱者集合的映射。邏輯是不是也很清晰呢?簡單來說,訂閱某個消息,就是把消息的類型和對應的actorRef做一個綁定,然後在某個對應類型的消息產生時,調用actorRef的tell函數就行了。
def publish(event: Event): Unit = { val c = classify(event) val recv = if (cache contains c) cache(c) // c will never be removed from cache else subscriptions.synchronized { if (cache contains c) cache(c) else { addToCache(subscriptions.addKey(c)) cache(c) } } recv foreach (publish(event, _)) }
那我們來看看publish的具體實現,EventStream中定義了Event就是一個AnyRef,其實就是可以發布任意引用類型的消息。這段代碼也比較容易理解,在分析classify之前可以猜一猜,其實就是找出傳入的AnyRef具體類型,然後從cache中找到對應的訂閱者,在調用publish發布消息。
protected def classify(event: AnyRef): Class[_] = event.getClass
EventStream重寫了classify函數,很簡單,就是getClass。
protected def publish(event: AnyRef, subscriber: ActorRef) = { if (sys == null && subscriber.isTerminated) unsubscribe(subscriber) else subscriber ! event }
publish呢?就是調用subscriber的! 方法,把消息發送出去。
其實分析到這裏,基本就結束了,特別簡單。訂閱消息就是把對應的類型和actor關聯起來,publish的時候通過消息的類型找到對應的訂閱者(也就是actor),把消息發給訂閱者就結束了,自己實現也特別簡單。不過為了通用和穩定,akka還是做了很多工作的。比如某個actor被Terminat的時候,可以自動取消訂閱,畢竟actor還可能意外終止,沒有來得及調用unsubscribe方法取消訂閱。
EventStream就分析到這裏了,不過介紹這個知識點有兩個出發點。首先這個EventStream作為所有消息的截面,特殊情況下,還是很有用的。另外就是在分析cluster的時候,這個點還是比較重要的,畢竟cluster用eventStream實現了某些特殊功能,雖然這點我不太喜歡。
Akka源碼分析-Event Bus