閱讀程式碼:Spark 與 Flink 中的 RPC 實現
近日常有同學來問我如何閱讀程式碼,關於這個問題的一般性答案我特別提了一個問題並自問自答。出於提供一個實際的例子的考量,正好此前綜合地閱讀了 Spark 的 RPC 實現、Flink 基於 Akka 的 RPC 實現和 Actor Model 的通訊模型,寫成本文分享我閱讀分散式計算系統 Spark 和 Flink 中的 RPC 實現的過程和思考。
簡介 Actor Model 及 Akka 的問題
通常來說,閱讀程式碼的流程是首先了解自己要閱讀的程式碼解決了什麼問題,這個問題的現有解決方案是什麼,有什麼優勢和缺點。大致清楚了這些背景之後再在走讀程式碼的過程中思考閱讀的程式碼具體是怎麼解決這個問題的,最後專注到重點難點的程式碼塊的理解上。也就是說,程式碼閱讀最重要的不是程式碼。程式碼只是將思考的結果轉換為實際可用的軟體的手段,思考的結果或者說解決問題的方法才是重要的內容。
分散式計算系統的分散式特性決定了設計過程中必然會考慮節點間的通訊問題,即籠統的 RPC 需求。關於 RPC 和 RMI 及 Actor Model 具體的差別本文不做展開,主要集中在 Spark 和 Flink 的 RPC 實現來介紹 Actor Model 下的 RPC 實現。
Actor Model 的主要概念包括
- 通訊的主體 Actor
- 通訊的內容 Message
- 單執行緒先到先處理的訊息處理器 Mailbox
特別需要提及的是 Actor 之間的通訊是通過類似於地址的 ActorRef 來引用其他的 Actor 的,同時,在實現中,需要一個支援 Actor Model 執行起來的 ActorSystem 環境。這些具體的概念和名詞屬於 Akka,我們會在後面看到它們如何在 Spark 和 Flink 中被一一對應。
Actor Model 一個很少被注意的特點是它的建模過程中只存在 tell 這一個通訊原語,ask 等等只是構建在 tell 上層的方便的通訊模式。這就導致一個問題,即 Actor Model 原生的程式設計模式是明顯不同於傳統的程式設計模型的。傳統的程式設計模型中,函式呼叫是有返回值的,即使採用 Future 作為返回值的佔位符,本質上還是有一一對應的返回值的;而在 Actor Model 中,訊息一經發出就被遺忘,即所謂的 fire and forget 模式。要建立當前發出的訊息和稍後收到的訊息之間的 ask and answer 關係,需要額外的工作。這部分的內容可以參考 Akka 官方文件中介紹通訊模式的章節,本身可以作為 Akka 最佳實踐的一部分,有時間我會專門寫一篇文章介紹 Actor Model 下完全被顛覆的程式設計模型以及通過在其上模擬常見的程式設計模型來探索 Actor Model 的最佳實踐。
關於更多 Actor Model 的概念性和介紹性資料,可以參考的資料有Akka 的官方文件和《反應式設計模式》等等。
Akka 作為目前最成熟的 Actor Model 的實現之一,以及擁有容易理解的單執行緒 Actor 和併發通訊模型,廣泛地充當了 JVM 系的分散式系統的 RPC 層。Akka 最近的演化有兩個重點,一個是型別化(Typed)的 Akka,另一個是在拆分行為(Behavior)和狀態(State)的概念。前者我們後面看到 Spark 和 Flink 的 RPC 實現時就能看到選擇標準的不同,後者這裡不作展開,可能會在後續討論函數語言程式設計的文章中再次提及。
儘管 Akka 的實現非常成熟,但是直接使用 Akka 的底層 Actor Model 的軟體卻不多。對於業務軟體來說,Akka Model 過於底層,如果要利用它帶來的好處通常會直接使用 Akka Streams 和 Akka HTTP 等上層建築;對於其他分散式系統來說,主要有以下兩個問題。
第一個問題是兩層叢集的負擔。如果我們使用 Akka 作為底層 RPC 的實現,本身 Akka 會有相應的基礎元件,包括 ActorSystem 或者進一步使用 Akka Cluster 的話相應的 Cluster 物件。我們的分散式系統例如 Spark 和 Flink 本身有自己的叢集管理策略,在 Spark 中有 Driver 和 Worker 的概念,在 Flink 中有 JobManager 和 TaskManager 等概念。如果在處理本身系統的叢集管理的同時還要兼顧底層的 Akka 叢集,這樣兩層的叢集在實際開發和運維的過程當中會帶來額外的複雜性。尤其是 Akka 作為一個功能複雜的重量級框架,並且在 Typed Akka 中做出了限制公開的直接溝通兩個 Actor 的能力,強制要求使用 Akka Cluster 的決定。同時處理兩層叢集複雜的狀態機和角色與訊息的轉換將會是一個巨大的負擔。
第二個問題是版本的負擔,這也是 Spark 走向去 Akka 化的直接原因,也是 Flink 社群經常被提問的一個問題。我們知道,為了保證分散式系統的穩定性,它依賴的元件尤其是 RPC 實現這樣底層模組的依賴版本會保持相當的穩定性。這樣就有一個問題,Spark 和 Flink 的使用者在使用它們的同時也很有可能使用 Akka,並且依賴的是另一個 Akka 的版本。這樣,就會出現版本不同帶來的不相容性問題。通常來說,這一點可以通過釋出一個專案專有的第三方依賴並使用 shaded 技術重定位包名來解決問題。但是由於重定位為了覆蓋反射呼叫,是在位元組碼級別對全限定名和字串的包名字首做替換。一般來說,包名都是諸如org.apache.spark
或者org.apache.flink
的形式,具有唯一性,替換起來不會有什麼問題。Akka 就不一樣了,它的包名是akka.actor
等等,跟配置名稱是一樣的。這就導致重定位會錯誤改動程式碼中的配置名字串導致執行時字串失配出錯。版本問題在 Lightbend 全家桶裡是不存在的,例如 Play 通過介面暴露底層的 Akka 資料結構,並固定依賴到某一個版本,這樣使用 Play 的人需要 Akka 的功能是隻需要通過介面拿到對應的 Akka 資料結構就可以,但是這種方式並沒有考慮和其他系統的版本相容問題。
雖然上述問題可以通過定製 ClassLoader 並精心調整打包策略來繞過,或者要求使用者程式使用跟系統框架相容的 Akka 版本,但是這會導致複雜不友好的使用者體驗,而清楚簡單的使用者體驗很多時候比功能更能決定一個框架的生存空間。同時,Akka 提供的很多功能,例如 Actor Model 基石的監督(Supervise)功能,對於上層提供 Failover 機制的 Spark 和 Flink 來說是多餘的。前有使用者體驗的硬性需求,後有開發輕量化的敏捷需求,Ligetbend 系以外的成熟的分散式系統開發自己的 RPC 實現是理所當然的選擇。
理解了 Spark 和 Flink 為什麼要開發自己的 RPC 實現之後,我們再看到 RPC 實現具體的考量點和內容。
Spark 的 RPC 實現
Spark 開發自己的 RPC 實現以換下 Akka 的理由主要是上面提及的版本依賴問題,在社群中記錄為SPARK-5293。
閱讀相關程式碼,首先我們要定位程式碼的位置。Spark 的 RPC 實現主要位於core
模組下的org.apache.spark.rpc
這個包下,閱讀程式碼的過程中通過跳轉到定義和查詢使用點可以找到完整的脈絡。結果而言,除了實際的 RPC Endpoint 實現之外,主要相關的程式碼還包括common/network-common
路徑下網路傳輸層相關的底層支援。
Spark 的 RPC 實現雖然是為了替換 Akka 而誕生的,但是它實際上可以看成一個簡化版的 Akka,仍然遵循許多 Actor Model 的抽象。例如
- RpcEndpoint 對應 Actor
- RpcEndpointRef 對應 ActorRef
- RpcEnv 對應 ActorSystem
RpcEndpoint 與訊息處理模型
這其中從模型上來說最簡單的反而是 RpcEndpoint,因為所有的實現邏輯是具體實現類的事情,它其實只是一個簡單的存根(Stub)。總的來說,RpcEndpoint 有以下介面
private[spark] trait RpcEndpoint {
final def self: RpcEndpointRef = ???
final def stop(): Unit = ???
val rpcEnv: RpcEnv = ???
def receive: PartialFunction[Any, Unit] = ???
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = ???
def onError(cause: Throwable): Unit = ???
def onConnected(remoteAddress: RpcAddress): Unit = ???
def onDisconnected(remoteAddress: RpcAddress): Unit = ???
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = ???
def onStart(): Unit = ???
def onStop(): Unit = ???
}
可以看到,上面的函式我分成了四組,其中第一組是和元操作有關的,望文生義不做解釋;第三組是連線和錯誤處理相關的回撥函式,主要是記錄日誌和處理邊界情況的,也不用多做介紹;第四組實現的比較多,雖然和第三組一樣是掛載在特定事件上的回撥函式,但是 RpcEndpoint 啟動和關閉時常常需要做狀態初始化和終結,以及資源的申請和釋放,所以onStart
和onStop
是經常被實現的介面。
這裡在展開說一下第二組介面。首先是receive
,這個介面跟 Akka 裡面 Actor 的receive
是類似的,我們看到它的簽名是PartialFunction[Any, Unit]
,也就是說實現起來類似於下面的程式碼塊。
override def receive: PartialFunction[Any, Unit] = {
case Message => ...
case BoxedMessage(msg1, msg2) => ...
}
可以看到和 Untyped Akka 別無二致,也就是說 Spark 的 RPC 實現也是非型別化的,程式設計模型上基於訊息和模式匹配來做的。後面我們會看到 Flink 對這一點做了不同的選擇,介紹完 Flink 的情況後我們會做一個總的探討。
另一個介面就比較有意思了,receiveAndReply
實現了接收資訊後返回的功能。由於沒有實現 Akka 中上下文sender()
的邏輯,Spark 使用了另一個介面來處理需要返回的呼叫。我們分兩點說明sender()
的問題和 Spark 基於 Actor Model 實現了傳統的具有返回值的呼叫的方式。
第一點,sender()
主要的問題是,它是一個方法呼叫,而不是一個確定性的值。這是函數語言程式設計的擁躉喜歡討論的話題,即在不同的時刻呼叫sender()
會返回不同的值。乍一看我們在每次處理一條訊息的時候都呼叫sender()
獲得當前訊息的傳送來源並沒有問題,不過這個方法在 Akka 社群給新手帶來了不少麻煩。
最大的問題還是上面提到的呼叫點的問題。通常來說,由於 Actor Model 中的 Actor 是單執行緒的處理訊息的,你在同一個訊息處理過程中多次呼叫sender()
返回的都是當前訊息的來源。不過,在一個常見的場景中,你在處理訊息的時候發起了另一個非同步動作,在非同步動作中呼叫sender()
來獲取當前訊息的來源。由於非同步動作觸發的時間是未知的,實際上當它觸發時再次呼叫sender()
的時候,可能返回的就是另一條訊息的來源了。這個問題很好解決,即用一個變數儲存當前的sender()
後面傳遞這個物件而不是再次呼叫sender()
獲取物件。顯然,Spark 的receiveAndReply
中的引數context
就是這個可用於發回訊息的上下文,與sender()
類似。而在 Typed Akka 中,由於sender()
無法確切的型別化,因此採用的是將訊息來源直接編碼在傳送的訊息中的方式以在需要的時候使用它回覆訊息,這要求 ActorRef 在不同的 ActorSystem 上正確的序列化和反序列化。
第二點,我們看到這裡的時候就會想,那我現在有兩個 receive 函式,雖然我可以根據需不需要傳送回覆訊息把訊息處理邏輯拆分到不同的函式裡,但是 Spark 又是怎麼知道應該把入站的請求分配到哪個函式的呢?這個就涉及到 Spark RPC 實現的細節。簡單的說我們先看到呼叫兩個 receive 函式的片段。
// Inbox.scala
class Inbox {
def process(dispatcher: Dispatcher): Unit = {
// ...
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case e: Throwable =>
context.sendFailure(e)
throw e
}
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
case OnStart =>
endpoint.onStart()
// ...
case OnStop =>
// ...
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
// ...
}
}
簡單掃過回撥系列函式,我們看到 Spark RPC 判斷將訊息轉往何處主要是看訊息的型別是RpcMessage
還是OneWayMessage
。從名字中我們就可以看出,前者指的是呼叫並返回的訊息,後者是 fire and forget 的訊息。我們跳轉到定義並查詢初始化點,可以發現生成這兩種不同資訊的差異的根源發生在RpcEndpointRef
是呼叫ask
還是send
上,在唯一的 Netty 實現上一路會經過NettyRpcEnv
對應的ask
和send
方法,生成不同的訊息傳送到遠端。這也就是前面說的 Spark 原生的支援 ask 語義的意思。從熟悉的變成模型出發,可以把 ask 當成返回值不為 void 的函式或者 Pascal 中的 function,send 當成返回值為 void 的函式或者 Pascal 中的 procedure。
send 的語義是比較清楚的,關於 ask 的語義還有一個值得討論的點。我們知道 ask 會有一個返回值,這個返回值是真正有意義的返回值的佔位符 Future,而 Future 一般的處理方式在經過拼接和轉換之後終究是會有一個 onSuccess 或者 onFailure 的觸發動作,這個觸發在哪個執行緒上執行是很重要的。這涉及到我們在編寫 receive 函式的時候對非同步行為和同步策略的判斷。Spark 的實現類似於 Akka 中 AskPattern 引入 PromiseActorRef 的方式,生成一個 Promise 並在對應的返回收到時完成,這個 Promise 作為 ask 的返回值。相關的回撥邏輯發生在NettyRpcEnv#askAbortable
中,可以看到,本地訊息中 Promise 的完成發生在傳送訊息的同一個執行緒上,而遠端訊息中 Promise 的完成一路探查到 TransportClient 和 TransportChannelHandler 可以發現完成在 Netty 的 channelRead0 上,也就是說,Spark 的 ask 返回的 Future,其完成的時間點並不一定和 RpcEndpoint 的主執行緒同步。這可能會導致在不加同步策略下的一些問題,例如通過 ask 詢問一個遠端節點的狀態和遠端節點主動 send 過來的狀態同時觸發狀態處理邏輯而導致競態條件。補充說明,Spark 的 RpcEndpoint 本身也可能併發的處理訊息,僅當它是 ThreadSafeRpcEndpoint 或 IsolatedRpcEndpoint 時才表現出類似於 Actor Model 下單執行緒 Actor 的行為。上面提到的 ask 導致競態條件的問題在 Akka 中也存在,這倒不算 BUG,只是在使用的時候需要注意採用合適的同步策略。
RpcEnv 與訊息分派模型
接下來我們看到更接近 RPC 實現的核心的程式碼。RpcEnv 是正確的處理 RpcEndpoint 存在和執行及其支援的網路環境的上下文,目前 Spark 中只有基於Netty 的實現。
對於服務端來說,RpcEnv 是支援 RpcEndpoint 正常執行的環境,排程執行緒處理訊息並負責 RpcEndpoint 的生命週期管理;對於客戶端來說,可以使用 RpcAddress 等方式從 RpcEnv 中獲取可用的本地或遠端的 RpcEndpointRef,這是一個 RpcEndpoint 的位置透明的引用或者叫控制代碼,可以通過呼叫它的 send 或 ask 方法來向 RpcEndpoint 傳送資訊。
對於訊息的分派,我們從訊息的入站和出站來看。
首先看出站,即本 RpcEnv 向 RpcEndpoint 傳送訊息。注意這裡如果是本地的 RpcEndpoint,會將訊息直接通過 Dispatcher 分派到本地的 RpcEndpoint 上,嚴格來說不算出站。如果是遠端的 RpcEndpoint,NettyRpcEnv 會通過 postToOutbox 方法,對於 ask 來的方法的回覆,構造的訊息來源 RpcEndpointRef 會帶有網路層的 client,因此是直接返回;而對於本地直接出站的訊息,則會根據接收者的地址放入 Outbox 的佇列中。一個地址對應著一個 Outbox,在 Outbox 中的訊息非同步的被取出併發送。
接著看入站,入站的訊息會統一先由 NettyRpcEnv 交給 Dispatcher,Dispatcher 在根據訊息的元資料分派到對應的處理 RpcEndpoint 上。Dispatcher 中每一個和 RpcEndpoint 一一對應的地址都會被關聯上一個 MessageLoop,類似於 EventLoop 它會負責處理髮給 RpcEndpoint 的初步分派後的訊息。每個 RpcEndpoint 實際繫結的訊息處理觸發器是 Inbox,Inbox 相當於 Actor Model 中的 Mailbox,負責接收外部發到當前 RpcEndpoint 即 Actor 的訊息。DedicatedMessageLoop 只服務於一個 RpcEndpoint,因此它也只持有一個 Inbox,當訊息由 Dispatcher 發給 DedicatedMessageLoop 之後,它就轉發給唯一的 Inbox;SharedMessageLoop 可服務於多個 RpcEndpoint,所以它的內部有一個 RpcEndpoint 地址對應到 Inbox 的對映,收到 Dispatcher 初步分派後的訊息後它會再次進行分派發送到具體的 RpcEndpoint 中。這種 MessageLoop 的設計對應的是一般的 RpcEndpoint 和 IsolatedRpcEndpoint,主要是提供不同的同步保證和執行緒配置。
具體到 Inbox 的訊息就比較直接了,拋開狀態管理和異常管理不談,主要的內容就是一個同步的先進先出的佇列處理髮布進來的訊息,如上一節程式碼片段所貼,最終根據訊息的型別呼叫 RpcEndpoint 的不同方法。
Flink 的 RPC 實現
現在我們轉過頭來看 Flink 的 RPC 實現。總的來說 Flink 的 RPC 實現依然是基於 Akka 的,這一點與 Spark 基於 Netty 開發的一套不同。Flink 社群有去掉 Akka 依賴的計劃,但進度只是FLINK-4346把介面抽象出來的程度,其底層實現仍然是 Akka,並沒有解決一開始我們提到的使用 Akka 帶來的問題。
我們看到 FLINK-4346 描述的目標,先從整體上了解它的設計方向。
It should address the following issues:
- Add type safety to the sender and receiver of messages. We want proper types methods to be called, rather than having generic message types and pattern matching everywhere. This is similar to typed actors.
- Make the message receivers testable without involving actors, i.e. the methods should be callable directly. When used with other component, the receiver will be wrapped in an actor that calls the methods based on received messages.
- We want to keep the paradigm of single-threaded execution per "actor"
首先我們可以看到的是它仍然強調了 Actor Model 的核心之一,單執行緒的 Actor 訊息處理。其次,我們可以看到和 Spark 有兩個重要的不同點。
其一是不同於 Akka 的 testkit 套路,Flink 強調遠端呼叫和本地呼叫在程式設計模型上的統一性,從而可以在不引入 Actor 一套的情況下直接呼叫 Actor 的方法來進行測試。這一點實際上跟 RMI 是比較相似的,可以建立一個本地的物件除錯,需要訪問遠端物件的時候就建立一個遠端物件的引用。關於這個呼叫程式設計模型上的統一性,後面講到 RpcGateway 和 RpcEndpoint 以及反射呼叫的時候會看到細節,總的來說這一套類似於 Akka 社群已經放棄的 Typed Actors 實現 Actor Model 型別化的方案。
其二是型別化,上面我們提到的程式設計模型本身跟型別關係不大。Flink 為了更好的實現防禦性程式設計,期望在呼叫對應的遠端方法的時候能夠使用上型別系統的優勢來保證引數和返回值的型別匹配,其中主要是返回值的匹配和對應的 RpcGateway 不像無型別的 ActorRef 或 RpcEndpointRef 一樣難以判斷哪些訊息是合法的。不過由於 FLINK-4346 的歷史侷限性,它借鑑了當時 Typed Actors 的實現方案,這個方案後來被廢棄。
由於不需要像 Akka 或 Spark 那樣從 Netty 或者 Aeron 這樣的網路層框架重新搭建訊息分派系統,Flink 的討論主要集中在它復刻 Typed Actors 的程式碼和執行緒模型上。
RMI 式的型別化 RPC 實現
Flink 中的 RPC 實現主要在flink-runtime
模組下的org.apache.flink.runtime.rpc
包中。由於複用了 Akka 的基礎設施,它並不像 Spark 那樣直接依賴傳輸層的實現,也不需要自己的分派資訊。上次 Flink 的 PMC Chair Stephan Ewen 來北京,和他交流的時候確認了 Flink 只把 Akka 作為 RPC 底層來用,並沒有使用 Akka 豐富的監督等其他功能,並且在未來有去掉 Akka 依賴的計劃。
Flink 的 RPC 實現的主要抽象包括
- ActorSystem 的封裝 RpcService
- Actor 與 RpcEndpoint 兩層之間的膠合層 RpcServer
- 業務邏輯的載體 RpcEndpoint
- RpcEndpoint 的位置透明的引用 RpcGateway
- 迷之執行緒模型輔助介面 MainThreadExecutable 和 MainThreadExecutor
可以看到,這個 Spark 和 Akka 基本一一對應的骨架是不一樣的,主要的矛盾點在 RpcServer 這一層上。這是因為相比於前兩者直接實現 Actor 或其等價物,Flink 的 RPC 實現是基於 Akka 的 Actor 實現了自己的 Actor 等價物 RpcEndpoint,這就導致模型的對應關係適配。
這個問題我們談到 RpcServer 的具體程式碼的時候再提。Flink 的程式碼不能像 Spark 那樣按照不同的型別來看,因為類的實現可能涉及到反射訪問另一個類,這種情況下按照功能點來閱讀程式碼會更好理解。
我們首先看到上面抽象的構造過程。最後的輔助介面放在下一節講,其他的抽象構造過程分別如下。
RpcServices 目前的唯一實現 AkkaRpcService 是 Akka 的 ActorSystem 的封裝,基本可以理解成 ActorSystem 的一個介面卡。所以其構造過程也比較簡單,就是將適配的物件引用儲存後返回。複雜的是由 RpcServices 構造的 RpcServer。
RpcServer 的構造有兩個觸發點。我們先看到連線遠端的 RpcEndpoint 時通過RpcServices#connect
構造的 RpcServer。這個方法的兩個過載的區別只在於是否實現 fencing 的功能,即區分監聽同一地址的不同任期的 RpcEndpoint。由於 Flink 的 JobManager 等 RpcEndpoint 會通過主節點選舉選出主節點,監聽同一個地址的可能是節點的不同任期,而上一個任期的請求的回覆應該被過濾掉以免影響當前任期的節點狀態。這點先簡單帶過,我們看到 connect 除此以外的共同部分,摘要如下。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
String address,
Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory
) {
// ...
final ActorSelection actorSel = actorSystem.actorSelection(address);
final Future<ActorIdentity> identify = Patterns
.ask(actorSel, new Identify(42), configuration.getTimeout().toMilliseconds())
.mapTo(ClassTag.apply(ActorIdentity.class));
final CompletableFuture<ActorRef> actorRefFuture = FutureUtils.toJava(identify).thenApply(
(ActorIdentity actorIdentity) -> {
if (actorIdentity.getRef() != null) {
return actorIdentity.getRef();
} else ...
});
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
(ActorRef actorRef) -> FutureUtils.toJava(
Patterns.ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), configuration.getTimeout().toMilliseconds())
.mapTo(ClassTag.apply(HandshakeSuccessMessage.class))));
return actorRefFuture.thenCombineAsync(
handshakeFuture,
(