akka-typed(7) - cluster:sharding, cluster sharding
While working with akka-typed I have found that many things have been simplified considerably and made more convenient. Supervision, for example: simply wrapping a Behavior with Behaviors.supervise() is enough to give that actor a SupervisorStrategy.restartWithBackoff strategy. The clustered group router is likewise easy to use, and so is cluster sharding. Below we go through an example to show how cluster-sharding is used in practice.
First, sharding refers to the mechanism for constructing and deploying a particular kind of actor, called an entity, across multiple nodes of the cluster. Entities are constructed dynamically: the ClusterSharding system decides, based on how load and shards are distributed across the nodes, on which node an entity is created, and returns a ShardRegion, which serves as both the factory for that type of entity and the message intermediary for it. In other words, we can direct the same kind of computation to any entity by specifying its entityId, but we cannot determine manually on which cluster node that entity lives; that is entirely guided by ClusterSharding. Let's first design an actor with a simple function and examine in detail how it works as an entity:
object Counter {
  sealed trait Command extends CborSerializable
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Response]) extends Command
  case object StopCounter extends Command
  private case object Idle extends Command

  sealed trait Response extends CborSerializable
  case class SubTtl(entityId: String, ttl: Int) extends Response

  val TypeKey = EntityTypeKey[Command]("Counter")

  def apply(nodeAddress: String, entityContext: EntityContext[Command]): Behavior[Command] = {
    Behaviors.setup { ctx =>
      def updated(value: Int): Behavior[Command] = {
        Behaviors.receiveMessage[Command] {
          case Increment =>
            ctx.log.info("******************{} counting at {},{}", ctx.self.path, nodeAddress, entityContext.entityId)
            updated(value + 1)
          case GetValue(replyTo) =>
            ctx.log.info("******************{} get value at {},{}", ctx.self.path, nodeAddress, entityContext.entityId)
            replyTo ! SubTtl(entityContext.entityId, value)
            Behaviors.same
          case Idle =>
            entityContext.shard ! ClusterSharding.Passivate(ctx.self)
            Behaviors.same
          case StopCounter =>
            Behaviors.stopped(() =>
              ctx.log.info("************{} stopping ... passivated for idling.", entityContext.entityId))
        }
      }
      ctx.setReceiveTimeout(30.seconds, Idle)
      updated(0)
    }
  }
}
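The Idle/Passivate handling above is the manual passivation pattern: after 30 seconds without messages the entity asks its shard to passivate it, and the shard then sends the configured stop message. Akka cluster sharding can also passivate idle entities automatically through configuration; a minimal sketch, assuming the akka.cluster.sharding.passivate-idle-entity-after setting of Akka 2.6 (the 2-minute value is just an illustration):

import com.typesafe.config.ConfigFactory

// Sketch only: with this setting the shard sends the stop message registered via
// withStopMessage (Counter.StopCounter below) to entities that have been idle for
// 2 minutes, so the explicit setReceiveTimeout/Idle/Passivate round-trip would not
// be needed.
val autoPassivation = ConfigFactory.parseString(
  "akka.cluster.sharding.passivate-idle-entity-after = 2 minutes")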
The cluster-sharding mechanism works like this: a ShardRegion for a given EntityType is constructed and deployed on every node (or on the designated ones). The system can then create entities of that type on any node where a ShardRegion has been deployed, and the ClusterSharding system routes messages to the correct recipient based on the entityId. Now let's see how the ShardRegion deployment is implemented:
object EntityManager {
  sealed trait Command
  case class AddOne(counterId: String) extends Command
  case class GetSum(counterId: String) extends Command
  case class WrappedTotal(res: Counter.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
    val cluster = Cluster(ctx.system)
    val sharding = ClusterSharding(ctx.system)
    val entityType = Entity(Counter.TypeKey) { entityContext =>
      Counter(cluster.selfMember.address.toString, entityContext)
    }.withStopMessage(Counter.StopCounter)
    sharding.init(entityType)

    val counterRef: ActorRef[Counter.Response] = ctx.messageAdapter(ref => WrappedTotal(ref))

    Behaviors.receiveMessage[Command] {
      case AddOne(cid) =>
        val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
        entityRef ! Counter.Increment
        Behaviors.same
      case GetSum(cid) =>
        val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
        entityRef ! Counter.GetValue(counterRef)
        Behaviors.same
      case WrappedTotal(ttl) =>
        ttl match {
          case Counter.SubTtl(eid, subttl) =>
            ctx.log.info("***********************{} total: {} ", eid, subttl)
        }
        Behaviors.same
    }
  }
}
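One detail worth noting: sharding.init(entityType) also returns the ShardRegion's ActorRef, typed as ActorRef[ShardingEnvelope[Counter.Command]]. So besides looking up an EntityRef with entityRefFor, a sender can wrap a message in a ShardingEnvelope and address the region directly. A minimal sketch, reusing the ctx, cluster and Counter definitions above:

// Sketch: keep the ActorRef returned by init and post enveloped messages to it.
// The default HashCodeMessageExtractor unwraps the envelope and derives the shardId
// from the entityId it carries.
val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] =
  sharding.init(
    Entity(Counter.TypeKey) { entityContext =>
      Counter(cluster.selfMember.address.toString, entityContext)
    }.withStopMessage(Counter.StopCounter))

shardRegion ! ShardingEnvelope("9013", Counter.Increment)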
It really couldn't be simpler: a single call, sharding.init(entityType), completes the shard deployment on a node; this is how the system constructs the ShardRegion. The entityType represents a template for a particular kind of actor. Let's look at its construction function:
object Entity {

  /**
   * Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
   * settings can be defined using the `with` methods of the returned [[Entity]].
   *
   * @param typeKey A key that uniquely identifies the type of entity in this cluster
   * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
   * @tparam M The type of message the entity accepts
   */
  def apply[M](typeKey: EntityTypeKey[M])(
      createBehavior: EntityContext[M] => Behavior[M]): Entity[M, ShardingEnvelope[M]] =
    new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None, None)
}
This function takes an EntityTypeKey and a createBehavior function that builds the Behavior, and produces an Entity. The Entity type is defined as follows:
final class Entity[M, E] private[akka] (
    val createBehavior: EntityContext[M] => Behavior[M],
    val typeKey: EntityTypeKey[M],
    val stopMessage: Option[M],
    val entityProps: Props,
    val settings: Option[ClusterShardingSettings],
    val messageExtractor: Option[ShardingMessageExtractor[E, M]],
    val allocationStrategy: Option[ShardAllocationStrategy],
    val role: Option[String],
    val dataCenter: Option[DataCenter]) {

  /**
   * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
   */
  def withEntityProps(newEntityProps: Props): Entity[M, E] =
    copy(entityProps = newEntityProps)

  /**
   * Additional settings, typically loaded from configuration.
   */
  def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
    copy(settings = Option(newSettings))

  /**
   * Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
   * If this is not defined it will be stopped automatically.
   * It can be useful to define a custom stop message if the entity needs to perform
   * some asynchronous cleanup or interactions before stopping.
   */
  def withStopMessage(newStopMessage: M): Entity[M, E] =
    copy(stopMessage = Option(newStopMessage))

  /**
   * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
   * them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope
   * is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of
   * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
   * is configured with `akka.cluster.sharding.number-of-shards`.
   */
  def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] =
    new Entity(
      createBehavior,
      typeKey,
      stopMessage,
      entityProps,
      settings,
      Option(newExtractor),
      allocationStrategy,
      role,
      dataCenter)

  /**
   * Allocation strategy which decides on which nodes to allocate new shards,
   * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
   */
  def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] =
    copy(allocationStrategy = Option(newAllocationStrategy))

  /**
   * Run the Entity actors on nodes with the given role.
   */
  def withRole(newRole: String): Entity[M, E] = copy(role = Some(newRole))

  /**
   * The data center of the cluster nodes where the cluster sharding is running.
   * If the dataCenter is not specified then the same data center as current node. If the given
   * dataCenter does not match the data center of the current node the `ShardRegion` will be started
   * in proxy mode.
   */
  def withDataCenter(newDataCenter: DataCenter): Entity[M, E] = copy(dataCenter = Some(newDataCenter))

  private def copy(
      createBehavior: EntityContext[M] => Behavior[M] = createBehavior,
      typeKey: EntityTypeKey[M] = typeKey,
      stopMessage: Option[M] = stopMessage,
      entityProps: Props = entityProps,
      settings: Option[ClusterShardingSettings] = settings,
      allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy,
      role: Option[String] = role,
      dataCenter: Option[DataCenter] = dataCenter): Entity[M, E] = {
    new Entity(
      createBehavior,
      typeKey,
      stopMessage,
      entityProps,
      settings,
      messageExtractor,
      allocationStrategy,
      role,
      dataCenter)
  }
}
It offers a number of with... methods for controlling how the Entity is built and how it operates.
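For instance, withRole could keep Counter entities on the role=shard nodes only (on nodes without that role the ShardRegion would, per the Akka documentation, run in proxy mode), and withSettings passes explicit ClusterShardingSettings instead of the defaults. A hedged sketch, reusing the ctx, cluster and sharding values from EntityManager above:

import akka.cluster.sharding.typed.ClusterShardingSettings

// Sketch only: the same Counter entity as before, but restricted to nodes with the
// "shard" role and with settings loaded explicitly from the ActorSystem configuration.
val restrictedEntity =
  Entity(Counter.TypeKey) { entityContext =>
    Counter(cluster.selfMember.address.toString, entityContext)
  }
    .withStopMessage(Counter.StopCounter)
    .withRole("shard")
    .withSettings(ClusterShardingSettings(ctx.system))

sharding.init(restrictedEntity)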
Next we deploy this EntityManager as the RootBehavior on several nodes:
object ClusterShardingApp {

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup("shard", 25251)
      startup("shard", 25252)
      startup("shard", 25253)
      startup("front", 25254)
    } else {
      require(args.size == 2, "Usage: role port")
      startup(args(0), args(1).toInt)
    }
  }

  def startup(role: String, port: Int): Unit = {
    // Override the configuration of the port when specified as program argument
    val config = ConfigFactory
      .parseString(s"""
        akka.remote.artery.canonical.port=$port
        akka.cluster.roles = [$role]
        """)
      .withFallback(ConfigFactory.load("cluster"))

    val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
    ...
  }
In total we set up three nodes with role=shard and one front node.
On the front node we send messages to the entities with entityId 9013, 9014, 9015 and 9016:
def startup(role: String, port: Int): Unit = {
  // Override the configuration of the port when specified as program argument
  val config = ConfigFactory
    .parseString(s"""
      akka.remote.artery.canonical.port=$port
      akka.cluster.roles = [$role]
      """)
    .withFallback(ConfigFactory.load("cluster"))

  val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)

  if (role == "front") {
    entityManager ! EntityManager.AddOne("9013")
    entityManager ! EntityManager.AddOne("9014")
    entityManager ! EntityManager.AddOne("9013")
    entityManager ! EntityManager.AddOne("9015")
    entityManager ! EntityManager.AddOne("9013")
    entityManager ! EntityManager.AddOne("9014")
    entityManager ! EntityManager.AddOne("9014")
    entityManager ! EntityManager.AddOne("9013")
    entityManager ! EntityManager.AddOne("9015")
    entityManager ! EntityManager.AddOne("9015")
    entityManager ! EntityManager.AddOne("9016")
    entityManager ! EntityManager.GetSum("9014")
    entityManager ! EntityManager.GetSum("9015")
    entityManager ! EntityManager.GetSum("9013")
    entityManager ! EntityManager.GetSum("9016")
  }
}
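The GetSum path above is fire-and-forget: the reply comes back later through the messageAdapter as a WrappedTotal. An alternative is to correlate request and reply with ctx.ask on the EntityRef inside EntityManager. A rough sketch of how the GetSum case could then look, assuming an implicit Timeout is added to EntityManager's setup block (none of this is in the original code):

// Assumed additions to EntityManager (hypothetical, not in the original post):
//   import akka.util.Timeout
//   import scala.util.{ Failure, Success }
//   implicit val askTimeout: Timeout = 3.seconds   // inside Behaviors.setup
// The GetSum case could then be written as:
case GetSum(cid) =>
  val entityRef = sharding.entityRefFor(Counter.TypeKey, cid)
  ctx.ask(entityRef, Counter.GetValue) {
    case Success(response) => WrappedTotal(response)                // reply arrived in time
    case Failure(_)        => WrappedTotal(Counter.SubTtl(cid, -1)) // timed out; -1 as a marker
  }
  Behaviors.same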
Here is part of the output from the run:
15:12:10.073 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://[email protected]:25253,9014
15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://[email protected]:25253,9014
15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://[email protected]:25253,9014
15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://[email protected]:25251,9013
15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://[email protected]:25251,9013
15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://[email protected]:25251,9013
15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://[email protected]:25251,9013
15:12:10.109 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://[email protected]:25254,9015
15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://[email protected]:25254,9015
15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://[email protected]:25254,9015
15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 get value at akka://[email protected]:25254,9015
15:12:10.112 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9015 total: 3
15:12:10.149 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 get value at akka://[email protected]:25253,9014
15:12:10.149 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 get value at akka://[email protected]:25251,9013
15:12:10.169 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9014 total: 3
15:12:10.169 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9013 total: 4
15:12:10.171 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/788/9016 counting at akka://[email protected]:25251,9016
15:12:10.171 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/788/9016 get value at akka://[email protected]:25251,9016
15:12:10.172 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9016 total: 1
15:19:32.176 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9013 stopping ... passivated for idling.
15:19:52.529 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9014 stopping ... passivated for idling.
15:19:52.658 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9016 stopping ... passivated for idling.
15:19:52.662 [ClusterSystem-akka.actor.default-dispatcher-14] INFO com.learn.akka.Counter$ - ************9015 stopping ... passivated for idling.
Note from the log that entity 9015 was allocated on the front node (port 25254): since EntityManager calls sharding.init on every node, entities may be created on the front node as well. Below is the complete source code for this demonstration:
ClusterSharding.scala
package com.learn.akka

import scala.concurrent.duration._
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.cluster.sharding.typed.scaladsl.EntityContext
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.persistence.typed.PersistenceId
//#sharding-extension
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.EntityRef
import com.typesafe.config.ConfigFactory
import akka.cluster.typed.Cluster

//#counter
object Counter {
  sealed trait Command extends CborSerializable
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Response]) extends Command
  case object StopCounter extends Command
  private case object Idle extends Command

  sealed trait Response extends CborSerializable
  case class SubTtl(entityId: String, ttl: Int) extends Response

  val TypeKey = EntityTypeKey[Command]("Counter")

  def apply(nodeAddress: String, entityContext: EntityContext[Command]): Behavior[Command] = {
    Behaviors.setup { ctx =>
      def updated(value: Int): Behavior[Command] = {
        Behaviors.receiveMessage[Command] {
          case Increment =>
            ctx.log.info("******************{} counting at {},{}", ctx.self.path, nodeAddress, entityContext.entityId)
            updated(value + 1)
          case GetValue(replyTo) =>
            ctx.log.info("******************{} get value at {},{}", ctx.self.path, nodeAddress, entityContext.entityId)
            replyTo ! SubTtl(entityContext.entityId, value)
            Behaviors.same
          case Idle =>
            entityContext.shard ! ClusterSharding.Passivate(ctx.self)
            Behaviors.same
          case StopCounter =>
            Behaviors.stopped(() =>
              ctx.log.info("************{} stopping ... passivated for idling.", entityContext.entityId))
        }
      }
      ctx.setReceiveTimeout(30.seconds, Idle)
      updated(0)
    }
  }
}

object EntityManager {
  sealed trait Command
  case class AddOne(counterId: String) extends Command
  case class GetSum(counterId: String) extends Command
  case class WrappedTotal(res: Counter.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
    val cluster = Cluster(ctx.system)
    val sharding = ClusterSharding(ctx.system)
    val entityType = Entity(Counter.TypeKey) { entityContext =>
      Counter(cluster.selfMember.address.toString, entityContext)
    }.withStopMessage(Counter.StopCounter)
    sharding.init(entityType)

    val counterRef: ActorRef[Counter.Response] = ctx.messageAdapter(ref => WrappedTotal(ref))

    Behaviors.receiveMessage[Command] {
      case AddOne(cid) =>
        val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
        entityRef ! Counter.Increment
        Behaviors.same
      case GetSum(cid) =>
        val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
        entityRef ! Counter.GetValue(counterRef)
        Behaviors.same
      case WrappedTotal(ttl) =>
        ttl match {
          case Counter.SubTtl(eid, subttl) =>
            ctx.log.info("***********************{} total: {} ", eid, subttl)
        }
        Behaviors.same
    }
  }
}

object ClusterShardingApp {

  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      startup("shard", 25251)
      startup("shard", 25252)
      startup("shard", 25253)
      startup("front", 25254)
    } else {
      require(args.size == 2, "Usage: role port")
      startup(args(0), args(1).toInt)
    }
  }

  def startup(role: String, port: Int): Unit = {
    // Override the configuration of the port when specified as program argument
    val config = ConfigFactory
      .parseString(s"""
        akka.remote.artery.canonical.port=$port
        akka.cluster.roles = [$role]
        """)
      .withFallback(ConfigFactory.load("cluster"))

    val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)

    if (role == "front") {
      entityManager ! EntityManager.AddOne("9013")
      entityManager ! EntityManager.AddOne("9014")
      entityManager ! EntityManager.AddOne("9013")
      entityManager ! EntityManager.AddOne("9015")
      entityManager ! EntityManager.AddOne("9013")
      entityManager ! EntityManager.AddOne("9014")
      entityManager ! EntityManager.AddOne("9014")
      entityManager ! EntityManager.AddOne("9013")
      entityManager ! EntityManager.AddOne("9015")
      entityManager ! EntityManager.AddOne("9015")
      entityManager ! EntityManager.AddOne("9016")
      entityManager ! EntityManager.GetSum("9014")
      entityManager ! EntityManager.GetSum("9015")
      entityManager ! EntityManager.GetSum("9013")
      entityManager ! EntityManager.GetSum("9016")
    }
  }
}
cluster.conf
akka {
  actor {
    provider = cluster
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka://[email protected]:25251",
      "akka://[email protected]:25252"]
  }
}
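One piece the listings above do not show is the CborSerializable marker trait that the serialization-bindings entry refers to. It is presumably just an empty marker trait in the com.learn.akka package, along the lines of this sketch:

package com.learn.akka

// Empty marker trait: any message type extending it (Counter.Command, Counter.Response)
// is serialized with Akka's Jackson CBOR serializer when it crosses the network,
// per the serialization-bindings entry in cluster.conf.
trait CborSerializable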