1. 程式人生 > >Akka源碼分析-Persistence-AtLeastOnceDelivery

Akka源碼分析-Persistence-AtLeastOnceDelivery

per 而且 interval hal 估計 spa extends boolean 字段

  使用過akka的應該都知道,默認情況下,消息是按照最多一次發送的,也就是tell函數會盡量把消息發送出去,如果發送失敗,不會重發。但有些業務場景,消息的發送需要滿足最少一次,也就是至少要成功發送一次。akka在Persistence的基礎之上提供了at-least-once傳遞的語法。

  簡單來說akka中的at-least-once機制,會在規定時間內等待消息接收成功的確認消息。如果收到,則發送成功;否則,嘗試重發;超過重試次數則不再重發。

  其實如果不看akka的源碼,讓我們自己來實現至少一次的語法,實現基本功能也非常簡單。首先,我們會在內存中保存消息ID與消息的映射列表,如果指定時間內收到確認消息,則從列表中移除;否則進行重發,到達指定次數之後將為確認的消息從列表移除。重發通過定時器來實現,也就是在固定時間間隔發送心跳消息,發送哪些已經發送且規定時間內沒有收到確認消息的消息。當然了,這個映射列表不能無限增長,要不然內存就爆了,所以這應該是一個固定長度的列表。當然了,我們還可以把未確認消息保存到redis等第三方緩存中去,來避免OOM和actor重啟後未確認消息列表丟失的問題。

  那akka是如何實現的呢?既然at-least-once是在Persistence章節出現,所以未確認消息會通過Persistence機制來持久化嘍?

  在閱讀akka的at-least-once源碼之前,有些概念和結論需要說明一下,具體可參見官網,但為了更好的閱讀源碼,這裏再啰嗦幾句。

  • 發送方actor會在內存中保存為確認消息的列表,為了防止actor重啟導致內存數據丟失,需要手動調用持久化相關的函數,來保存內存數據。AtLeastOnceDelivery只提供接口,不負責自動持久化。
  • AtLeastOnceDelivery通過deliver方法發送消息,開發者必須在手動通過confirmDelivery來確認消息已經收到。
  • 每個消息都必須有一個消息ID,這個消息ID由AtLeastOnceDelivery生成。
  • AtLeastOnceDelivery給ActorSelectioin發送消息,這就意味著是通過ActorPath給actor發消息。如果某個actor被開發者stop只有,重新actorOf創建,可能會收到上一個實例的消息。這一點開發者需要特別註意。
  • 由於未確認消息保存在內存,可能造成OOM,且在超過內存列表長度的時候,投遞消息時會發生異常,需要開發者自行處理該異常。
  • 消息發送不再保證順序,因為消息可能重發,會打亂順序。

  首先來看下官網的demo。

case class Msg(deliveryId: Long, s: String)
case class Confirm(deliveryId: Long)

sealed trait Evt
case class MsgSent(s: String) extends Evt
case class MsgConfirmed(deliveryId: Long) extends Evt

class MyPersistentActor(destination: ActorSelection)
  extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId: String = "persistence-id"

  override def receiveCommand: Receive = {
    case s: String           ? persist(MsgSent(s))(updateState)
    case Confirm(deliveryId) ? persist(MsgConfirmed(deliveryId))(updateState)
  }

  override def receiveRecover: Receive = {
    case evt: Evt ? updateState(evt)
  }

  def updateState(evt: Evt): Unit = evt match {
    case MsgSent(s) ?
      deliver(destination)(deliveryId ? Msg(deliveryId, s))

    case MsgConfirmed(deliveryId) ? confirmDelivery(deliveryId)
  }
}

class MyDestination extends Actor {
  def receive = {
    case Msg(deliveryId, s) ?
      // ...
      sender() ! Confirm(deliveryId)
  }
}

  demo很簡單,我們需要關註它都繼承了哪些接口:PersistentActor、AtLeastOnceDelivery。之前說過,至少一次是基於持久化的,所以能不能只繼承AtLeastOnceDelivery呢?能!AtLeastOnceDelivery本身就已經繼承PersistentActor了。

/**
 * Scala API: Mix-in this trait with your `PersistentActor` to send messages with at-least-once
 * delivery semantics to destinations. It takes care of re-sending messages when they
 * have not been confirmed within a configurable timeout. Use the [[AtLeastOnceDeliveryLike#deliver]] method to
 * send a message to a destination. Call the [[AtLeastOnceDeliveryLike#confirmDelivery]] method when the destination
 * has replied with a confirmation message.
 *
 * At-least-once delivery implies that original message send order is not always retained
 * and the destination may receive duplicate messages due to possible resends.
 *
 * The interval between redelivery attempts can be defined by [[AtLeastOnceDeliveryLike#redeliverInterval]].
 * After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
 * will be sent to `self`. The re-sending will still continue, but you can choose to call
 * [[AtLeastOnceDeliveryLike#confirmDelivery]] to cancel the re-sending.
 *
 * The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a
 * sequence number. It does not store this state itself. You must persist events corresponding
 * to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the
 * state can be restored by calling the same methods during the recovery phase of the
 * `PersistentActor`. Sometimes these events can be derived from other business level events,
 * and sometimes you must create separate events. During recovery calls to `deliver`
 * will not send out the message, but it will be sent later if no matching `confirmDelivery`
 * was performed.
 *
 * Support for snapshots is provided by [[AtLeastOnceDeliveryLike#getDeliverySnapshot]] and [[AtLeastOnceDeliveryLike#setDeliverySnapshot]].
 * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
 * If you need a custom snapshot for other parts of the actor state you must also include the
 * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
 * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
 * as a blob in your custom snapshot.
 *
 * @see [[AtLeastOnceDeliveryLike]]
 * @see [[AbstractPersistentActorWithAtLeastOnceDelivery]] for Java API
 */
trait AtLeastOnceDelivery extends PersistentActor with AtLeastOnceDeliveryLike

  請大家仔細、認真的閱讀官方源碼註釋,它說明了AtLeastOnceDelivery這個接口幾個非常重要的概念和細節,當然了,前面我們也已經提前說過了。

  AtLeastOnceDelivery源碼很多,從頭分析有點費事,簡單起見,還是從官方demo用法入手。demo中發消息時調用了deliver,收到確認消息時調用了confirmDelivery,那就從這兩個函數入手。

 /**
   * Scala API: Send the message created by the `deliveryIdToMessage` function to
   * the `destination` actor. It will retry sending the message until
   * the delivery is confirmed with [[#confirmDelivery]]. Correlation
   * between `deliver` and `confirmDelivery` is performed with the
   * `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
   * function. The `deliveryId` is typically passed in the message to the
   * destination, which replies with a message containing the same `deliveryId`.
   *
   * The `deliveryId` is a strictly monotonically increasing sequence number without
   * gaps. The same sequence is used for all destinations of the actor, i.e. when sending
   * to multiple destinations the destinations will see gaps in the sequence if no
   * translation is performed.
   *
   * During recovery this method will not send out the message, but it will be sent
   * later if no matching `confirmDelivery` was performed.
   *
   * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
   * if [[#numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
   */
  def deliver(destination: ActorSelection)(deliveryIdToMessage: Long ? Any): Unit = {
    internalDeliver(destination)(deliveryIdToMessage)
  }

  註釋中說deliveryIdToMessage是一個函數,它根據消息ID構建Any類型的消息。它會一直重試發送消息,直到通過confirmDelivery來確認消息送達。deliveryId是一個自增長的序列號,步長為1。在恢復時不會對外發送消息,但之後會重新發送未確認消息。如果未確認消息到達maxUnconfirmedMessages閾值,則會拋出異常AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException。

 private[akka] final def internalDeliver(destination: ActorSelection)(deliveryIdToMessage: Long ? Any): Unit = {
    val isWildcardSelection = destination.pathString.contains("*")
    require(!isWildcardSelection, "Delivering to wildcard actor selections is not supported by AtLeastOnceDelivery. " +
      "Introduce an mediator Actor which this AtLeastOnceDelivery Actor will deliver the messages to," +
      "and will handle the logic of fan-out and collecting individual confirmations, until it can signal confirmation back to this Actor.")
    internalDeliver(ActorPath.fromString(destination.toSerializationFormat))(deliveryIdToMessage)
  }

  internalDeliver首先會校驗ActorSelection中不能包含*號,它說你可以自己實現,其實就是用一個中轉actor來做匯總。具體為啥就不討論了,反正註意這一點就好了。

private[akka] final def internalDeliver(destination: ActorPath)(deliveryIdToMessage: Long ? Any): Unit = {
    if (unconfirmed.size >= maxUnconfirmedMessages)
      throw new MaxUnconfirmedMessagesExceededException(
        s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]")

    val deliveryId = nextDeliverySequenceNr()
    val now = if (recoveryRunning) { System.nanoTime() - redeliverInterval.toNanos } else System.nanoTime()
    val d = Delivery(destination, deliveryIdToMessage(deliveryId), now, attempt = 0)

    if (recoveryRunning)
      unconfirmed = unconfirmed.updated(deliveryId, d)
    else
      send(deliveryId, d, now)
  }

  第一個if語句就不說了,就是判斷當前內存列表大小的,而且未保存消息是保存在unconfirmed中的。nextDeliverySequenceNr用來生成消息ID。之後創建了Delivery消息,封裝了一些參數,大家要註意下這個case class各個字段的值。最後調用send發送Delivery消息。

private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = {
    context.actorSelection(d.destination) ! d.message
    unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1))
  }

  send非常簡單,就是把源消息發送出去,然後把對應的Delivery數據保存到unconfirmed中。

private var unconfirmed = immutable.SortedMap.empty[Long, Delivery]

  unconfirmed是一個有序的Map,按照序列號排序。

/**
   * Call this method when a message has been confirmed by the destination,
   * or to abort re-sending.
   * @see [[#deliver]]
   * @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm
   */
  def confirmDelivery(deliveryId: Long): Boolean = {
    if (unconfirmed.contains(deliveryId)) {
      unconfirmed -= deliveryId
      true
    } else false
  }

  confirmDelivery怎麽實現的呢?就是從unconfirmed中移除對應deliveryId的數據,當然了沒有收到deliveryId的確認消息,這個列表是不會移除相關數據的。這是不是太簡單了點。你是不是想說一句f**k。哈哈,我也想說。不過消息重發是如何實現的呢?前文分析過,是通過心跳來實現的,這就會涉及到一個timer,timer一般會在preStart來實現或者在字段初始化時設定。不過翻遍AtLeastOnceDeliveryLike居然沒找到相關的代碼,只找到了一個timer的定義。

// will be started after recovery completed
  private var redeliverTask: Option[Cancellable] = None
private def startRedeliverTask(): Unit = {
    val interval = redeliverInterval / 2
    redeliverTask = Some(
      context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)(context.dispatcher))
  }

  官方註釋說,會在recovery完成的時候啟動這個timmer。如果你看過之前關於持久化的文章,就一定知道,在PersistentActor啟動時候,會首先進行recovery操作,而不管這是第一次啟動,還是一次重啟。所有消息都恢復之後,會再發送恢復成功的消息,同時調用onReplaySuccess方法。而在AtLeastOnceDeliveryLike中也覆蓋了onReplaySuccess。其實我們可以把onReplaySuccess看做普通actor的preStart函數。

override private[akka] def onReplaySuccess(): Unit = {
    redeliverOverdue()
    startRedeliverTask()
    super.onReplaySuccess()
  }

  redeliverOverdue先不分析,可以看到startRedeliverTask的調用,也就是啟動了一個timer。

/**
   * Interval between redelivery attempts.
   *
   * The default value can be configured with the
   * `akka.persistence.at-least-once-delivery.redeliver-interval`
   * configuration key. This method can be overridden by implementation classes to return
   * non-default values.
   */
  def redeliverInterval: FiniteDuration = defaultRedeliverInterval

  private val defaultRedeliverInterval: FiniteDuration =
    Persistence(context.system).settings.atLeastOnceDelivery.redeliverInterval

  根據startRedeliverTask的源碼來看,它以akka.persistence.at-least-once-delivery.redeliver-interval配置的一半時間作為間隔給self發送RedeliveryTick消息。

override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
    message match {
      case RedeliveryTick ?
        redeliverOverdue()

      case x ?
        super.aroundReceive(receive, message)
    }

  很顯然收到RedeliveryTick消息,調用了redeliverOverdue方法,這個方法應該就是在重新投遞未確認的消息嘍。

private def redeliverOverdue(): Unit = {
    val now = System.nanoTime()
    val deadline = now - redeliverInterval.toNanos
    var warnings = Vector.empty[UnconfirmedDelivery]

    unconfirmed
      .iterator
      .filter { case (_, delivery) ? delivery.timestamp <= deadline }
      .take(redeliveryBurstLimit)
      .foreach {
        case (deliveryId, delivery) ?
          send(deliveryId, delivery, now)

          if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
            warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
      }

    if (warnings.nonEmpty)
      self ! UnconfirmedWarning(warnings)
  }

  這段源碼還算簡單,就是用當前時間減去重發時間間隔,把小於該時間的消息重新發送,也就是說在一個時間間隔內的消息不會重發(因為還沒有到重發的時間,這部分消息會在下一個時間間隔到達的時候發送)。

/**
   * Maximum number of unconfirmed messages that will be sent at each redelivery burst
   * (burst frequency is half of the redelivery interval).
   * If there‘s a lot of unconfirmed messages (e.g. if the destination is not available for a long time),
   * this helps to prevent an overwhelming amount of messages to be sent at once.
   *
   * The default value can be configured with the
   * `akka.persistence.at-least-once-delivery.redelivery-burst-limit`
   * configuration key. This method can be overridden by implementation classes to return
   * non-default values.
   */
  def redeliveryBurstLimit: Int = defaultRedeliveryBurstLimit

  redeliveryBurstLimit這個參數也需要註意下,也就是說在重發未確認消息時,不能一次性發送完,而是有一個最大值限制的。不要問我為啥,要是我,就直接一次性全部發送了,堆積的消息還留著撐爆內存啊。不過akka是個通用的、穩定的框架,考慮一下也不算壞事吧。

  註意,如果消息重試次數達到warnAfterNumberOfUnconfirmedAttempts這個閾值的話,會給self發送給一個UnconfirmedWarning消息,這跟直接丟棄好像不符合嘛,而且只有在等於閾值的時候才會發送UnconfirmedWarning消息。好丟臉,居然跟預想的不一樣。不過這估計是為了考慮靈活性,如果到達重試閾值都沒收到確認消息,需要開發者在收到UnconfirmedWarning消息後自行處理。如何處理?三個方案吧,手動調用confirmDelivery丟棄該消息,即發送失敗;調用confirmDelivery先把該消息從內存中移除,然後再調用deliver相關的邏輯,做下一次重試,不過此時要做好重試消息的區分;忽略該消息。

  分析到這裏,AtLeastOnceDelivery機制就基本清楚了,那讀者可能會問,這就是把未確認消息先保存在內存,等收到確認消息後再從內存中移除,這是不是太簡單了。如果內存爆了,或者actor失敗重啟了,消息不就丟了?既然是基於持久化的,為啥不把未確認消息持久化呢?我想大概有幾點可以說明吧,保存在內存就是為了快,如果每發一個消息都需要持久化,性能上跟不上,而且還涉及到順序寫和隨機讀兩次IO。當然了,AtLeastOnceDelivery提供了持久化的接口。

/**
   * Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]].
   * During recovery the snapshot received in [[SnapshotOffer]] should be set
   * with [[#setDeliverySnapshot]].
   *
   * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
   * If you need a custom snapshot for other parts of the actor state you must also include the
   * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
   * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
   * as a blob in your custom snapshot.
   */
  def getDeliverySnapshot: AtLeastOnceDeliverySnapshot =
    AtLeastOnceDeliverySnapshot(
      deliverySequenceNr,
      unconfirmed.map { case (deliveryId, d) ? UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut))

  這個是啥呢?其實就是為你構建一個AtLeastOnceDeliverySnapshot,這個值包含當前的發送序列號和未確認的消息列表。

/**
   * If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery
   * in a [[SnapshotOffer]] message and should be set with this method.
   */
  def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = {
    deliverySequenceNr = snapshot.currentDeliveryId
    val now = System.nanoTime()
    unconfirmed = snapshot.unconfirmedDeliveries.map(d ?
      d.deliveryId → Delivery(d.destination, d.message, now, 0))(breakOut)
  }

  還有一個就是setDeliverySnapshot,他就是從AtLeastOnceDeliverySnapshot中,恢復當前的發送序列號和未確認消息列表。

  AtLeastOnceDelivery關於持久化,就提供了這兩個接口,僅此而已!那啥時候調用setDeliverySnapshot呢,調用setDeliverySnapshot時的AtLeastOnceDeliverySnapshot參數從哪裏獲取呢?哈哈,你猜。

  其實,這是AtLeastOnceDelivery靈活的地方,它讓你自己去實現。怎麽實現呢?如果你用過akka的持久化接口,就一定知道除了persist還有一個saveSnapshot函數,用來保存當前狀態的快照。這就簡單了,你可以給self發一個定時消息,或者在當前未確認消息達到一定值的時候,通過getDeliverySnapshot函數,獲取當前未確認消息的快照,調用saveSnapshot保存起來。在receiveRecover方法中,收到SnapshotOffer消息後,再調用setDeliverySnapshot設置當前未確認消息。腫麽樣是不是很簡單呢。

  AtLeastOnceDelivery就是太簡單了,所以還是有點問題的。比如發送的消息中只包含一個發送序列號,並沒有消息ID和重試次數相關的信息,那如何區別消息是重試的,還是第一次發送的呢?我覺得這是一個很大的bug啊,因為我自己是無法區分消息是不是重發的!這樣在使用AtLeastOnceDelivery時,就註意幾點。

  • 消息最好包含消息ID,即唯一值。
  • 對消息的處理一定要冪等,也就是說收到重復的消息不會影響業務邏輯。
  • 如果對消息的順序要求嚴格,一定要仔細研究這其中的邏輯關系。

Akka源碼分析-Persistence-AtLeastOnceDelivery