1. 程式人生 > >Akka-CQRS(6)- read-side

Akka-CQRS(6)- read-side

時間間隔 sca 數據庫 數據操作 實現 處理 from journal rap

前面我們全面介紹了在akka-cluster環境下實現的CQRS寫端write-side。簡單來說就是把發生事件描述作為對象嚴格按發生時間順序寫入數據庫。這些事件對象一般是按照二進制binary方式如blob存入數據庫的。cassandra-plugin的表結構如下:

CREATE KEYSPACE IF NOT EXISTS akka
WITH REPLICATION = { ‘class‘ : ‘SimpleStrategy‘,‘replication_factor‘:1 };

CREATE TABLE IF NOT EXISTS akka.messages (
  used boolean static,
  persistence_id text,
  partition_nr bigint,
  sequence_nr bigint,
  timestamp timeuuid,
  timebucket text,
  writer_uuid text,
  ser_id int,
  ser_manifest text,
  event_manifest text,
  event blob,
  meta_ser_id int,
  meta_ser_manifest text,
  meta blob,
  message blob,
  tags set<text>,
  PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket))
  WITH gc_grace_seconds =864000
  AND compaction = {
    ‘class‘ : ‘SizeTieredCompactionStrategy‘,
    ‘enabled‘ : true,
    ‘tombstone_compaction_interval‘ : 86400,
    ‘tombstone_threshold‘ : 0.2,
    ‘unchecked_tombstone_compaction‘ : false,
    ‘bucket_high‘ : 1.5,
    ‘bucket_low‘ : 0.5,
    ‘max_threshold‘ : 32,
    ‘min_threshold‘ : 4,
    ‘min_sstable_size‘ : 50
    };

事件對象是存放在event裏的,是個blob類型字段。下面是個典型的寫動作示範:

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(ActionGo) { event =>
        updateState(event)
      }
  }

這些事件描述的存寫即寫這個ActionGo時不會影響到實際業務數據狀態。真正發生作用,改變當前業務數據狀態的是在讀端read-side。也就是說在另一個線程裏有個程序也按時間順序把這些二進制格式的對象讀出來、恢復成某種結構如ActionGo類型、然後按照結構內的操作指令對業務數據進行實際操作處理,這時才會產生對業務數據的影響。做個假設:如果這些事件不會依賴時間順序的話是不是可以偷偷懶直接用一種pub/sub模式把reader放在訂閱subscriber端,如下:

//寫端
  import DistributedPubSubMediator.Publish
  val mediator = DistributedPubSub(context.system).mediator

  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(DataUpdated) { event =>
        updateState(event)
        mediator ! Publish(persistentId, event,sendOneMessageToEachGroup = true)
      }
  }

//讀端
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! Subscribe(persistentId, self)
  def receive = {
    case DataUpdated: Event ⇒
        updateDataTables()
  }

這種pub/sub模式的特點是消息收發雙方耦合度非常松散,但同時也存在訂閱方sub即reader十分難以控制的問題,而且可以肯定的是訂閱到達消息無法保證是按發出時間順序接收的,我們無法控制akka傳遞消息的過程。因為業務邏輯中一個動作的發生時間順序往往會對周圍業務數據產生不同的影響,所以現在只能考慮事件源event-sourcing這種模式了。es方式的CQRS是通過數據庫表作為讀寫間隔實現寫端程序和讀端程序的分離。寫端只管往數據庫寫數據操作指令,讀端從同一數據庫位置讀出指令進行實質的數據處理操作,所以讀寫過程中會產生一定的延遲,讀端需要不斷從數據庫抽取pull事件。而具體pull的時段間隔如何設定也是一個比較棘手的問題。無論如何,akka提供了Persistence-Query作為一種CQRS讀端工具。我們先從一個簡單的cassandra-persistence-query用例開始:

// obtain read journal by plugin id
val readJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId("user-1337", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.runForeach { pack =>
  updateDatabase(pack.event)
}

eventsByPersistenceId(...)構建了一個akka-stream的Source[EventEnvelope,_]。這個EventEnvelope類定義如下:

/**
 * Event wrapper adding meta data for the events in the result stream of
 * [[akka.persistence.query.scaladsl.EventsByTagQuery]] query, or similar queries.
 */
final case class EventEnvelope(
  offset:        Offset,
  persistenceId: String,
  sequenceNr:    Long,
  event:         Any)

上面這個event字段就是從數據庫讀取的事件對象。EventEnvelope是以流元素的形式從數據庫中提出。eventsByPersistenceId(...)啟動了一個數據流,然後akka-persistence-query會按refresh-interval時間間隔重復運算這個流stream。refresh-interval可以在配置文件中設置,如下面的cassandra-plugin配置:

cassandra-query-journal {
  # Implementation class of the Cassandra ReadJournalProvider
  class = "akka.persistence.cassandra.query.CassandraReadJournalProvider"

  # Absolute path to the write journal plugin configuration section
  write-plugin = "cassandra-journal"

  # New events are retrieved (polled) with this interval.
  refresh-interval = 3s

...
}

以上描述的是一種接近實時的讀取模式。一般來講,為了實現高效、安全的事件存寫,我們會盡量簡化事件結構,這樣就會高概率出現一個業務操作單位需要多個事件來描述,那麽如果在完成一項業務操作單元的所有事件存寫後才開始讀端的動作不就簡單多了嗎?而且還比較容易控制。雖然這樣會造成某種延遲,但如果以業務操作為衡量單位,這種延遲應該是很正常的,可以接受的。現在每當完成一項業務的所有事件存寫後在讀端一次性成批把事件讀出來然後進行實質的數據操作,應當可取。akka-persistence-query提供了下面這個函數:

  /**
   * Same type of query as `eventsByPersistenceId` but the event stream
   * is completed immediately when it reaches the end of the "result set". Events that are
   * stored after the query is completed are not included in the event stream.
   */
  override def currentEventsByPersistenceId(
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long): Source[EventEnvelope, NotUsed] = ...

我們可以run這個stream把數據讀入一個集合裏,然後可以在任何一個線程裏用這個集合演算業務邏輯(如我們前面提到的寫端狀態轉變維護過程),可以用router/routee模式來實現一種在集群節點中負載均衡式的分配reader-actor作業節點。

下一篇準備對應前面的CQRS Writer Actor 示範裏的akka-cluster-pos進行rCQRS-Reader-Actor示範。

Akka-CQRS(6)- read-side