1. 程式人生 > >SDP(0):Streaming-Data-Processor - Data Processing with Akka-Stream

SDP(0):Streaming-Data-Processor - Data Processing with Akka-Stream

數據庫管理 新的 集成 部分 ont lock 感覺 sharding 數據源

再有兩天就進入2018了,想想還是要準備一下明年的工作方向。回想當初開始學習函數式編程時的主要目的是想設計一套標準API給那些習慣了OOP方式開發商業應用軟件的程序員們,使他們能用一種接近傳統數據庫軟件編程的方式來實現多線程,並行運算,分布式的數據處理應用程序,前提是這種編程方式不需要對函數式編程語言、多線程軟件編程以及集群環境下的分布式軟件編程方式有很高的經驗要求。前面試著發布了一個基於scalaz-stream-fs2的數據處理工具開源項目。該項目基本實現了多線程的數據庫數據並行處理,能充分利用域內服務器的多核CPU環境以streaming,non-blocking方式提高數據處理效率。最近剛完成了對整個akka套裝(suite)的了解,感覺akka是一套理想的分布式編程工具:一是actor模式提供了多種多線程編程方式,再就是akka-cluster能輕松地實現集群式的分布式編程,而集群環境變化只需要調整配置文件,無需改變代碼。akka-stream是一套功能更加完整和強大的streaming工具庫,那麽如果以akka-stream為基礎,設計一套能在集群環境裏進行分布式多線程並行數據處理的開源編程工具應該可以是2018的首要任務。同樣,用戶還是能夠按照他們熟悉的數據庫應用編程方式輕松實現分布式多線程並行數據處理程序的開發。

我把一般中小企業的IT系統分成兩大部分:一是實時的數據采集(輸入)部分,二是批量數據抽取、分析、處理部分。為了讓傳統中小型企業IT軟件編程人員能開發服務器集群環境上數據平臺(如雲端數據平臺)運行的軟件系統,我打算通過這個DSP(Streaming-Data-Processor)項目來實現上面提到的第二部分。第一部分可以用CQRS(Command-Query-Responsibility-Separation)即讀寫分離架構和事件記錄(event-sourcing)模式來實現一種高效快速響應、安全穩定運行的數據采集體系。這部分我會在完成SDP項目後以akka-persistence為核心,通過akka-http,AMQP如RabitMQ等技術來實現。

按一般的scala和akka的編程方式編寫多線程分布式數據庫管理軟件時一是要按照akka代碼模式,使用scala編程語言的一些較深的語法;二是需要涉及異步Async調用,集群Cluster節點任務部署及Streaming對外集成actor運算模式的細節,用戶需要具備一定的scala,akka使用經驗。再接下來就需要按業務流程把各業務環節分解成不依賴順序的功能模塊,然後把這些分拆出來的功能分派給集群中不同的節點上去運算處理。而對於SDP用戶來說,具備最基本的scala知識,無需了解akka、actor、threads、cluster,只要按照SDP自定義的業務處理流模式就可以編制多線程分布式數據處理程序了。下面我就用一些文字及偽代碼來描述一下SDP的結構和功能:

總體來說SDP是由一或多個Stream組成的;每個Stream就代表一段程序。一段完整的程序Stream是由流元素源Source、處理節點Process-Node(Flow)及數據輸出終點Sink三個環節組成,下面是一個典型的程序框架:

  def load(qry: Query): PRG[R,M] = ???
  def process1: PRG[R,M] = ???
  def process2: PRG[R,M] = ???
  def recursiveProcess(prg: PRG[R,M]): PRG[R,M] = ???
  def results: PRG = ???
  
  load(qryOrders).process1.process2.recursiveProcess(subprogram).results.run

從上面的示範中我們可以看到所有定義的函數都產生PRG[R,M]類型結果。其中R類型就是stream的元素,它流動貫穿了程序的所有環節。就像下水道網絡運作原理一樣:汙水由源頭Source流入終點Sink,在途中可能經過多個汙水處理節點Node。每一個節點代表對管道中流淌汙水處理的方式,包括分叉引流、並叉合流、添加化學物質、最後通過終點把處理過的水向外輸出。在PRG中流動的R類型可能是數據如數據庫表的一行,又或者是一條Sring類型的query如plain-sql,可以用JDBC來運行。cassandra的CQL也是String類型的。Slick,Quill,ScalikeJDBC和一些其它ORM的Query都可以產生plain-sql。

Source是一段程序的開始部分。一般來說Source是通過運算Query產生一串數據行或者人工構建而成。Source也可以並行運算Query產生,然後合並成一條無序的數據源,如下偽代碼的類型:

  def load_par(qrys: Query*): PRG[R,M] = ???

Process-Node是SDP最重要的一個組成部分,因為大部分用戶定義的各種業務功能是在這裏運算的。用戶可以選擇對業務功能進行拆分然後分派給不同的線程或不同的集群節點進行多線程並行或分布式的運算。SDP應該為用戶程序提供多線程,並行式、分布式的運算函數。首先,運算用戶程序後應產生R類型結果而且,作為一種reactive軟件,必須保證完全消耗上一階段產生的所有R類型元素。下面是一個用戶函數的款式:

  type UserFunc = R => R 

除了fire-and-run類型的運算函數,SDP還應該提供針對多線程或分布式程序的map-reduce式運算函數。初步想法是:無論返回結果與否,分派任務都是由persistence-actor來執行的,這樣能保證不會漏掉任何任務。如果整體任務需要在所有分派任務返回運算結果後再統一進行深度運算時akka的actor消息驅動模式是最適合不過的了。具體情況可以參考我前面關於cluster-sharding的博文。

Sink的主要作用實際上是保證完全消耗程序中產生的所有元素,這是reactive類型程序的必須要求。

好了,不知不覺還有幾個鐘頭就進入2017倒計時了。趕快湊合著在跨入2018之前把這篇發布出去,剛好是今年的最後一篇博文。祝各位在新的一年中工作生活稱心如意!

SDP(0):Streaming-Data-Processor - Data Processing with Akka-Stream