1. 程式人生 > >新一代大數據處理引擎 Apache Flink

新一代大數據處理引擎 Apache Flink

wordcount ted 進步 而已 我認 自己的 特點 功能 track

https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-flink/index.html

大數據計算引擎的發展

這幾年大數據的飛速發展,出現了很多熱門的開源社區,其中著名的有 Hadoop、Storm,以及後來的 Spark,他們都有著各自專註的應用場景。Spark 掀開了內存計算的先河,也以內存為賭註,贏得了內存計算的飛速發展。Spark 的火熱或多或少的掩蓋了其他分布式計算的系統身影。就像 Flink,也就在這個時候默默的發展著。

在國外一些社區,有很多人將大數據的計算引擎分成了 4 代,當然,也有很多人不會認同。我們先姑且這麽認為和討論。

首先第一代的計算引擎,無疑就是 Hadoop 承載的 MapReduce。這裏大家應該都不會對 MapReduce 陌生,它將計算分為兩個階段,分別為 Map 和 Reduce。對於上層應用來說,就不得不想方設法去拆分算法,甚至於不得不在上層應用實現多個 Job 的串聯,以完成一個完整的算法,例如叠代計算。

由於這樣的弊端,催生了支持 DAG 框架的產生。因此,支持 DAG 的框架被劃分為第二代計算引擎。如 Tez 以及更上層的 Oozie。這裏我們不去細究各種 DAG 實現之間的區別,不過對於當時的 Tez 和 Oozie 來說,大多還是批處理的任務。

接下來就是以 Spark 為代表的第三代的計算引擎。第三代計算引擎的特點主要是 Job 內部的 DAG 支持(不跨越 Job),以及強調的實時計算。在這裏,很多人也會認為第三代計算引擎也能夠很好的運行批處理的 Job。

隨著第三代計算引擎的出現,促進了上層應用快速發展,例如各種叠代計算的性能以及對流計算和 SQL 等的支持。Flink 的誕生就被歸在了第四代。這應該主要表現在 Flink 對流計算的支持,以及更一步的實時性上面。當然 Flink 也可以支持 Batch 的任務,以及 DAG 的運算。

或許會有人不同意以上的分類,我覺得其實這並不重要的,重要的是體會各個框架的差異,以及更適合的場景。並進行理解,沒有哪一個框架可以完美的支持所有的場景,也就不可能有任何一個框架能完全取代另一個,就像 Spark 沒有完全取代 Hadoop,當然 Flink 也不可能取代 Spark。本文將致力描述 Flink 的原理以及應用。

Flink 簡介

很多人可能都是在 2015 年才聽到 Flink 這個詞,其實早在 2008 年,Flink 的前身已經是柏林理工大學一個研究性項目, 在 2014 被 Apache 孵化器所接受,然後迅速地成為了 ASF(Apache Software Foundation)的頂級項目之一。Flink 的最新版本目前已經更新到了 0.10.0 了,在很多人感慨 Spark 的快速發展的同時,或許我們也該為 Flink 的發展速度點個贊。

Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所要處理的主要場景就是流數據,批數據只是流數據的一個極限特例而已。再換句話說,Flink 會把所有任務當成流來處理,這也是其最大的特點。Flink 可以支持本地的快速叠代,以及一些環形的叠代任務。並且 Flink 可以定制化內存管理。在這點,如果要對比 Flink 和 Spark 的話,Flink 並沒有將內存完全交給應用層。這也是為什麽 Spark 相對於 Flink,更容易出現 OOM 的原因(out of memory)。就框架本身與應用場景來說,Flink 更相似與 Storm。如果之前了解過 Storm 或者 Flume 的讀者,可能會更容易理解 Flink 的架構和很多概念。下面讓我們先來看下 Flink 的架構圖。

圖 1. Flink 架構圖

技術分享

如圖 1 所示,我們可以了解到 Flink 幾個最基礎的概念,Client、JobManager 和 TaskManager。Client 用來提交任務給 JobManager,JobManager 分發任務給 TaskManager 去執行,然後 TaskManager 會心跳的匯報任務狀態。看到這裏,有的人應該已經有種回到 Hadoop 一代的錯覺。確實,從架構圖去看,JobManager 很像當年的 JobTracker,TaskManager 也很像當年的 TaskTracker。然而有一個最重要的區別就是 TaskManager 之間是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之間的 Shuffle,而對 Flink 而言,可能是很多級,並且在 TaskManager 內部和 TaskManager 之間都會有數據傳遞,而不像 Hadoop,是固定的 Map 到 Reduce。

Flink 中的調度簡述

在 Flink 集群中,計算資源被定義為 Task Slot。每個 TaskManager 會擁有一個或多個 Slots。JobManager 會以 Slot 為單位調度 Task。但是這裏的 Task 跟我們在 Hadoop 中的理解是有區別的。對 Flink 的 JobManager 來說,其調度的是一個 Pipeline 的 Task,而不是一個點。舉個例子,在 Hadoop 中 Map 和 Reduce 是兩個獨立調度的 Task,並且都會去占用計算資源。對 Flink 來說 MapReduce 是一個 Pipeline 的 Task,只占用一個計算資源。類同的,如果有一個 MRR 的 Pipeline Task,在 Flink 中其也是一個被整體調度的 Pipeline Task。在 TaskManager 中,根據其所擁有的 Slot 個數,同時會擁有多個 Pipeline。

在 Flink StandAlone 的部署模式中,這個還比較容易理解。因為 Flink 自身也需要簡單的管理計算資源(Slot)。當 Flink 部署在 Yarn 上面之後,Flink 並沒有弱化資源管理。也就是說這時候的 Flink 在做一些 Yarn 該做的事情。從設計角度來講,我認為這是不太合理的。如果 Yarn 的 Container 無法完全隔離 CPU 資源,這時候對 Flink 的 TaskManager 配置多個 Slot,應該會出現資源不公平利用的現象。Flink 如果想在數據中心更好的與其他計算框架共享計算資源,應該盡量不要幹預計算資源的分配和定義。

需要深度學習 Flink 調度讀者,可以在 Flink 的源碼目錄中找到 flink-runtime 這個文件夾,JobManager 的 code 基本都在這裏。

Flink 的生態圈

一個計算框架要有長遠的發展,必須打造一個完整的 Stack。不然就跟紙上談兵一樣,沒有任何意義。只有上層有了具體的應用,並能很好的發揮計算框架本身的優勢,那麽這個計算框架才能吸引更多的資源,才會更快的進步。所以 Flink 也在努力構建自己的 Stack。

Flink 首先支持了 Scala 和 Java 的 API,Python 也正在測試中。Flink 通過 Gelly 支持了圖操作,還有機器學習的 FlinkML。Table 是一種接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和執行。對於完整的 Stack 我們可以參考下圖。

圖 2. Flink 的 Stack

技術分享

Flink 為了更廣泛的支持大數據的生態圈,其下也實現了很多 Connector 的子項目。最熟悉的,當然就是與 Hadoop HDFS 集成。其次,Flink 也宣布支持了 Tachyon、S3 以及 MapRFS。不過對於 Tachyon 以及 S3 的支持,都是通過 Hadoop HDFS 這層包裝實現的,也就是說要使用 Tachyon 和 S3,就必須有 Hadoop,而且要更改 Hadoop 的配置(core-site.xml)。如果瀏覽 Flink 的代碼目錄,我們就會看到更多 Connector 項目,例如 Flume 和 Kafka。

Flink 的部署

Flink 有三種部署模式,分別是 Local、Standalone Cluster 和 Yarn Cluster。對於 Local 模式來說,JobManager 和 TaskManager 會公用一個 JVM 來完成 Workload。如果要驗證一個簡單的應用,Local 模式是最方便的。實際應用中大多使用 Standalone 或者 Yarn Cluster。下面我主要介紹下這兩種模式。

Standalone 模式

在搭建 Standalone 模式的 Flink 集群之前,我們需要先下載 Flink 安裝包。這裏我們需要下載 Flink 針對 Hadoop 1.x 的包。下載並解壓後,進到 Flink 的根目錄,然後查看 conf 文件夾,如下圖。

圖 3. Flink 的目錄結構

技術分享

我們需要指定 Master 和 Worker。Master 機器會啟動 JobManager,Worker 則會啟動 TaskManager。因此,我們需要修改 conf 目錄中的 master 和 slaves。在配置 master 文件時,需要指定 JobManager 的 UI 監聽端口。一般情況下,JobManager 只需配置一個,Worker 則須配置一個或多個(以行為單位)。示例如下:

1 2 3 4 5 micledeMacBook-Pro:conf micle$ cat masters localhost:8081 micledeMacBook-Pro:conf micle$ cat slaves localhost

在 conf 目錄中找到文件 flink-conf.yaml。在這個文件中定義了 Flink 各個模塊的基本屬性,如 RPC 的端口,JobManager 和 TaskManager 堆的大小等。在不考慮 HA 的情況下,一般只需要修改屬性 taskmanager.numberOfTaskSlots,也就是每個 Task Manager 所擁有的 Slot 個數。這個屬性,一般設置成機器 CPU 的 core 數,用來平衡機器之間的運算性能。其默認值為 1。配置完成後,使用下圖中的命令啟動 JobManager 和 TaskManager(啟動之前,需要確認 Java 的環境是否已經就緒)。

圖 4. 啟動 StandAlone 模式的 Flink

技術分享

啟動之後我們就可以登陸 Flink 的 GUI 頁面。在頁面中我們可以看到 Flink 集群的基本屬性,在 JobManager 和 TaskManager 的頁面中,可以看到這兩個模塊的屬性。目前 Flink 的 GUI,只提供了簡單的查看功能,無法動態修改配置屬性。一般在企業級應用中,這是很難被接受的。因此,一個企業真正要應用 Flink 的話,估計也不得不加強 WEB 的功能。

圖 5. Flink 的 GUI 頁面

技術分享

Yarn Cluster 模式

在一個企業中,為了最大化的利用集群資源,一般都會在一個集群中同時運行多種類型的 Workload。因此 Flink 也支持在 Yarn 上面運行。首先,讓我們通過下圖了解下 Yarn 和 Flink 的關系。

圖 6. Flink 與 Yarn 的關系

技術分享

在圖中可以看出,Flink 與 Yarn 的關系與 MapReduce 和 Yarn 的關系是一樣的。Flink 通過 Yarn 的接口實現了自己的 App Master。當在 Yarn 中部署了 Flink,Yarn 就會用自己的 Container 來啟動 Flink 的 JobManager(也就是 App Master)和 TaskManager。

了解了 Flink 與 Yarn 的關系,我們就簡單看下部署的步驟。在這之前需要先部署好 Yarn 的集群,這裏我就不做介紹了。我們可以通過以下的命令查看 Yarn 中現有的 Application,並且來檢查 Yarn 的狀態。

1 yarn application –list

如果命令正確返回了,就說明 Yarn 的 RM 目前已經在啟動狀態。針對不同的 Yarn 版本,Flink 有不同的安裝包。我們可以在 Apache Flink 的下載頁中找到對應的安裝包。我的 Yarn 版本為 2.7.1。再介紹具體的步驟之前,我們需要先了解 Flink 有兩種在 Yarn 上面的運行模式。一種是讓 Yarn 直接啟動 JobManager 和 TaskManager,另一種是在運行 Flink Workload 的時候啟動 Flink 的模塊。前者相當於讓 Flink 的模塊處於 Standby 的狀態。這裏,我也主要介紹下前者。

在下載和解壓 Flink 的安裝包之後,需要在環境中增加環境變量 HADOOP_CONF_DIR 或者 YARN_CONF_DIR,其指向 Yarn 的配置目錄。如運行下面的命令:

1 export HADOOP_CONF_DIR=/etc/hadoop/conf

這是因為 Flink 實現了 Yarn 的 Client,因此需要 Yarn 的一些配置和 Jar 包。在配置好環境變量後,只需簡單的運行如下的腳本,Yarn 就會啟動 Flink 的 JobManager 和 TaskManager。

1 yarn-session.sh –d –s 2 –tm 800 –n 2

上面的命令的意思是,向 Yarn 申請 2 個 Container 啟動 TaskManager(-n 2),每個 TaskManager 擁有兩個 Task Slot(-s 2),並且向每個 TaskManager 的 Container 申請 800M 的內存。在上面的命令成功後,我們就可以在 Yarn Application 頁面看到 Flink 的紀錄。如下圖。

圖 7. Flink on Yarn

技術分享

如果有些讀者在虛擬機中測試,可能會遇到錯誤。這裏需要註意內存的大小,Flink 向 Yarn 會申請多個 Container,但是 Yarn 的配置可能限制了 Container 所能申請的內存大小,甚至 Yarn 本身所管理的內存就很小。這樣很可能無法正常啟動 TaskManager,尤其當指定多個 TaskManager 的時候。因此,在啟動 Flink 之後,需要去 Flink 的頁面中檢查下 Flink 的狀態。這裏可以從 RM 的頁面中,直接跳轉(點擊 Tracking UI)。這時候 Flink 的頁面如圖 8。

圖 8. Flink 的頁面

技術分享

對於 Flink 安裝時的 Trouble-shooting,可能更多時候需要查看 Yarn 相關的 log 來分析。這裏就不多做介紹,讀者可以到 Yarn 相關的描述中查找。

Flink 的 HA

對於一個企業級的應用,穩定性是首要要考慮的問題,然後才是性能,因此 HA 機制是必不可少的。另外,對於已經了解 Flink 架構的讀者,可能會更擔心 Flink 架構背後的單點問題。和 Hadoop 一代一樣,從架構中我們可以很明顯的發現 JobManager 有明顯的單點問題(SPOF,single point of failure)。 JobManager 肩負著任務調度以及資源分配,一旦 JobManager 出現意外,其後果可想而知。Flink 對 JobManager HA 的處理方式,原理上基本和 Hadoop 一樣(一代和二代)。

首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對於 Standalone 來說,Flink 必須依賴於 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集群會同時有多個活著的 JobManager,其中只有一個處於工作狀態,其他處於 Standby 狀態。當工作中的 JobManager 失去連接後(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集群。

對於 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager 做 HA 了。其實這裏完全是 Yarn 的機制。對於 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由於完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這裏不再做深究。

Flink 的 Rest API 介紹

Flink 和其他大多開源的框架一樣,提供了很多有用的 Rest API。不過 Flink 的 RestAPI,目前還不是很強大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通過其 Rest 來查詢各項的結果數據。在 Flink RestAPI 基礎上,可以比較容易的將 Flink 的 Monitor 功能和其他第三方工具相集成,這也是其設計的初衷。

在 Flink 的進程中,是由 JobManager 來提供 Rest API 的服務。因此在調用 Rest 之前,要確定 JobManager 是否處於正常的狀態。正常情況下,在發送一個 Rest 請求給 JobManager 之後,Client 就會收到一個 JSON 格式的返回結果。由於目前 Rest 提供的功能還不多,需要增強這塊功能的讀者可以在子項目 flink-runtime-web 中找到對應的代碼。其中最關鍵一個類 WebRuntimeMonitor,就是用來對所有的 Rest 請求做分流的,如果需要添加一個新類型的請求,就需要在這裏增加對應的處理代碼。下面我例舉幾個常用 Rest API。

1.查詢 Flink 集群的基本信息: /overview。示例命令行格式以及返回結果如下:

1 2 $ curl http://localhost:8081/overview{"taskmanagers":1,"slots-total":16, "slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

2.查詢當前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回結果如下:

1 2 $ curl http://localhost:8081/jobs{"jobs-running":[],"jobs-finished": ["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

3.查詢一個指定的 Job 信息: /jobs/jobid。這個查詢的結果會返回特別多的詳細的內容,這是我在瀏覽器中進行的測試,如下圖:

圖 9. Rest 查詢具體的 Job 信息

技術分享

想要了解更多 Rest 請求內容的讀者,可以去 Apache Flink 的頁面中查找。由於篇幅有限,這裏就不一一列舉。

運行 Flink 的 Workload

WordCount 的例子,就像是計算框架的 helloworld。這裏我就以 WordCount 為例,介紹下如何在 Flink 中運行 workload。

在安裝好 Flink 的環境中,找到 Flink 的目錄。然後找到 bin/flink,它就是用來提交 Flink workload 的工具。對於 WordCount,我們可以直接使用已有的示例 jar 包。如運行如下的命令:

1 ./bin/flink run ./examples/WordCount.jar hdfs://user/root/test hdfs://user/root/out

上面的命令是在 HDFS 中運行 WordCount,如果沒有 HDFS 用本地的文件系統也是可以的,只需要將“hdfs://”換成“file://”。這裏我們需要強調一種部署關系,就是 StandAlone 模式的 Flink,也是可以直接訪問 HDFS 等分布式文件系統的。

結束語

Flink 是一個比 Spark 起步晚的項目,但是並不代表 Flink 的前途就會暗淡。Flink 和 Spark 有很多類似之處,但也有很多明顯的差異。本文並沒有比較這兩者之間的差異,這是未來我想與大家探討的。例如 Flink 如何更高效的管理內存,如何進一步的避免用戶程序的 OOM。在 Flink 的世界裏一切都是流,它更專註處理流應用。由於其起步晚,加上社區的活躍度並沒有 Spark 那麽熱,所以其在一些細節的場景支持上,並沒有 Spark 那麽完善。例如目前在 SQL 的支持上並沒有 Spark 那麽平滑。在企業級應用中,Spark 已經開始落地,而 Flink 可能還需要一段時間的打磨。在後續文章中,我會詳細介紹如何開發 Flink 的程序,以及更多有關 Flink 內部實現的內容。

相關主題

  • Apache Flink
  • Apache Tachyon
  • Apache Hadoop
  • developerWorks 開源技術主題:查找豐富的操作信息、工具和項目更新,幫助您掌握開源技術並將其用於 IBM 產品。

新一代大數據處理引擎 Apache Flink