
Akka Source Code Analysis - Cluster DistributedData


  In the previous post we studied the cluster sharding source code. Although Akka's cluster sharding was designed to distribute actors, with a little adaptation it can easily be turned into a simple distributed cache. How? Quite simply: the entity actor's id is the key, the actor's state is the value, and that state can be changed without locks.

  Akka's DistributedData is in fact somewhat like a cache system: it is useful whenever you need to share data across the cluster. You access the data through an API similar to that of a K/V cache, but the data held by DistributedData are Conflict Free Replicated Data Types (CRDTs). I am not very familiar with CRDTs myself, so I will not introduce them here; interested readers can look them up. For now, let us just treat them as a way of achieving eventual consistency for replicated data.

  In Akka Distributed Data, every data entry is spread to all nodes, or to a group of nodes, through gossip-based replication, and you get fine-grained control over the consistency level of reads and writes. CRDTs can be updated without any coordinator: concurrent updates from different nodes are resolved automatically by a monotonic merge function, and the state eventually converges.

  The akka.cluster.ddata.Replicator actor provides the API for interacting with the data. The Replicator has to run on every node, which is hardly surprising, since it is the one that disseminates the data. Because the Replicator is just an ordinary actor with a special job, it can be started like any other actor, as long as its settings are identical on every node; it can also be started through the akka.cluster.ddata.DistributedData extension.

  Through the Replicator we can Update, Get, Subscribe to, and Delete the corresponding data.
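
  As a taste of that protocol, here is a hedged sketch of Get and Delete (Update and Subscribe appear in the official demo further below). The actor name, key id and the 3-second timeout are illustrative; GetSuccess, NotFound, GetFailure and DeleteSuccess are the Replicator's reply messages:

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ddata.{ DistributedData, ORSetKey }
import akka.cluster.ddata.Replicator._

// Hedged sketch: reading and deleting through the Replicator.
class ReaderBot extends Actor with ActorLogging {
  val replicator = DistributedData(context.system).replicator
  val DataKey = ORSetKey[String]("key")

  // ReadLocal answers from this replica only; ReadMajority waits for a majority of replicas.
  replicator ! Get(DataKey, ReadLocal)
  replicator ! Get(DataKey, ReadMajority(3.seconds))
  replicator ! Delete(DataKey, WriteLocal)

  def receive = {
    case g @ GetSuccess(DataKey, _) ⇒ log.info("Current elements: {}", g.get(DataKey).elements)
    case NotFound(DataKey, _)       ⇒ log.info("Key not found")
    case GetFailure(DataKey, _)     ⇒ log.info("Read consistency not reached within the timeout")
    case _: DeleteSuccess[_]        ⇒ log.info("Deleted")
  }
}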

  The data types supported by Akka Distributed Data must be convergent CRDTs that extend the ReplicatedData trait, which means they all have to provide a monotonic merge function and their state changes must always converge. The built-in data types are:

  • Counters: GCounter, PNCounter
  • Sets: GSet, ORSet
  • Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
  • Registers: LWWRegister, Flag

  GCounter is a grow-only counter: it can only be incremented, never decremented. It works much like a vector clock, keeping one value per node and merging by taking the per-node maximum. If you need to both increment and decrement a counter, use PNCounter (positive-negative counter) instead: it tracks increments and decrements separately, each represented by an internal GCounter, and merging also happens through those GCounters.
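
  A hedged sketch of the 2.5-era counter API (the object and system names are made up for illustration; the implicit Cluster identifies the local node):

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.{ GCounter, PNCounter }
import com.typesafe.config.ConfigFactory

object CounterDemo extends App {
  // Cluster(system) needs a cluster-aware ActorSystem; the system name is illustrative.
  val system = ActorSystem("Demo", ConfigFactory.parseString("akka.actor.provider = cluster"))
  implicit val cluster = Cluster(system)

  val g = GCounter.empty + 3                   // grow-only: can only be incremented
  println(g.value)                             // 3
  println(g.merge(GCounter.empty + 1).value)   // still 3: merge takes the per-node maximum

  val pn = PNCounter.empty + 5 - 2             // increments and decrements live in two internal GCounters
  println(pn.value)                            // 3

  system.terminate()
}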

  GSet is a set that can only grow; ORSet (observed-remove set) supports both adding and removing elements. An ORSet carries a version vector that is incremented whenever an element is added, and a version vector known as a "birth dot" is also tracked for each element.
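
  Continuing the counter sketch above (same implicit cluster in scope), the difference between the two set types looks roughly like this, again assuming the 2.5-era operator API:

import akka.cluster.ddata.{ GSet, ORSet }

val gs = GSet.empty[String] + "a" + "b"          // add-only: there is simply no way to remove
val os = ORSet.empty[String] + "a" + "b" - "a"   // observed-remove set: removal needs the implicit cluster node
println(gs.elements)                             // Set(a, b)
println(os.elements)                             // Set(b)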

  ORMap (observed-remove map) is a map whose keys can be of any type and whose values must be ReplicatedData types; it supports adding, updating and removing entries. If an add and a remove are performed concurrently, the add wins. If several updates are performed concurrently, the values are merged.

  ORMultiMap (observed-remove multi-map) is a multi-valued map whose values are of type ORSet.

  PNCounterMap (positive negative counter map) is a map of named counters whose values are of type PNCounter.

  LWWMap (last writer wins map) is a map whose values are of type LWWRegister (last writer wins register).

  Flag is a boolean value that starts out as false and can be switched to true; once it has been set to true it can never change again.

  LWWRegister (last writer wins register) can hold any serializable value. It keeps the value that was written last, although "last" is hard to pin down, because in a distributed environment the nodes never have perfectly synchronized clocks; if the timestamps are equal, the value from the node with the lowest IP address wins. This means the value held by an LWWRegister is not necessarily the physically latest write: the replicas still converge, but the update that survives may not be the one you expect.
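
  A hedged sketch of Flag and LWWRegister, again with the implicit cluster from the counter example in scope (2.5-era API; exact signatures may differ between versions):

import akka.cluster.ddata.{ Flag, LWWRegister }

val f = Flag.empty.switchOn              // false -> true, and there is no way back
val r1 = LWWRegister("v1")               // stores the value together with a timestamp
val r2 = r1.withValue("v2")              // a later write on the same node
println(r1.merge(r2).value)              // "v2": the last write wins on merge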

  Having said all that, Akka Distributed Data is not a cache system: it does not suit every kind of problem, and eventual consistency does not fit every scenario. Nor is it meant for Big Data; the number of top-level entries should not exceed 100,000. When a new node joins the cluster, all of the data is transferred to it, and all of the data lives in memory, which is another reason it is unsuitable for Big Data. When a data entry changes, its entire state may be replicated to all other nodes, unless the type supports delta-CRDTs, in which case only the delta is replicated.

  Readers may wonder at this point why I have covered so many concepts without looking at any source code. That is because I believe the important part of Distributed Data is the concepts, and this Akka feature sees relatively little use.

import java.util.concurrent.ThreadLocalRandom

import scala.concurrent.duration._

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, ORSet, ORSetKey }
import akka.cluster.ddata.Replicator._

object DataBot {
  // the periodic Tick message used by the scheduler below
  private case object Tick
}

class DataBot extends Actor with ActorLogging {
  import DataBot._

  val replicator = DistributedData(context.system).replicator
  implicit val node = Cluster(context.system)

  import context.dispatcher
  val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick)

  val DataKey = ORSetKey[String]("key")

  replicator ! Subscribe(DataKey, self)

  def receive = {
    case Tick ⇒
      val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
      if (ThreadLocalRandom.current().nextBoolean()) {
        // add
        log.info("Adding: {}", s)
        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ + s)
      } else {
        // remove
        log.info("Removing: {}", s)
        replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ - s)
      }

    case _: UpdateResponse[_] ⇒ // ignore

    case c @ Changed(DataKey) ⇒
      val data = c.get(DataKey)
      log.info("Current elements: {}", data.elements)
  }

  override def postStop(): Unit = tickTask.cancel()

}

  The above is the official demo; it shows the Subscribe and Update operations on the ORSet data type. As you can see, every data operation goes through the replicator, and every one of them is a message send. Following that thread, the first thing to look at is the source of DistributedData(context.system).replicator.

/**
 * Akka extension for convenient configuration and use of the
 * [[Replicator]]. Configuration settings are defined in the
 * `akka.cluster.ddata` section, see `reference.conf`.
 */
class DistributedData(system: ExtendedActorSystem) extends Extension {

  private val config = system.settings.config.getConfig("akka.cluster.distributed-data")
  private val settings = ReplicatorSettings(config)

  /**
   * Returns true if this member is not tagged with the role configured for the
   * replicas.
   */
  def isTerminated: Boolean = Cluster(system).isTerminated || !settings.roles.subsetOf(Cluster(system).selfRoles)

  /**
   * `ActorRef` of the [[Replicator]] .
   */
  val replicator: ActorRef =
    if (isTerminated) {
      system.log.warning("Replicator points to dead letters: Make sure the cluster node is not terminated and has the proper role!")
      system.deadLetters
    } else {
      val name = config.getString("name")
      system.systemActorOf(Replicator.props(settings), name)
    }
}

  The DistributedData extension turns out to be almost trivial: it just creates a Replicator with systemActorOf.
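
  As the Replicator scaladoc below also points out, the same actor can be started manually instead of through the extension. A minimal sketch, assuming the system is configured as a cluster node; the actor name "ddataReplicator" is illustrative, but it must be identical on every node:

import akka.actor.ActorSystem
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }

object ManualReplicator extends App {
  // assumes akka.actor.provider = cluster in the configuration
  val system = ActorSystem("ClusterSystem")
  // same effect as the extension above, minus the role check and systemActorOf
  val settings = ReplicatorSettings(system) // reads akka.cluster.distributed-data, just like the extension
  val replicator = system.actorOf(Replicator.props(settings), "ddataReplicator")
}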

/**
 * A replicated in-memory data store supporting low latency and high availability
 * requirements.
 *
 * The `Replicator` actor takes care of direct replication and gossip based
 * dissemination of Conflict Free Replicated Data Types (CRDTs) to replicas in
 * the cluster.
 * The data types must be convergent CRDTs and implement [[ReplicatedData]], i.e.
 * they provide a monotonic merge function and the state changes always converge.
 *
 * You can use your own custom [[ReplicatedData]] or [[DeltaReplicatedData]] types,
 * and several types are provided by this package, such as:
 *
 * <ul>
 * <li>Counters: [[GCounter]], [[PNCounter]]</li>
 * <li>Registers: [[LWWRegister]], [[Flag]]</li>
 * <li>Sets: [[GSet]], [[ORSet]]</li>
 * <li>Maps: [[ORMap]], [[ORMultiMap]], [[LWWMap]], [[PNCounterMap]]</li>
 * </ul>
 *
 * The `Replicator` actor must be started on each node in the cluster, or group of
 * nodes tagged with a specific role. It communicates with other `Replicator` instances
 * with the same path (without address) that are running on other nodes. For convenience it
 * can be used with the [[DistributedData]] extension but it can also be started as an ordinary
 * actor using the `Replicator.props`. If it is started as an ordinary actor it is important
 * that it is given the same name, started on same path, on all nodes.
 *
 * The protocol for replicating the deltas supports causal consistency if the data type
 * is marked with [[RequiresCausalDeliveryOfDeltas]]. Otherwise it is only eventually
 * consistent. Without causal consistency it means that if elements 'c' and 'd' are
 * added in two separate `Update` operations these deltas may occasionally be propagated
 * to nodes in different order than the causal order of the updates. For this example it
 * can result in that set {'a', 'b', 'd'} can be seen before element 'c' is seen. Eventually
 * it will be {'a', 'b', 'c', 'd'}.
 *
 * == CRDT Garbage ==
 *
 * One thing that can be problematic with CRDTs is that some data types accumulate history (garbage).
 * For example a `GCounter` keeps track of one counter per node. If a `GCounter` has been updated
 * from one node it will associate the identifier of that node forever. That can become a problem
 * for long running systems with many cluster nodes being added and removed. To solve this problem
 * the `Replicator` performs pruning of data associated with nodes that have been removed from the
 * cluster. Data types that need pruning have to implement [[RemovedNodePruning]]. The pruning consists
 * of several steps:
 * <ol>
 * <li>When a node is removed from the cluster it is first important that all updates that were
 * done by that node are disseminated to all other nodes. The pruning will not start before the
 * `maxPruningDissemination` duration has elapsed. The time measurement is stopped when any
 * replica is unreachable, but it's still recommended to configure this with certain margin.
 * It should be in the magnitude of minutes.</li>
 * <li>The nodes are ordered by their address and the node ordered first is called leader.
 * The leader initiates the pruning by adding a `PruningInitialized` marker in the data envelope.
 * This is gossiped to all other nodes and they mark it as seen when they receive it.</li>
 * <li>When the leader sees that all other nodes have seen the `PruningInitialized` marker
 * the leader performs the pruning and changes the marker to `PruningPerformed` so that nobody
 * else will redo the pruning. The data envelope with this pruning state is a CRDT itself.
 * The pruning is typically performed by "moving" the part of the data associated with
 * the removed node to the leader node. For example, a `GCounter` is a `Map` with the node as key
 * and the counts done by that node as value. When pruning the value of the removed node is
 * moved to the entry owned by the leader node. See [[RemovedNodePruning#prune]].</li>
 * <li>Thereafter the data is always cleared from parts associated with the removed node so that
 * it does not come back when merging. See [[RemovedNodePruning#pruningCleanup]]</li>
 * <li>After another `maxPruningDissemination` duration after pruning the last entry from the
 * removed node the `PruningPerformed` markers in the data envelope are collapsed into a
 * single tombstone entry, for efficiency. Clients may continue to use old data and therefore
 * all data are always cleared from parts associated with tombstoned nodes. </li>
 * </ol>
 */
final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLogging 

  The Replicator is a replicated in-memory store aimed at low latency and high availability, and it is just an ordinary actor. It has a great many fields and methods, but one field deserves special attention.

  // the actual data
  var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)]

  type KeyId = String

  // Gossip Status message contains SHA-1 digests of the data to determine when
  // to send the full data
  type Digest = ByteString

  The comment says this is the actual data. The key is a KeyId, which is simply a String; the value is a tuple whose first element is a DataEnvelope, which wraps the data entry and carries the pruning state for that entry (the second element is the SHA-1 digest used by gossip Status messages).

    /**
     * The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry.
     */
    final case class DataEnvelope(
      data:          ReplicatedData,
      pruning:       Map[UniqueAddress, PruningState] = Map.empty,
      deltaVersions: VersionVector                    = VersionVector.empty)
      extends ReplicatorMessage

  We know that every data operation is carried out by sending a message to the replicator, so let's look at the source of receive.

  def receive =
    if (hasDurableKeys) load
    else normalReceive

   When the actor starts it may first be in the load phase (restoring durable entries); we will skip that and go straight to normalReceive.
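
  For completeness, hasDurableKeys only becomes true when some key ids are configured as durable. A hedged sketch of such a configuration (the "durable-*" pattern and the system name are illustrative):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object DurableDemo extends App {
  // Only with durable keys configured does the Replicator start in the `load` phase
  // before switching to normalReceive.
  val config = ConfigFactory.parseString(
    """akka.cluster.distributed-data.durable.keys = ["durable-*"]""")
  val system = ActorSystem("ClusterSystem", config.withFallback(ConfigFactory.load()))
}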

val normalReceive: Receive = {
    case Get(key, consistency, req)             ⇒ receiveGet(key, consistency, req)
    case u @ Update(key, writeC, req)           ⇒ receiveUpdate(key, u.modify, writeC, req)
    case Read(key)                              ⇒ receiveRead(key)
    case Write(key, envelope)                   ⇒ receiveWrite(key, envelope)
    case ReadRepair(key, envelope)              ⇒ receiveReadRepair(key, envelope)
    case DeltaPropagation(from, reply, deltas)  ⇒ receiveDeltaPropagation(from, reply, deltas)
    case FlushChanges                           ⇒ receiveFlushChanges()
    case DeltaPropagationTick                   ⇒ receiveDeltaPropagationTick()
    case GossipTick                             ⇒ receiveGossipTick()
    case ClockTick                              ⇒ receiveClockTick()
    case Status(otherDigests, chunk, totChunks) ⇒ receiveStatus(otherDigests, chunk, totChunks)
    case Gossip(updatedData, sendBack)          ⇒ receiveGossip(updatedData, sendBack)
    case Subscribe(key, subscriber)             ⇒ receiveSubscribe(key, subscriber)
    case Unsubscribe(key, subscriber)           ⇒ receiveUnsubscribe(key, subscriber)
    case Terminated(ref)                        ⇒ receiveTerminated(ref)
    case MemberWeaklyUp(m)                      ⇒ receiveWeaklyUpMemberUp(m)
    case MemberUp(m)                            ⇒ receiveMemberUp(m)
    case MemberRemoved(m, _)                    ⇒ receiveMemberRemoved(m)
    case evt: MemberEvent                       ⇒ receiveOtherMemberEvent(evt.member)
    case UnreachableMember(m)                   ⇒ receiveUnreachable(m)
    case ReachableMember(m)                     ⇒ receiveReachable(m)
    case GetKeyIds                              ⇒ receiveGetKeyIds()
    case Delete(key, consistency, req)          ⇒ receiveDelete(key, consistency, req)
    case RemovedNodePruningTick                 ⇒ receiveRemovedNodePruningTick()
    case GetReplicaCount                        ⇒ receiveGetReplicaCount()
    case TestFullStateGossip(enabled)           ⇒ fullStateGossipEnabled = enabled
  }

  normalReceive has a lot of branches; every operation the Replicator supports is here. Let's start with the Subscribe message.

def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit = {
    newSubscribers.addBinding(key.id, subscriber)
    if (!subscriptionKeys.contains(key.id))
      subscriptionKeys = subscriptionKeys.updated(key.id, key)
    context.watch(subscriber)
  }

  This is also quite simple: it binds key.id to the subscriber and then watches the subscriber. So what exactly is the KeyR type?

private[akka] type KeyR = Key[ReplicatedData]

/**
 * Key for the key-value data in [[Replicator]]. The type of the data value
 * is defined in the key. Keys are compared equal if the `id` strings are equal,
 * i.e. use unique identifiers.
 *
 * Specific classes are provided for the built in data types, e.g. [[ORSetKey]],
 * and you can create your own keys.
 */
abstract class Key[+T <: ReplicatedData](val id: Key.KeyId) extends Serializable

  KeyR is simply the key for the corresponding data type, so we will not dig any deeper into it. Next, let's see how Update is handled.

def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ⇒ ReplicatedData,
                    writeConsistency: WriteConsistency, req: Option[Any]): Unit = {
    val localValue = getData(key.id)

    def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] = {
      d.delta match {
        case s @ Some(_) ⇒ s
        case None        ⇒ Some(NoDeltaPlaceholder)
      }
    }

    Try {
      localValue match {
        case Some(DataEnvelope(DeletedData, _, _)) ⇒ throw new DataDeleted(key, req)
        case Some(envelope @ DataEnvelope(existing, _, _)) ⇒
          modify(Some(existing)) match {
            case d: DeltaReplicatedData if deltaCrdtEnabled ⇒
              (envelope.merge(d.resetDelta.asInstanceOf[existing.T]), deltaOrPlaceholder(d))
            case d ⇒
              (envelope.merge(d.asInstanceOf[existing.T]), None)
          }
        case None ⇒ modify(None) match {
          case d: DeltaReplicatedData if deltaCrdtEnabled ⇒
            (DataEnvelope(d.resetDelta), deltaOrPlaceholder(d))
          case d ⇒ (DataEnvelope(d), None)
        }
      }
    } match {
      case Success((envelope, delta)) ⇒
        log.debug("Received Update for key [{}]", key)

        // handle the delta
        delta match {
          case Some(d) ⇒ deltaPropagationSelector.update(key.id, d)
          case None    ⇒ // not DeltaReplicatedData
        }

        // note that it's important to do deltaPropagationSelector.update before setData,
        // so that the latest delta version is used
        val newEnvelope = setData(key.id, envelope)

        val durable = isDurable(key.id)
        if (isLocalUpdate(writeConsistency)) {
          if (durable)
            durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope),
              Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), replyTo)))
          else
            replyTo ! UpdateSuccess(key, req)
        } else {
          val (writeEnvelope, writeDelta) = delta match {
            case Some(NoDeltaPlaceholder) ⇒ (newEnvelope, None)
            case Some(d: RequiresCausalDeliveryOfDeltas) ⇒
              val v = deltaPropagationSelector.currentVersion(key.id)
              (newEnvelope, Some(Delta(newEnvelope.copy(data = d), v, v)))
            case Some(d) ⇒ (newEnvelope.copy(data = d), None)
            case None    ⇒ (newEnvelope, None)
          }
          val writeAggregator =
            context.actorOf(WriteAggregator.props(key, writeEnvelope, writeDelta, writeConsistency,
              req, nodes, unreachable, replyTo, durable)
              .withDispatcher(context.props.dispatcher))
          if (durable) {
            durableStore ! Store(key.id, new DurableDataEnvelope(newEnvelope),
              Some(StoreReply(UpdateSuccess(key, req), StoreFailure(key, req), writeAggregator)))
          }
        }
      case Failure(e: DataDeleted[_]) ⇒
        log.debug("Received Update for deleted key [{}]", key)
        replyTo ! e
      case Failure(e) ⇒
        log.debug("Received Update for key [{}], failed: {}", key, e.getMessage)
        replyTo ! ModifyFailure(key, "Update failed: " + e.getMessage, e, req)
    }
  }

  The logic here is also straightforward: look up this node's current value for the key, apply the user-supplied modify function to it, and then merge the result back into the locally stored envelope via merge.

def getData(key: KeyId): Option[DataEnvelope] = dataEntries.get(key).map { case (envelope, _) ⇒ envelope }

  getData is also very simple: it just looks up the value for the key in the dataEntries map.
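
  On the caller's side, the isLocalUpdate(writeConsistency) branch above corresponds to the write consistency passed to Update: the demo used WriteLocal, so the reply comes back immediately; with something like WriteMajority a WriteAggregator child waits for acknowledgements first. A hedged sketch from inside an actor like the demo's DataBot (the 3-second timeout is illustrative; UpdateSuccess, UpdateTimeout and ModifyFailure are the Replicator's reply messages):

import scala.concurrent.duration._
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.Replicator._

// inside an actor such as DataBot, with `replicator`, `DataKey`, `log` and an implicit Cluster in scope
replicator ! Update(DataKey, ORSet.empty[String], WriteMajority(3.seconds))(_ + "x")

def receiveUpdateResponses: Receive = {
  case UpdateSuccess(DataKey, _)       ⇒ log.info("A majority of replicas acknowledged the write")
  case UpdateTimeout(DataKey, _)       ⇒ log.info("Consistency not reached in time; the local write still stands")
  case ModifyFailure(DataKey, _, _, _) ⇒ log.info("The modify function threw an exception")
}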

  At this point there is not much more to analyze about the data operations. Why? Because all of the CRUD operations essentially come down to modifying this actor's dataEntries. Given what we have learned from the earlier source code analyses, the synchronization mechanism does not need a deep dive either (this series is positioned as a simple, entry-level walk through the source). We can guess that there must be a timer that periodically disseminates the current dataEntries via the gossip protocol; when another node receives the data it calls the CRDT type's merge to apply the change. Thanks to the CRDT properties, merge never has to resolve conflicts, so after a round of gossip every node converges to the same data; before convergence is reached, a node may simply not yet see the changed data.
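
  To see why merge never has to resolve conflicts, here is a hedged sketch of a custom ReplicatedData type (not from Akka; the name MaxRegister is made up): its merge is commutative, associative and idempotent, so the replicas end up with the same value no matter in which order the gossip messages arrive.

import akka.cluster.ddata.ReplicatedData

// A made-up CRDT: a register that only ever keeps the largest value seen so far.
final case class MaxRegister(value: Long) extends ReplicatedData {
  type T = MaxRegister
  // merge(a, b) == merge(b, a), merge(a, a) == a, and any grouping gives the same result
  override def merge(that: MaxRegister): MaxRegister =
    if (that.value > value) that else this
}

  The Replicator's actual write path below performs exactly this kind of merge.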

def write(key: KeyId, writeEnvelope: DataEnvelope): Option[DataEnvelope] = {
    getData(key) match {
      case someEnvelope @ Some(envelope) if envelope eq writeEnvelope ⇒ someEnvelope
      case Some(DataEnvelope(DeletedData, _, _))                      ⇒ Some(DeletedEnvelope) // already deleted
      case Some(envelope @ DataEnvelope(existing, _, _)) ⇒
        try {
          // DataEnvelope will mergeDelta when needed
          val merged = envelope.merge(writeEnvelope).addSeen(selfAddress)
          Some(setData(key, merged))
        } catch {
          case e: IllegalArgumentException ⇒
            log.warning(
              "Couldn't merge [{}], due to: {}", key, e.getMessage)
            None
        }
      case None ⇒
        // no existing data for the key
        val writeEnvelope2 =
          writeEnvelope.data match {
            case d: ReplicatedDelta ⇒
              val z = d.zero
              writeEnvelope.copy(data = z.mergeDelta(d.asInstanceOf[z.D]))
            case _ ⇒
              writeEnvelope
          }

        val writeEnvelope3 = writeEnvelope2.addSeen(selfAddress)
        Some(setData(key, writeEnvelope3))
    }
  }

  The above is the code that actually writes the data, which backs up our guess.

  That is as far as we will take the Akka Cluster Distributed Data source code. Readers may ask why the analysis is so shallow. There are two reasons: first, this feature has a fairly narrow range of applications, and because it is memory-based the data it holds cannot be very large; second, I think sharding solves the data-sharing problem better, and without locks. If you are interested, you can study the rest of this code on your own.

