魅族大資料之流平臺設計部署實踐
導讀:魅族大資料的流平臺系統擁有自設計的採集SDK,自設計支援多種資料來源採集的Agent元件,還結合了Flume、Spark、Metaq、Storm、Kafka、Hadoop等技術元件,本文就魅族流平臺對大量資料的採集、實時計算、系統分析方法,全球多機房資料採集等問題進行介紹。
流平臺是魅族大資料平臺的重要部分,包括資料採集、資料處理、資料儲存、資料計算等模組,流平臺為大資料提供了強大的支撐能力。
文章還介紹了魅族大資料流平臺的架構、設計方式、常用元件、核心技術框架等方面的內容,還原魅族大資料平臺的搭建過程及遇到的問題。
一、魅族大資料平臺架構
如圖所示便是魅族的大資料平臺架構。
- 左邊是多樣性的資料來源接入;
- 右上是離線資料的採集;
- 下面是流平臺(也是今天分享的主角);
- 中間是叢集的部署;
- 右邊是ETL的資料探勘、演算法庫和一些資料模型;
- 左上角是資料開發平臺,比如webIDE可以使得開發人員更便捷地做一些資料查詢和管理;
- 最右邊的是一個數據產品門戶,包括我們的使用者畫像、統計系統等,這裡麵包含大資料的很多元件,比如資料採集、資料處理、資料儲存、資料探勘等,最後產生大資料的雛形。
二、流平臺介紹
流平臺是大資料平臺一個比較重要的部分,主要包括四個部分:資料採集、資料處理、資料儲存、計算能力。
資料採集
“誰擁有了整個世界的資料,他就是最大的贏家”,這句話雖然有點誇張,但是卻表達了資料採集的重要性。一個大資料平臺數據的多樣性、資料量的級別很大程度上決定了大資料的能力和豐富程度。
資料處理
這裡講的資料處理並不是像末端那麼專業的資料清洗,更多的是為後續入庫做一些簡單處理,以及實時計算。
- 資料儲存
計算能力,包括離線計算和實時計算
流平臺為大資料提供非常強大的支撐,資料統計分析、資料探勘、神經網路的圖形計算等都可以依靠計算能力進行。
實時計算是指在一定單位的時間延遲範圍內,基於增量的資料推算出結果,再結合歷史資料得到期望的分析結果。這個時間是根據業務需求而定。
1、流平臺架構
上圖是我們的流平臺架構圖
- 左邊是資料來源,像NoSQL、RDB、檔案型別;
- 最右邊是叢集,下面還有其他的一些Hadoop(儲存);
- 中間的框是核心,也就是流平臺;
- 最上面的是AS-Manager(我們的流管理平臺),承載了非常多的管理功能;
- 下面是Zookeeper,這是一個非常流行的整合管理中心,魅族的一些架構都會用到它,流平臺也不例外,Zookeeper可以說貫穿了我們整個流平臺的架構;
- 最下面是AS-Protocol,我們自己設計的流平臺的資料物件協議,打通了整個流平臺的資料鏈路;
- 中間四個框是核心的四個模組:採集模組、資料中轉模組、快取模組、實時計算模組,也叫合併層。
2、具體架構介紹
這是我們的具體架構圖。
業務規模:從這邊採集資料到經過流平臺最後經過實時計算或入庫,它的資料量量級在千億級別。
3、元件
- 資料來源渠道
前面提到採集資料來源渠道的多樣性決定了大資料平臺的相應能力和綜合程度。我們這邊首先會有一個檔案類的業務資料,包括業務日誌、業務資料、資料庫檔案,這些都會經過採集服務採集。
下面這一塊包括一些網站的js訪問、手機各APP埋點、特點的應用日誌檔案(它會通過手機端的一些埋點上訪到我們的埋點服務)。
- 資料採集
資料採集分為兩個部分:採集服務、獨立部署的埋點服務。圖中只顯示了一個埋點服務,裡面還會有很多的第三方業務,第三方業務通過這個紅色的外掛接入我們的採集。
- 資料中轉
通過採集模組把資料流轉到中轉模組,中轉模組採用的是目前比較流行的flume元件,紅色sink是我們自己開發的。
- Cache
sink把前面的資料轉給快取層,快取層裡有metaq和Kafka。
- Streaming
實時計算模組上線了Spark和Storm,較早上線的是Spark,目前兩個都在用的原因是它會適應不同的業務場景。
- Store
最後面是我們提供給落地的store層,像HIVE、Hbase等等。
- 流管理平臺
最下面是流管理平臺,圖中有四條線連著四個核心模組,對這四個模組進行非常重要且非常豐富的邏輯管理,包括資料管理、對各節點的監控、治理、實時命令的下發等。
三、流平臺設計
1、概念解讀
Message,就是一條訊息,是最小的資料單位。業務方給的一條資料就是一個message;我們去採集檔案的話,一行資料就是一個message。
AS-Protocol,是我們自己設計的流平臺數據的物件,它會對一批量的message進行打包,然後再加上一些必要的變數做一個封裝。
Evnet,會提供一個類似的標準介面,這個地方其實更多的是為了打通採集的流平臺。它最重要的一個變數是Topic,就是說我拿到了我的AS-Protocol就可以根據對應的Topic發到相應的登入去快取提取,因為我們的AS-Protocol除了起始端和結束端以外,中間層是不用解析協議的。
Type,資料格式目前是Json和Hive格式,可以根據業務去擴充套件。
Compress,Hive格式在空間上也是非常有優勢的,非常適合於網路傳輸壓縮。當壓縮資料來源質量沒有達到一定量的程度的時候會越壓越大,所以我們要判斷是否需要壓縮。我們壓縮採用的是一個全系統
Data_timestamp,資料的時間是最上面的message,每一個message會攜帶一個數據時間.這個比較好理解,就是入庫之後會用做資料統計和分析的。
Send_timestamp,傳送時間會攜帶在我們的AS-Protocol裡,它聲明瞭每一個數據包傳送的時間。
Unique Key,每一個數據包都有一個唯一的標識,這個也是非常重要的,它會跟著AS-Protocol和Event走通整個平臺的資料鏈路,在做資料定位、問題定位的時候非常有用,可以明確查到每個資料包在哪個鏈路經歷了什麼事情。
Topic。這個不需多言。
Data_Group,資料分組是我們非常核心的一個設計思想,原則上我們是一個業務對應一個數據分組。
Protobuf序列化,我們會對Event資料做一個PT序列化,然後再往上面傳,這是為了節省資料流量。
2、協議設計
如圖所示為Event、As-Protocol和Message的關係。
最上層是Event,裡面有一個Unique Key和Topic包括了我們的As-Protocol,然後是資料格式、發動時間是否壓縮、用什麼方式壓縮,還攜帶一些額外的變數。最後面是一個Body,Body其實就是一個message的宿主,以位元組流的方式儲存。這個就是我們一個數據物件的協議設計。
接下來看資料在整個架構裡是如何流轉和傳輸的。
首先是資料來源渠道,最左邊的是message,任何業務方的資料過來都是一條message,經過資料採集把一批message打包封裝成Event,再發給資料中轉模組,也叫flume。把Event拆出來,有一個topic,最後把As-protocol放到相應位置快取,消費對應的Topic,拿到對應的As-Protocol,並把這個資料包解析出來,得到一條一條的message,這時就可以進行處理、入庫或實時計算。
需要特別注意的是message和Event。每個Message的業務量級是不一樣的,有幾十B、幾百B、幾千B的差別,打包成As-Protocol的時候要試試批量的數目有多少,原則上壓縮後的資料有個建議值,這個建議值視業務而定,DataGroup打包的數量是可以配的。
3、資料分組設計
如圖所示是我們的DataGroup設計。首先看最上面,一個Topic可以定義N個DataGroup。往下是Topic和streaming Job一比一的關係,就是說一個實時的Group只需要對應一個Topic,如果兩個業務不相關就對應的兩個Topic,用兩個Job去處理,最後得到想要的關係。
從架構圖可以看到DataGroup的扭轉關係。最初資料採集每一個節點會宣告它是屬於哪一個DataGroup,上傳資料會處於這個DataGroup,經過資料中轉發給我們的分散式快取也對應了Topic下面不同的分組資料。最後Streaming交給我Topic,我可以帥選出在最上面的關係,去配置DataGroup,可以非常靈活地組合。這就是DataGroup的設計思想。
四、採集元件Agent
1、概述
如圖所示,這是完全由我們自己設計和實現的一款元件。右邊是採集元件,分為兩部分:一個是基於java環境的獨立工作程式;另一個是jar外掛。外掛叫Agen-Stub.jar;獨立層是Agent-File.zip,Agent-File有一個paresr支援不同的檔案型別,目前支援的file和Binlog,可擴充套件。根據需要可以增加parser,也是接入Agent-stub,擁有Agent-stub的一些特性。
如上圖右側的示意圖,Agent-stub接入多個Business,前面提到的一個埋點服務就是一個Business,它把資料交給Agent-stub,Agent-stub會往後發展,與file和mysQL相對應的是file parser,出來是Agent-stub,流程是一樣的。
2、Agent-Stub.jar
接下來看Agent-Stub是如何設計的。
多執行緒、非同步。這個毫無疑問,做外掛化肯定是這樣考慮的,不能阻塞上層業務。
記憶體小佇列+磁碟壓縮佇列。這是我們改進最大的一個地方,早期版本中我們採用的是記憶體大佇列,如果只有記憶體大佇列缺點非常明顯:
程式正常啟動的時候大佇列裡的資料怎麼辦?要等他發完嗎?還是不發完?當大佇列塞滿的時候,還有對上層業務的侵入性怎麼辦?程式遇到問題時怎麼辦?大佇列可能是50萬、100萬甚至更多。
採用了記憶體小佇列+磁碟壓縮佇列後可以解決正常程式的啟停,保證資料沒有問題,還可以解決空間的佔用清空性的問題,以此同時,磁碟壓縮佇列還可以在程式出錯的時候加速傳送。
解釋一下磁碟壓縮佇列, 這次我們設計協議的思想很簡單:壓縮之後得到一個位元組速度,存在磁碟的檔案裡,這個檔案按照小時儲存,這時對於二次傳送帶來的損耗並不大,不需要重新阻斷資料也不需要解析和壓縮,只需要讀出來發出去。後面還有一個提升就是磁碟傳送佇列跟記憶體傳送佇列是單獨分開的,這樣更能提升二次資料的傳送效能。
無損啟停。正常的啟動和停止,資料是不會停止不會丟失的。
Agent的版本號自動上報平臺。這個非常重要,我們早期的版本是沒有的,可以想象一下當你的Agent節點是幾千上萬,如果沒有一個平臺直觀地管理,那將是一個怎樣恐怖的局面。現在我們每一個Agent啟動的時候都會建立一個node path,把版本號放到path裡,在管理平臺解析這個path,然後做分類,我們的版本就是這樣上報的。
自動識別接入源,智慧歸類。這個其實和上面那點是一樣的,在早期版本中我們做一個Agent的標識,其實就是一個IP+一個POD,就是說你有幾千個IP+POD量表需要人工管理,工作量非常大且乏味。我們優化了一個自動識別,把DataGroup放到Agent的node path裡,管理平臺可以做到自動識別。
Agent的全面實時監控。包括記憶體佇列數、磁碟佇列數、執行狀態、出錯狀態、qps等,都可以Agent上報,並且在管理平臺直觀地看到哪一個節點是什麼樣子的。其做法也依賴於zookeeper的實現和承載,這裡其實就是對zk node的應用,我們有一個定時執行緒收集當前Agent必要的資料,然後傳到node的data上去,管理平臺會獲取這些date,最後做一個平臺化的展示。
支援實時命令。包括括限流,恢復限流、停止、調整心跳值等,大大提高了運維能力。其實現原理也是依賴於Agent,這裡我們建立一個Data Group,通過管理平臺操作之後把資料放到Data Group裡,然後會有一個監聽者去監聽獲取資料的變化並作出相應的邏輯。
相容Docker。目前魅族在用Doker,Doker對我們這邊的Agent來講是一個挑戰,它的啟動和停止是非常態化的,就是你可能認為相同的Docker容器不會重啟第二次。
3、Agent-File.zip
接入Agent-Stub。 Agent-file首先是接入Agent-stub,擁有Agent-stub的一些特性。
相容Docker。因為啟動和停止的常態,假設我們剛剛一個業務接入了Agent-stub,那停止的時候它會通知我,Agent-stub會把小佇列裡的資料抓到磁碟壓縮佇列裡去。但是這裡需要注意的是:磁碟壓縮佇列不能放到Docker自己的檔案系統裡,不然它停了之後資料就沒有人能夠得到了。
當Agent-stub停的時候,會有一個標識說磁碟要做佇列,我們的資料有沒有發完,磁碟壓縮佇列裡有一個評級的標識檔案,這時要用到Agent-file,Agent-file有一個單獨的掃描執行緒一個個地去掃描Docker目錄,掃到這個檔案的時候判斷其資料有沒有發完,如果沒發完就只能當做一個傳送者。
支援重發歷史資料。做大資料的可能都知道這些名詞,比如昨天的資料已經採集完了,但由於某些原因有可能資料有遺漏,需要再跑一次後端的補貼邏輯,或者上馬訓練,這時就要做資料重發。我們在管理平臺上就會有一個支援這種特定檔案或特定時間段的選擇,Agent接收到這個命令的時候會把相應的資料發上去,當然前提是資料不要被清了。
管理平臺自助升級。這個可以理解成軟體升級,Agent可以說是非常常見的元件,但是我們重新設計時把自動升級考慮在內,這也是我們為什麼設計自己做而不是用開源的元件。這樣做帶來的好處是非常大的,我們幾千個Agent在平臺裡只需要一鍵就可以完成自動升級。
檔名正則表示式匹配。檔名的掃描是用自動錶達式。
源目錄定時掃描 and Jnotify。重點介紹檔案掃描機制。早期的版本是基於Agent-fire和KO-F兩者結合做的資料採集:Agent-file是加碼裡對檔案變更的事件鑑定,包括重新命名、刪除、建立都有一個事件產生;KO-F是拿到檔案下的最佳資料。假設源目錄裡有一千個檔案,KO-F現場就是一千個,Agent-file對應的檔案變革賦予的追加、重新命名等都可能會產生一系列事件,邏輯複雜。
所以我們設計了源目錄定時掃描的機制,首先有一個目標,就是我們的檔案佇列,包括為未讀檔案、已讀檔案做區別,區別之後掃描,當然還會有像檔案摘要等的存在這裡不細講,掃描之後更新未讀檔案、已讀檔案列表。
之所以加Jnotify是因為我們發現只用定製掃描不能解決所有業務場景的問題,jootify在這裡起到補充定製掃描的作用,解決檔案風險和檔案產程的問題。
單檔案讀取。早期版本中這一點依賴於檔案列表,當檔案非常多時程式變得非常不穩定,因為可能要開幾百個或幾千個執行緒。後來我們改成了單檔案的讀取,上文提到的掃描機制會產生一個檔案佇列,然後從檔案佇列裡讀取,這樣一個個檔案、一段段圖,程式就非常穩定了。
檔案方式儲存offset,無損啟停。早期採用切入式PTE做儲存,銜接非常重,後來我們改成檔案方式儲存,設計非常簡單就只有兩個檔案:一個是目錄下面所有檔案的offset;一個是正在讀的檔案的offset。這裡涉及到無損啟停和策略的問題,我們定了一個5次演算法:就是每讀了5次就會刷盤一次,但只刷在讀檔案,別的檔案不會變化,所以可以想象得到,當這個程式被替換走的時候,最多也就是重複5條資料,大會導致資料丟失。
4、Agent示意圖
如圖是Agent示意圖。上面是Agent-file和資料物件。Agent啟動的時候要把裡面的offset檔案取來,就會產生未讀檔案和已讀檔案列表,掃描檔案目錄,然後更新檔案佇列,還有一個fileJNotify是相對應的檔案佇列。然後有一個比較重要的fileReader,我會先從檔案佇列裡拿到再去讀實際檔案,讀完刷盤之後這一塊就成功了,我會根據我的刷盤去重新整理offset。
上圖左邊有一個業務加了一個Agent-stub,最後變成flume,這裡有一個QueueReceiver(佇列接收者),filereader和業務方的DataSender會把message發過來,QueueReceiver接受的資料就是一條條的message,然後傳送到記憶體小佇列裡,當這邊的小佇列滿了怎麼辦呢?中間有一個額外的固定大小的效能提升的地方用於message歸類,當這個fIieReader往這個記憶體小佇列發的時候發現塞不進去了,就會在規定大小的佇列裡發,當一個固定大小的佇列滿了之後就會打包壓縮,以位元組處理的方式存到磁碟壓縮佇列。
再來說說我們為什麼會提出二次資料的傳送,其實就是多了一個countsender即壓縮佇列的傳送者,直接的資料來源是磁碟壓縮佇列,與上面的並生沒有任何衝突。Countsender的資料對賬功能是我們整個平臺的核心功能之一,基於這個統計的資料確保了其完整性,少一條資料我們都知道,在採集層有一個countsender,以另外一個渠道發出去,和真正的資料來源渠道不一樣,會更加的輕量化更加可靠,且數值非常小。
最後是前文提到的監控和命令的實現,一邊是Agentnode,一邊是資料管理。
5、Agent的坑
丟資料。如前文提到記憶體大佇列帶來的問題。
版本管理的問題。
tailf -f的問題。
網路原因導致zk刪節點問題。網路不穩定的時候,ZK會有一個節點的心跳檢測,不穩定的時候監測會以為節點已經不存在了而把節點刪掉,這會導致管理平臺的節點監控、檔案下發全部都失效。解決辦法就是在message加一層控制檢查執行緒,發現節點不在了再建立一遍。
亂碼的問題。可能會跟一些遠端訪問的軟體相關,原則上我們假設第二次啟動的時候沒有配置我們的編碼,預設與系統一致,但當遠端軟體啟動的時候可能會發生不一樣的地方,所以不要依賴於預設值,一定要在啟動程式裡設定希望的編碼。
日誌問題,在外掛化的時候肯定要考慮到業務方的日誌,我們把業務方的日誌刷死了,當網路出現問題的時候每傳送一條就失敗一條,那是不是都要打印出來?我們的考慮是第一條不列印,後面可能十條列印一次,一百條列印一次,一千條列印一次,這個量取決於業務。補充一點,我們有一個統計執行緒,可以根據統計執行緒觀察Agent的正常與否。
五、流管理平臺
如圖所示,我們的流管理平臺介面比較簡單,但功能非常豐富,包括:
- 接入業務的管理、釋出、上線;
- 對Agent節點進行實時監測、管理、命令;
- 對Flume進行監測、管理;
- 對實時計算的job的管理;
- 對全鏈路的資料流量對帳,這是我們自檢的功能;
- 智慧監控報警,我們有一個非常人性化的報警閥值的建議。取一個平均值,比如一週或一天,設定一個閥值,比如一天的流量訪問次數可能是一千次,我們設計的報警是2000次,當連續一週都是2000次的時候就得改進。
六、資料中轉
1、背景
業務發展可能從1到100再到1000,或者當公司網際網路發展到一定程度的時候業務可能遍佈世界各地,魅族的雲服務資料分為海外服務和國內服務,我們把業務拆分開來,大資料採集肯定也要跟著走,這就面臨著資料中轉的問題。
如圖所示是我們兩個案例的示意圖。黑色的是內網的線,橙色的是跨界性的線,有公網的、雲端的、專線的,各種各樣的網路情況。
上面的是Agent叢集,B-IDC也有一個Agent叢集,直接訪問我們登入的叢集。
這裡第一個問題是我們的連線非常多,訪問Agent節點的時候有幾千個Agent節點就得訪問幾千個節點,這是不太友好的事情。另一個問題是當我們做升級遷移的時候,Agent要做修改和配置,必須得重啟,當整個B-IDC遷移到A-IDC,我們加了一個Flyme叢集。同樣是一個Agent叢集,下面有一個Flume叢集,這樣的好處:一是裡面的連線非常少,線上的Flume一個ID就三臺;二是這邊承載了所有的Agent,除了Agent還有其他的採集都在A-IDC裡中轉,當這個片區要做升級的時候上面的業務是透明的,靈活性非常高。
2、Flume介紹
Flume裡有三個核心的部分:Source、Channel、Sink,Source是資料結構源;Channel相當於記憶體大佇列,Sink是輸出到不同的目標。官方提供了很多元件:Avro、HTTP、Thrift、Memory、File、Spillable Memory、Avro、Thrift、Hdfs、Hive。
3、Flume實踐
無Group,採用Zookeeper做叢集
Agent採用LB做負載均衡,動態感知。結合Zookeeper可以感知到Agent列表,這時會採用負載均衡的做法找到當前的那個Flume,到後端的Flume直接變化的時候可以感知到從而下線。
硬碟快取、無損啟停。採用memory可能會帶來些不好的問題,如果記憶體佇列改成檔案就沒有這個問題。因為記憶體速度快,儲存強制重新整理的時候就沒有資料了,所以我們做了優化:還是採用memory,在Flume停的時候把資料採集下來,下一次啟動的時候把資料發出去,這時就可以做到無損啟停,但是有一點千萬要注意:磁碟其實是固化在機器裡面,當這臺機器停下不再啟動的時候,別忘了把資料移走發出去。
停止順序優化。在做優化的時候遇到原始碼的修改,其實就是Flume停止順序的優化。原生裡好像先停止Channel,然後提高sink,這就會導致想要做這個功能的時候做不到。我們應該先把這個資料改掉再去停止sink最後停止Channel,這樣就保證Channel裡的資料可以全部固化到硬盤裡。
多種轉發方式。我們現在是全球的RBC,支援公網、內網、跨域性專線,我們提供一個非常好的功能:http sink,它也是一個安全的支援ssl的轉換方式。
自定義Sink,多執行緒傳送(channel的get只能單執行緒)。
4、停止順序
如圖是停止順序的修改。這是一個sourceRunner、sink、channel。
5、Memory的capacity
選擇記憶體之後,這個記憶體大小到底多少比較合適?如圖所示,左邊Flume是從500-1000,channel容量是5萬、10萬,還有Agent的個數、執行緒,我們發現在10萬的時候它的fullGC是非常頻繁的,所以我們最後定的大小是5萬。當然不同的機器根據不同的測試得到自己的值,這個值不是恆定的。
包大小從10K到30K到50K有什麼不一樣呢?很明顯TPS從1萬多降到了2000多,因為包越大網絡卡就越慢了,這裡看到其實已經到了200兆(雙網絡卡),把網絡卡跑滿了。我們做流平臺設計的時候,不希望鏈路被跑滿,所以我們給了個建議值,大小在5-10K。當然,線上我們採用的萬兆網絡卡。
七、實時計算
1、實時計算叢集
在SparkZK裡直接寫HA,可以減少不必要的MR提高IO,減少IO消耗。
Kafka+Strom (ZK)
2、Spark實踐
直接寫HDFS底層檔案
自動建立不存在的Hive分割槽
相應Metaq的日誌切割,這一點上現在的Kafka是沒有問題的,當時的日誌切割會導致網路連線超時,我們檢視原始碼發現確實會堵塞,我們的解決方法是把切割調成多色或分割槽調多。
不要定時的killJob。早期的Spark版本因為大批量的killJob導致一些不穩定的情況,某些job其實是沒有被完全覆蓋,假死在那裡的。
作者:沈輝煌,魅族資料架構師,2010年加入魅族,負責大資料、雲服務相關設計與研發;專注於分散式服務、分散式儲存、海量資料下rdb與nosql融合等技術。
本篇文章內容來自第八期魅族開放日魅族資料架構師沈輝煌的現場分享,由IT大咖說提供現場速錄,由msup整理編輯。