1. 程式人生 > >Akka Cluster之叢集分片

Akka Cluster之叢集分片

一、介紹 

當您需要在叢集中的多個節點之間分配Actor,並希望能夠使用其邏輯識別符號與它們進行互動時,叢集分片是非常有用的。你無需關心Actor在叢集中的物理位置,因為這可能也會隨著時間的推移而發生變化。

例如,它可以是代表域驅動設計術語中聚合根的參與者。在這裡,我們稱這些Actore為“實體”。這些Actor通常具有持久狀態,但此功能不僅限於具有持久化狀態的Actor。

當你有一個很消耗資源的Actor,例如佔記憶體或者CPU,把它放在一臺機器上可能吃不消,這時候叢集分片就能夠提供很好的幫助,將這些Actor分散在叢集中的多個結點上。

二、依賴

<dependency>
  <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster-sharding_2.12</artifactId> <version>2.5.18</version> </dependency>

三、例子

假設我們有如下一個需要使用叢集分片模式的實體Actor:

case object Increment case object Decrement final case class Get(counterId: Long) final case class EntityEnvelope(id: Long, payload: Any) case object Stop final case class CounterChanged(delta: Int) class Counter extends PersistentActor { import ShardRegion.Passivate context.setReceiveTimeout(120.seconds) // self.path.name is the entity identifier (utf-8 URL-encoded) override def persistenceId: String = "Counter-" + self.path.name var count = 0 def updateState(event: CounterChanged): Unit = count += event.delta override def receiveRecover: Receive = { case evt: CounterChanged  updateState(evt) } override def receiveCommand: Receive = { case Increment  persist(CounterChanged(+1))(updateState) case Decrement  persist(CounterChanged(-1))(updateState) case Get(_)  sender() ! count case ReceiveTimeout  context.parent ! Passivate(stopMessage = Stop) case Stop  context.stop(self) } }

上面這個Actor使用PersistActor中的事件來源模式儲存其內部狀態,當然其不一定需要是持久化的Actor。使用持久化的好處是,如果節點之間的實體發生故障或遷移,它能夠恢復其狀態。

我們要使用叢集的分片模式,通常就需要在群集中每個節點上的系統啟動時,使用ClusterSharding.start方法註冊支援的實體型別,這樣我們就能在所有將承載分片的節點上執行這個方法來部署分片。其中,ClusterSharding.start如下:

val counterRegion: ActorRef = ClusterSharding(system).start( typeName = "Counter", entityProps = Props[Counter], settings = ClusterShardingSettings(system), extractEntityId = extractEntityId, extractShardId = extractShardId)

其中,start方法返回了ShardRegion,是個ActorRef型別。ShardRegion是一個特殊的Actor,負責管理可能多個分片(shard)內稱為Entity的Actor例項。這些分片可能是分佈在不同的叢集節點上的,外界通過ShardRegion與其轄下Entities溝通。從start函式引數entityProps我們看到:每個分片中只容許一個種類的Actor;具體的Entity例項是由另一個內部Actor即shard構建的,shard可以在一個分片中構建多個Entity例項。多shard多entity的特性可以從extractShardId,extractEntityId這兩個方法中得到一些資訊。我們說過Actor自編碼即entity-id是Cluster-Sharding的核心元素。在entity-id這個自編碼中還包含了shard-id,所以使用者可以通過entity-id的編碼規則來設計整個分片系統包括每個ShardRegion下shard和entity的數量。當ShardRegion得到一個entity-id後,首先從中抽取shard-id,如果shard-id在叢集中不存在的話就按叢集各節點負載情況在其中一個節點上構建新的shard;然後再用entity-id在shard-id分片中查詢entity,如果不存在就構建一個新的entity例項。整個shard和entity的構建過程都是通過使用者提供的函式extractShardId和extractEntityId實現的,Cluster-Sharding就是通過這兩個函式按使用者的要求來構建和使用shard和entity的。下面我們看下這種自編碼的例子:

val extractEntityId: ShardRegion.ExtractEntityId = { case EntityEnvelope(id, payload)  (id.toString, payload) case msg @ Get(id)  (id.toString, msg) } val numberOfShards = 100 val extractShardId: ShardRegion.ExtractShardId = { case EntityEnvelope(id, _)  (id % numberOfShards).toString case Get(id)  (id % numberOfShards).toString case ShardRegion.StartEntity(id)  // StartEntity is used by remembering entities feature (id.toLong % numberOfShards).toString }

在大多數情況下工作正常的簡單分片演算法是獲取hashCode實體識別符號的模數為分數的絕對值。下面我們可以通過如下程式測試:

val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter") counterRegion ! Get(123) expectMsg(0) counterRegion ! EntityEnvelope(123, Increment) counterRegion ! Get(123) expectMsg(1)