Flink即將在1.7版本釋出全新的Kafka聯結器
現狀與問題 Apache Kafka作為開源界最流行的訊息中介軟體之一,一直以來被廣泛應用於很多大資料處理框架中。毫無疑問,Apache Flink作為新一代的大資料計算框架,也非常重視跟Kafka進行生態銜接。Flink所實現並推崇的端到端的“恰好一次”(Exactly-Once)的事件處理語義的典型場景就是配合Kafka來實現的,由此可見Kafka在Flink的上下游生態中佔有著非常重要的地位。
從Kafka 0.8版本開始,Flink都以Kafka client的版本為依據,為不同的Kafka client提供特定的connector。在現有的程式碼庫中,有針對Kafka client 0.8, 0.9, 0.10, 0.11這些版本的聯結器。如下圖所示:
按照社群之前的計劃,隨著Kafka 1.0, 1.1, 2.0… 版本的釋出,Flink kafka connector也會隨之迭代下去,以維護不同版本的Kafka client可能出現的issue。
這種模式在最初沒有暴露出明顯的問題,而且看起來也是合理的。因為雖然Kafka client承諾保證API的向後相容性。但是:
不同的client版本維護的特性集合不同,比如 Kafka Producer事務自0.11版本才開始提供; 拆分不同版本的connector有利於專案管理,比如issue的分類維護; 出於測試目的以及其他目的,Kafka connector使用了Mini版的Kafka Server、Admin API、非公開的API,這些都跟特定版本有強繫結關係; 所以,看起來似乎沒有毛病。
但與此同時,Kafka社群也在快速發展,版本一個接一個地在向前迭代。目前,Flink的這種以Kafka 版本拆分connector的模式很顯然已經處於非常被動的地位,並且由於開源社群的開發模式使其追隨的速度也大大滯後Kafka的發版速度。可以看到至今為止,仍然沒有釋出connector-1.0(這個connector原本是打算構建的,也是由Oceanus團隊提交的PR,但社群明顯發現了這種模式存在著很大的問題,具體的問題我們後面會詳談,這些問題也催生了一個全新的connector的誕生)。
版本與相容性 很多系統在版本演進的過程中都對外號稱保證向後相容性。相容性是一個框架是否成熟的考量標準之一,但某種意義上它既是優勢也是負擔,而將多個系統混在一起,存在多種版本不同的依賴關係又使得使用者感到很困擾。就拿 Flink 跟 Kafka 來舉例,使用者使用Flink 1.6.0版本,但是Kafka Server的版本已經升級到了2.0。但是官方對Flink 1.6.0版本只提供了針對0.11版本的connector:
也許有人會認為,將Flink-connector-kafka-0.11對Kafka client的依賴升級到2.0不就可以了?理想情況下確實應該這樣。但是,Flink connector無法讓你做到這一點。因為由於一些技術實現的需求,比如Producer端事務的恢復操作,使得Flink在實現connector的時候不得不借助於反射機制來獲取client特定類的內部變數來執行相應的邏輯。我們都知道,所謂的相容性保證只針對公開的API,對於類的內部實現沒有哪個框架會做出這種保證。接下來我們會看到,正是因為這一點阻止了我們直接升級Kafka client依賴的方案。退一步說,就算內部實現沒有改變,但如果沒有通過Flink已有的針對特性構建的整合測試,你也無法確定這種做法在線上是否能正常工作。
新聯結器的優勢 新聯結器做了這樣一個決定——只跟蹤最新版本的Kafka client並保證它跟Flink的完美銜接。當然由於一些歷史原因(Flink原先實現Kafka connector的模式)以及實現問題(少量程式碼使用反射訪問Kafka client的內部變數),已有的Kafka connector會繼續保留且不會在新聯結器裡提供適配與支援。但自Kafka 1.0開始(到目前為止已有三個client版本1.0、1.1、2.0)新聯結器都可以支援並能通過Flink全部的整合測試。它所體現出來的優勢是:一旦後續Kafka釋出新的client版本,這個connector就會開始適配新版本:
升級Kafka client的依賴; 升級connector並提供新功能; 適配非公開API的改動; 適配Admin API的改動; 重構整合測試驗證以相關的feature; 以端到端的測試驗證升級過client版本的connector訪問舊版本的Kafka Server; 而不是再次構建一個針對新版本的Kafka connector。
新聯結器的實現 簡而言之,新聯結器的實現包含量兩部分的工作:
重構:部分重構基礎聯結器以及老版本的聯結器(尤其是類名稱); 實現:提供一個新聯結器的實現; Flink提供了一個基礎connector模組,它是實現所有connector的核心模組,所有的connector都依賴於基礎connector。而由於舊connector按Kafka client版本拆分的方式,存在不同的feature在不同的connector中對外開放的問題,所以出現了太多的級聯依賴,後實現的connector保有對基礎connector以及離它最近版本的connector的依賴,而在測試中甚至出現了跨版本的依賴。這些繼承體系中,有的類名中帶有“Base”字樣,有的類名中帶有“08”、“09”等版本號字樣。我們對新connector的使用的統一命名約定是:類名中將不會攜帶任何版本號資訊,新connector只依賴基礎connector且不再引入對老版本connector的直接或間接依賴。
在對現有的connector進行重構以確保新connector能滿足其命名規範後,我們開始實現新connector。引入新connector不僅僅是進行類檔案遷移這麼簡單。我們需要知道,當前Flink原始碼庫中最新的connector所支援的Kafka client版本( 0.11)跟Kafka最新發布的2.0的client的版本的差異(尤其是client內部的變動、admin API的變化)很大。這裡面我們列舉幾個典型的差異:
Flink 進行事務恢復需要用到的sequenceNumbers在新版本內部被重新命名為了nextSequence; 之前不少基於Zookeeper獲取元資料的API被移除,轉而使用Admin API; Slf4j的依賴版本高於Flink自身對其的依賴版本,導致類載入報錯; 所有“kafka.consumer”開頭的包匯入在2.0都已被廢棄(相應的包中的類被刪除); Kafka client一些API修改了預設的超時時間,導致Flink原有的整合測試報錯; … 除此之外,我們還需要破除多版本的級聯依賴,重新梳理程式碼以使得它適配新的connector。