1. 程式人生 > 其它 >利用Actor實現管道過濾器模式

利用Actor實現管道過濾器模式

《基於Actor的響應式程式設計》計劃分為三部分,第一部分剖析響應式程式設計的本質思想,為大家介紹何謂響應式程式設計(Reactive Programming)。第二部分則結合兩個案例來講解如何在AKKA中實現響應式程式設計。第三部分則是這個主題的擴充套件,在介紹Reactive Manifesto的同時,介紹進行響應式程式設計更為主流的ReactiveX框架。本文是第二部分的第一個案例。

剖析響應式程式設計的本質》從Actor模型與響應式程式設計中找到彼此相配的特徵;然而空口無憑,沒有一點真憑實據,憑什麼他們能立下海誓山盟、比翼雙飛呢?

其實,Vaughn Vernon早就作了稱職的月老,還為他們寫了一本鴛夢奇緣,總結了如何利用Actor模型實現響應式程式設計的訊息模式《Reactive Messaging Pattterns with the Actor Model》。如果閱讀過《企業整合模式》(Enterprise Integration Patterns)一書,你會發現Vaughn的新書近乎於是《企業整合模式》中各種訊息模式在AKKA中的Actor實現。

順便吐槽一句,本書中文版的譯名《響應式架構——訊息模式Actor實現與Scala、AKKA應用整合》頗有標題黨之嫌。整本書其實只是在相對低的層面講解Actor對訊息模式的實現,幾乎沒有牽涉到任何架構方面的知識。

例如,響應式程式設計通常會與CQRS以及Event Sourcing結合,但本書幾乎沒有涉獵。其實,Vaughn Vernon還是挺老實的,英文書名交代得也很清楚,翻譯成中文,卻莫名其妙地給書名添油加醋,有意誤導消費者,實在不該。

當然,書還是好書,仍有閱讀價值。

其實,我們說到Actor模型與響應式程式設計的相配,更大程度是因為Actor已經為響應式程式設計的程式設計要素提供了現成的基礎設施。例如在AKKA之下進行響應式程式設計,我們幾乎不用再考慮如何進行非同步訊息通訊、狀態切換、併發處理、並行處理,以及對Actor的監督和錯誤處理策略的實現。這在很大程度上使得我們可以從紛繁複雜的基礎設施實現中解脫出來,而僅需要專注於考慮資料流轉與業務流程之間的關係。

管道過濾器模式

談到資料流(或者訊息流),我們會想到一個經典的架構模式:管道過濾器模式。資料在管道中流動,每經過一個過濾器都會被對應的過濾器按照自己的處理邏輯進行處理,處理後的資料又被接著傳遞給下一個過濾器。

引入管道過濾器的一個好處是它可以使得每個過濾器之間都是解耦的,這使得我們可以很好地擴充套件過濾器,改變資料處理的流程,而不需要調整Provider端的程式碼。

在AKKA中,Actor之間可以通過ActorRef引用物件建立關聯,這種抽象層面的弱依賴使得Actor彼此之間能夠很好地解耦。不過,Actor之間還存在一條隱形依賴關係,它是由Actor所能處理的訊息物件悄悄引入的。這些訊息物件對於Actor,就好似Actor的介面,它表明了該Actor只能處理什麼樣的訊息型別。一旦訊息的結構發生改變,又或者希望Actor支援更多的訊息,就需要修改Actor的定義與實現。

為了避免隱形依賴,我們可以將管道傳遞的資料定義為一個通用的訊息型別,所有註冊管道的過濾器處理的都是相同的流。在Provider端,我們實現的單個過濾器Actor,與其他過濾器之間是沒有任何依賴關係的,我們也無需考慮資料處理的順序,僅需要考慮自己的訊息處理邏輯。

從這個角度看,一個Actor的設計與實現,應該儘可能遵循“單一職責原則”與“資訊專家模式”。Udi Dahan在CQRS架構中曾經提出“自治元件”的概念,那麼在Actor模型中,我們也應該儘可能做到讓每個Actor物件自治。

在第一部分《剖析響應式程式設計的本質》中,我曾經提到:

我們幾乎可以將所有業務處理流程都可以建模為資料流的形式。

下面我們就來看看一個訂單處理流程的案例。這個案例來自前述Vaughn Vernon的著作《Reactive Messaging Pattterns with the Actor Model》:

一條訂單訊息進入系統,在為了完成購物操作處理完該條訊息前,必須做一些預備工作。首先必須對這條訂單訊息進行解密,然後需要驗證傳送這條訊息外部實體的資格,最後應確保這條訂單訊息不是之前收到訊息的複製品。

我們可以將這些業務流程視為不同的職責,分解為:

  • 對訂單的部分資料進行解密(decryption)
  • 對訂單進行認證
  • 對訂單進行去重處理
  • 處理訂單

遵循單一職責原則,我們將這些職責分別交給對應的獨立Actor來承擔。例如認證訂單:

class Authenticator(nextFilter: ActorRef) extends Actor with ActorLogging {
 def receive: Receive = {    
 case message: ProcessIncomingOrder =>      val text = new String(message.orderInfo)  
     log.info(s"Authenticator: processing $text")   
        val orderText = text.replace("(certificate)", "")      
        nextFilter !  ProcessIncomingOrder(orderText.toCharArray.map(_.toByte))  }}

每個Actor會接收一個nextFilter的ActorRef物件,但它們是完全解耦的。Actor只專注於自己的職責,一旦處理完訂單訊息,就可以將處理後的訊息傳遞給下一個Actor。這種“分而治之”的思想可以將複雜的事情變得更簡單,開發者每次只需要考慮一個相對簡單的職責,知識變少,利於理解。

過濾器之間的組合完全交給客戶端,如下程式碼所示:

val orderManager = system.actorOf(Props[OrderManagementSystem], "OrderManagementSystem")val deduplicator = system.actorOf(Props(new Deduplicator(orderManager)), "Deduplicator")
val authenticator = system.actorOf(Props(new Authenticator(deduplicator)), "Authenticator")val decrypter = system.actorOf(Props(new Decrypter(authenticator)), "Decrypter")
val acceptance = system.actorOf(Props(new OrderAcceptanceEndpoint(decrypter)), "OrderAcceptanceEndpoint")acceptance ! rawOrderBytesacceptance ! rawOrderBytes

是否覺得似曾相似?倘若我們熟悉設計模式,會發現這一模式與“職責鏈模式”有著如孿生兄弟般的相似類結構。然而,二者的行為仍有些微差別,在經典的職責鏈模式中,一旦職責物件滿足匹配條件時,會在履行該職責後中斷處理並返回,而管道過濾器則會從起點一直“流動”到終點,若無意外,中途不會中斷。

使用Actor實現管道過濾器模式,則又有所不同,業務的處理流程是在訊息的跳轉之間完成的,且每個訊息的處理都是非同步非阻塞的。