Flink 在又拍雲日誌批處理中的實踐
阿新 • • 發佈:2021-02-03
日前,由又拍雲舉辦的大資料與 AI 技術實踐|Open Talk 杭州站沙龍在杭州西溪科創園順利舉辦。本次活動邀請了有贊、個推、方得智慧、又拍雲等公司核心技術開發者,現場分享各自領域的大資料技術經驗和心得。以下內容整理自又拍雲資深開發工程師張召現場分享:
張召,資深開發工程師,目前負責又拍雲 CDN 的重新整理預熱、日誌處理和運維平臺開發。熟悉 OpenResty,在 Web 開發領域經驗頗豐,目前熱衷研究大資料處理相關技術。
大家好,我是來自又拍雲的張召,今天主要分享又拍雲多資料來源日誌處理選型 Flink 的考量,以及 Flink 落地過程中遇到的問題和解決方案。
## 為什麼用 Flink 做批處理
在選用 Flink 前,我們對日誌批處理的整個業務需求分為三步:資料來源採集、日誌處理、結果的儲存。我們的日誌量在 100G/h,單機服務處理速度慢、擴容不方便,一些相似的需求都是以編碼形式完成的。另外,資料處理流程複雜,需要在多個服務間流轉,迫切需要一個方案來解決問題。
前期我們調研了資料庫,發現數據庫裡沒有多維度的反覆總結和挖掘的功能,所以我們放棄了選用資料庫的方案,選用 MapReduce 裡的 hadoop 這條元件。實際生產中發現它經常在寫入的時候出現一些錯誤,導致無法做一些聚合的操作。接著我們選擇了 Spark,新的問題又出現了:提交任務時,Restful API 介面的支援不全面;web 控制檯中虛擬 IP 無法訪問內部。
基於以上原因,我們需要一個更好的解決方案。通過比較之後,我們發現了 Flink。Flink 規避了前面所有的問題,後面還提供一套完整的 Restful API。不僅能夠渲染出這個頁面,還可以通過 Submit New Job 直接提交任務。同時,我們對老服務升級的過程中,逐漸明白了我們日誌資料的特點,以及當前我們需要挖掘日誌資料的哪些方面。**在盤點了手頭上可呼叫的資源後,我們希望部署的服務整個系統是可觀測、可維護的,所以基於以上各種原因,最終我們放棄 Spark 方案,選擇了 Flink 。**
## Flink 基礎知識
**Flink 元件棧**
如下圖所示,這是一個分散式系統,整體也比較簡單。最左邊的 Flink Client 支援客戶端現在的提交方式,後面會談到它支援提交 Restful API 介面以及通過命令列等 5 種手段向這個 Job Manager 提交任務。
![](https://upload-images.jianshu.io/upload_images/80097-4c7d88dddbda2da7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
Job Manager 是分散式系統裡的 master 節點,master 節點拿到資料之後會對架包進行分析,而後把相關其他資訊給傳送到對應的 TaskManager 節點。TaskManager 節點拿到資訊後才真正執行 Job,Job Manager 最主要的作用就是解析這個圖以及維持整個叢集,比如心跳、資源排程、HA 高可用、檔案儲存等,這是 Flink 提交任務 runtime 的過程。
接著看 Flink 靜態的整體設計,底層是部署部分,稍後展開講。中間的核心部分是 Runtime,分別封裝了兩個不同的 API:DataStream 是流處理,是現在 Flink 用的最多的場景;DataSet 是我們用到的批處理方式。雖然現在 Flink 號稱支援流批一體處理,但是它目前版本兩個介面是分開的,今年 12 月發的 1.12 版本已經不鼓勵用 DataSet 相關的 API,這部分功能合到了 DataStream 裡。但由於我們部署的版本還在 1.1,沒有升級,所以我們還沒有把這些 Job 遷到 DataStream 上去。
![](https://upload-images.jianshu.io/upload_images/80097-29f274389ebd9a28.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
接下來我們探索最上層的 tabl circle,但使用的最終效果並不好,因為無論是文件裡,還是程式碼裡寫的支援限度是比較有限的。比如去執行 circle,但 circle 想把最終結果輸出到 PG 裡面的時候,它就出現了一個 bug,它 PG 的資料庫最終拼出來的地址是錯的,它的 host 和 pot 少了一個反斜線。這個 bug 非常簡單,但是現在都沒有修復。所以我認為最上層這部分可能測試的還不完善,也不是很穩定。所以我們最終程式碼的實現和業務集中編寫也是放在呼叫的 DataSet API 這部分來做的。
另外我們還做了些小的工作,我們基於又拍雲端儲存系統,擴充套件了它的相關功能,能夠支援 Flink 的處理結果直接輸出到雲端儲存上,對整體程式碼起到簡化作用。
**JobManager 和 TaskManager**
JobManager 的作用主要體現在裡面的元件。比如 Dataflow Graph 可以把 Flink 客戶端提交的架包分析成一個可以執行的 graph,分發到下面的 TaskManager 節點裡面去。另外一個我們比較關注的元件是 Actor System,它是由 ScadAKKA 非同步網路元件實現的。我們後期部署時發現有很多 AKKA time out 這類問題,這意味著 JobManager 元件和 TaskManager 元件進行通訊的時候出現了問題。
![](https://upload-images.jianshu.io/upload_images/80097-42d2a7a04b8a4f95.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
再看 TaskManager 主要關注的概念,當 TaskManager 和外界系統發生互動時,它用的不是 actor 模型,actor 模型主要是非同步通訊,強調的是快。它和外部通訊時,TaskManager 用的是 Netty,輸入資料更加的穩定。
這裡要著重關注一下 Task Slot 概念,一些分享的最佳實踐案例提到 TaskManager 裡的 slot 最好和當前機器 CPU 核數保持 1:1 的設定。我們最初按照 1:1 設計跑一些小的 job 的時候很好,但資料量上升時經常會出現一些 time out 的問題。原因在於 Kubernetes 提供的 CPU 只是一個 CPU 的實踐片,不能等同物理機上的 CPU,當在 TaskManager 下部署多個的時候,雖然它們的記憶體會被分攤掉,但 CPU 卻是共享的。在這種狀況下,整個 TaskManager 就不是特別穩定。所以我們最終設定大概在 1:4 或 1:8。具體資料應該是從當前環境內的網路狀況和經驗值來確定的。
**Flink 部署**
剛開始部署 Flink 時,我們是比較懵的,因為 Flink 部署文件裡介紹了很多模式,比如部署在 standalone,Kubernetes、YARN 或者是 Mesos,還有一些應用實踐都比較少的模式。雖然我們在雲平臺上搞一個 Kubernetes 的操作,但我們做不到直接使用 Kubernetes 託管式的服務,所以最終採用的是 Standalone on Docker 模式,如下圖所示:
![Standalone on Docker 模式](https://upload-images.jianshu.io/upload_images/80097-1262792832b52dce.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
- Standalone 模式下,Master 和 TaskManager 可以執行在同一臺機器或者不同的機器上;
- Master 程序中,Standalone ResourceManager 的作用是對資源進行管理。當用戶通過 Flink Cluster Client 將 JobGraph 提交給 Master 時,JobGraph 先經過 Dispatcher;
- 當 Dispatcher 收到請求,生成 JobManager。接著 JobManager 程序向 Standalone ResourceManager 申請資源,最終再啟動 TaskManager;
- TaskManager 啟動後,經歷註冊後 JobManager 將具體的 Task 任務分發給 TaskManager 去執行。
**Flink 提交任務**
Flink 提供豐富的客戶端操作提交任務和與任務進行互動,包括 Flink 命令列、Scala Shell、SQL Client、
Restful API 和 Web。
![](https://upload-images.jianshu.io/upload_images/80097-7645e98f9f5e60af.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
最重要的是命令列,其次是 SQL Client 用於提交 SQL 任務的執行,以及 Scala Shell 提交 Table API 的任務,還提供可以通過 http 方式進行呼叫的 Restful 服務,此外還有 Web 的方式可以提交任務。對我們非常實用的是 Restful API 功能。目前的服務裡,除了拉取原始日誌這塊程式碼沒有動,其他一些 go 自研元件的統計、排序等後續的操作現在統統不用了,直接呼叫 Flink 相關的介面。
Flink 是一個非同步執行的過程。呼叫介面傳遞任務後,緊接著會把 taster 的 ID 返還給你,後續的操作裡面可以通過這個介面不斷去輪循,發現當前任務的執行情況再進行下一步決策。綜合來看,Flink 的 Restful API 介面,對於我們這種異構的、非 JAVA 系的團隊來說還是非常方便的。
## 使用批處理時遇到的問題
**網路問題**
![](https://upload-images.jianshu.io/upload_images/80097-7e4777837a4cbc7e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
當我們逐步遷移日誌服務時,開始日誌量比較小,Flink 執行的非常好;當發現它負載不了,出現 GVM 堆錯誤之類的問題時也只需要把相關引數調大就可以了,畢竟雲平臺上資源還是比較富裕的,操作也很方便。
但當我們越來越信任它,一個 job 上百 G 流量時,整個 tap 圖就變成一條線,網路問題就出現了。此前有心跳超時或者任務重試之類的問題,我們並不是特別在意,因為失敗後 Flink 支援重試,我們通過 restful 介面也能夠感知到,所以失敗就再試一次。但是隨著後面的任務量加大,每執行一次代價就越來越大了,導致提交的越多當前整個叢集就會越來越惡化。
![](https://upload-images.jianshu.io/upload_images/80097-738190c0de6d6978.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
當這種上百 G 的日誌批處理任務放進去後經常會出現三類錯誤:最上面紅線畫出的 akkaTimeout 問題是前面講的 JobManager 和 TaskManager 相互通訊出現的問題;像心跳超時或連結被重置的問題也非常多。
為什麼我們沒有完全把這個問題處理掉呢?是因為我們看了一些阿里的 Flink on K8S 的經驗總結。大家有興趣也可以看一下。
![](https://upload-images.jianshu.io/upload_images/80097-3daa4a75d37108fc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
這篇文章中面對同樣的問題,阿里團隊提出將網路放到 K8S 網路虛擬化會實現一定的效能,我們參考了這種解決方案。具體來說,需要對 Flink 配置進行一些調整,另外有一些涉及 connection reset by peer 的操作:
**調整 Flink 配置引數**
- 調大網路容錯性, 也就是配置引數中 timeout 相關的部分。比如心跳 5 秒一次超時了就調成 20 秒或者 30 秒,注意不可以完全禁掉或者調到很大;
- 開啟壓縮。如果是以純文字的形式或者不是壓縮包的形式上傳,Flink 會並行讀取檔案加快處理速度,所以前期傾向上傳解壓後的文字;當網路開銷變大後,我們就選擇開啟檔案壓縮,希望通過 CPU 的壓力大一點,儘量減少網路開銷。此外,TaskManager 或者是 JobManager 和 TaskManager 之間進行通訊也可以開啟壓縮;
- 利用快取, 如 `taskmanager.memory.network.fraction` 等,引數配置比較靈活;
- 減少單個 task manager 下 task slots 的數量。
**Connection reset by peer**
- 不要有異構網路環境(儘量不要跨機房訪問)
- 雲服務商的機器配置網絡卡多佇列 (將例項中的網路中斷分散給不同的CPU處理,從而提升效能)
- 選取雲服務商提供的高效能網路外掛:例如阿里雲的 Terway
- Host network,繞開 K8s 的虛擬化網路(需要一定的開發量)
由於 Connection reset by peer 的方案涉及到跨部門協調,實施起來比較麻煩,所以我們目前能夠緩解網路問題的方案是對 Flink 配置進行一些調整,通過這種手段,當前叢集的網路問題有了很大程度的緩解。
**資源浪費**
standlone 模式下,整個叢集配置資源的總額取決於當前所有 job 裡最大的 job 需要的容量。如下圖所示,最下面不同任務步驟之間拷貝的資料已經達到了 150G+,能夠緩解這種問題的辦法是不斷配置更大的引數。
![](https://upload-images.jianshu.io/upload_images/80097-bab170c9903a5370.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
但由於 Flink 這一套後面有一個 JVM 的虛擬機器,JVM 虛擬機器經常申請資源後並沒有及時釋放掉,所以一個容器一旦跑過一個任務後,記憶體就會飆上去。當不斷拉大配置,且配置數量還那麼多的情況下,如果我們的任務只是做一個小時級的日誌處理,導致真正用到的資源量很少,最終的效果也不是很好,造成資源浪費。
**job 卡死**
在容量比較大後,我們發現會出現 job 卡死,經常會出現量大的 job 載入進行到一半的時候就卡住了。如下圖所示(淺藍色是已經完成的,鮮綠色表示正在進行的),我們試過不干預它,那麼這個任務就會三五個小時甚至是八個小時的長久執行下去,直到它因為心跳超時這類的原因整體 cross 掉。
![](https://upload-images.jianshu.io/upload_images/80097-61034eefc8657994.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
這個問題目前沒有完全定位出來,所以現在能採取的措施也只是通過 restful 介面檢查任務的時候,給它設定一個最大的閾值。當超過這個閾值就認為這個任務已經完全壞掉了,再通過介面把它取消掉。
## Flink 帶來的收益
下圖所示是日誌處理的某一環節,每一個小方塊代表一個服務,整個服務的鏈路比較長。當有多個數據源載入一個數據時,它會先 transfer porter 放到又拍雲的雲端儲存裡,由 log-merge 服務進行轉換,再根據當前服務的具體業務需求,最終才會存到雲端儲存或者存到 redis。
![](https://upload-images.jianshu.io/upload_images/80097-530bb298420b09a2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
任務和任務之間的銜接是通過兩種方式:一種是人為之間進行約定,比如我是你的下游元件,我們約定延遲 3 個小時,預設 3 個小時後你已經資料處理好,我就去執行一次;第二種是用 ASQ,我處理結束後推送訊息,至於你消費不消費、消費是否成功,上游不需要關心。**雖然原本正常的情況下服務執行也很穩定,但一旦出現問題再想定位、操縱整個系統,追捕一些日誌或重跑一些資料的時候就比較痛苦。這一點在我們引入到 Flink 後,整體上有非常大的改進。**
![](https://upload-images.jianshu.io/upload_images/80097-9aa5537ceedb0aca.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
目前只有任務管理部分是複用了之前的程式碼,相當於採集板塊。採集好資料直接向 Flink 提交當前的 job,Flink 處理好後直接存進雲端儲存。我們的任務管理主要分兩類功能,一個是採集,另一個是動態監控當前任務的進行結果。總的來看,重構後相當於形成了一個閉環,無論是 Flink 處理出現問題,亦或是儲存有問題,任務管理系統都會去重跑,相當於減少一些後期的運維工作。
## 總結
選擇 standalone 系統部署一套 Flink 系統,又要它處理不是太擅長的批處理,且量還比較大,這是非常有挑戰性的。充滿挑戰的原因在於這不是 Flink 典型的應用場景,很多配置都做不到開箱即用,雖說號稱支援批處理,但相關配置預設都是關閉的。這就需要調優,不過很文件裡大多會寫如果遇到某類問題就去調大某類值,至於調大多少完全靠經驗。
儘管如此,但由於當前 Flink 主推的也是流批一體化開發,我們對 Flink 後續的發展還是比較有信心的。前面也講了 Flink1.1 版本中,datesat 批處理的 API 和 stream 的 API 還是分開的,而在最新版本 1.12 中已經開始融合在一起了,並且 datesat 部分已經不建議使用了。我們相信沿著這個方向發展,跟上社群的節奏,未來可期。
#### 推薦閱讀
[有贊統一接入層架構演進](https://www.upyun.com/tech/article/600/%E6%9C%89%E8%B5%9E%E7%BB%9F%E4%B8%80%E6%8E%A5%E5%85%A5%E5%B1%82%E6%9E%B6%E6%9E%84%E6%BC%94%E8%BF%9B.html)
[微服務架構下 CI/CD 如何落地](https://www.upyun.com/tech/article/602/%E5%BE%AE%E6%9C%8D%E5%8A%A1%E6%9E%B6%E6%9E%84%E4%B8%8B%20CI%2FCD%20%E5%A6%82%E4%BD%95%E8%90%BD%E5%9C%B0.html)