1. 程式人生 > >美圖離線ETL實踐

美圖離線ETL實踐

美圖收集的日誌需要通過 ETL 程式清洗、規整,並持久化地落地於 HDFS / Hive,便於後續的統一分析處理。

clipboard.png

什麼是 ETL?

ETL 即 Extract-Transform-Load,用來描述將資料從來源端經過抽取(extract)、轉換(transform)、載入(load)至目的端的過程。ETL 一詞較常用在資料倉庫,但其物件並不限於資料倉庫。

在美圖特有的業務環境下,ETL 需要做到以下需求:

1.大資料量、高效地清洗落地。美圖業務繁多、使用者基數大、資料量龐大,除此之外業務方希望資料採集後就能快速地查詢到資料。

2.靈活配置、滿足多種資料格式。由於不斷有新業務接入,當有新業務方資料接入時要做到靈活通用、增加一個配置資訊就可以對新業務資料進行清洗落地;同時每個業務方的資料格式各式各樣,ETL 需要相容多種通用資料格式,以滿足不同業務的需求(如 json、avro、DelimiterText 等)。

3.約束、規範。需要滿足資料庫倉庫規範,資料按不同層(STG 層、ODS 層等)、不同庫(default.db、meipai.db 等)、不同分割槽(必須指定時間分割槽)落地。

4.容錯性。考慮業務日誌採集可能存在一定的髒資料,需要在達到特定的閾值時進行告警;並且可能出現 Hadoop 叢集故障、Kafka 故障等各種狀況,因此需要支援資料重跑恢復。

ETL 有兩種形式:實時流 ETL 和 離線 ETL。

如圖 2 所示,實時流 ETL 通常有兩種形式:一種是通過 Flume 採集服務端日誌,再通過 HDFS 直接落地;另一種是先把資料採集到 Kafka,再通過 Storm 或 Spark streaming 落地 HDFS,實時流 ETL 在出現故障的時候很難進行回放恢復。美圖目前僅使用實時流 ETL 進行資料注入和清洗的工作。

clipboard.png

圖 2

根據 Lambda 結構,如果實時流 ETL 出現故障需要離線 ETL 進行修補。離線 ETL 是從 Kafka拉取訊息,經過 ETL 再從 HDFS 落地。為了提高實時性及減輕資料壓力,離線 ETL 是每小時 05 分排程,清洗上一個小時的資料。為了減輕 HDFS NameNode 的壓力、減少小檔案,日期分割槽下同個 topic&partition 的資料是 append 追加到同一個日誌檔案。

離線ETL的架構設計及實現原理

離線 ETL 採用 MapReduce 框架處理清洗不同業務的資料,主要是採用了分而治之的思想,能夠水平擴充套件資料清洗的能力;

clipboard.png

圖 3:離線 ETL 架構

如圖 3 所示,離線 ETL 分為三個模組:

Input(InputFormat):主要對資料來源(Kafka 資料)進行解析分片,按照一定策略分配到不同的 Map 程序處理;建立 RecordReader,用於對分片資料讀取解析,生成 key-value 傳送給下游處理。

Map(Mapper):對 key-value 資料進行加工處理。

Output (OutputFormat):建立 RecordWriter 將處理過的 key-value 資料按照庫、表、分割槽落地;最後在 commit 階段檢測訊息處理的完整性。

離線 ETL 工作流程

clipboard.png

圖 4

如圖 4 所示是離線 ETL 的基本工作流程:

1.kafka-etl 將業務資料清洗過程中的公共配置資訊抽象成一個 etl schema ,代表各個業務不同的資料;

2.在 kafka-etl 啟動時會從 zookeeper 拉取本次要處理的業務資料 topic&schema 資訊;

3.kafka-etl 將每個業務資料按 topic、partition 獲取的本次要消費的 offset 資料(beginOffset、endOffset),並持久化 mysql;

4.kafka-etl 將本次需要處理的 topic&partition 的 offset 資訊抽象成 kafkaEvent,然後將這些 kafkaEvent 按照一定策略分片,即每個 mapper 處理一部分 kafkaEvent;

5.RecordReader 會消費這些 offset 資訊,解析 decode 成一個個 key-value 資料,傳給下游清洗處理;

6.清洗後的 key-value 統一通過 RecordWriter 資料落地 HDFS。

離線 ETL 的模組實現

資料分片(Split)

我們從 kafka 獲取當前 topic&partition 最大的 offset 以及上次消費的截止 offset ,組成本次要消費的[beginOffset、endOffset]kafkaEvent,kafkaEvent 會打散到各個 Mapper 進行處理,最終這些 offset 資訊持久化到 mysql 表中。

clipboard.png

圖 5

那麼如何保證資料不傾斜呢?首先通過配置自定義mapper個數,並建立對應個數的ETLSplit。由於kafkaEevent包含了單個topic&partition之前消費的Offset以及將要消費的最大Offset,即可獲得每個 kafkaEvent 需要消費的訊息總量。最後遍歷所有的 kafkaEevent,將當前 kafkaEevent 加入當前最小的 ETLSplit(通過比較需要消費的資料量總和,即可得出),通過這樣生成的 ETLSplit 能儘量保證資料均衡。

資料解析清洗(Read)

clipboard.png

圖 6

如圖 6 所示,首先每個分片會有對應的 RecordReader 去解析,RecordReade 內包含多個 KafkaConsumerReader ,就是對每個 KafkaEevent 進行消費。每個 KafkaEevent 會對應一個 KafkaConsumer,拉取了位元組資料訊息之後需要對此進行 decode 反序列化,此時就涉及到 MessageDecoder 的結構。MessageDecoder 目前支援三種格式:

clipboard.png

MessageDecoder 接收到 Kafka 的 key 和 value 時會對它們進行反序列化,最後生成 ETLKey 和 ETLValue。同時 MessageDecoder 內包含了 Injector,它主要做了如下事情:

注入 Aid:針對 arachnia agent 採集的日誌資料,解析 KafkaKey 注入日誌唯一標識 Aid;

注入 GeoIP 資訊:根據 GeoIP 解析 ip 資訊注入地理資訊(如 country_id、province_id、city_id);

注入 SdkDeviceInfo: 本身實時流 ETL 會做注入 gid、is_app_new 等資訊,但是離線 ETL檢測這些資訊是否完整,做進一步保障。

過程中還有涉及到 DebugFilter,它將 SDK 除錯裝置的日誌過濾,不落地到 HDFS。

多檔案落地(Write)

由於 MapReduce 本身的 RecordWriter 不支援單個落地多個檔案,需要進行特殊處理,並且 HDFS 檔案是不支援多個程序(執行緒)writer、append,於是我們將 KafkaKey+ 業務分割槽+ 時間分割槽 + Kafka partition 定義一個唯一的檔案,每個檔案都是會到帶上 kafka partition 資訊。同時對每個檔案建立一個 RecordWriter。

clipboard.png

圖 7

如圖 7 所示,每個 RecordWriter 包含多個 Writer ,每個 Writer 對應一個檔案,這樣可以避免同一個檔案多執行緒讀寫。目前是通過 guava cache 維護 writer 的數量,如果 writer 太多或者太長時間沒有寫訪問就會觸發 close 動作,待下批有對應目錄的 kafka 訊息在建立 writer 進行 append 操作。這樣我們可以做到在同一個 map 內對多個檔案進行寫入追加。

檢測資料消費完整性 (Commit)

clipboard.png

圖 8

MapReduce Counter 為提供我們一個視窗,觀察統計 MapReduce job 執行期的各種細節資料。並且它自帶了許多預設 Counter,可以檢測資料是否完整消費:

reader_records: 解析成功的訊息條數;

decode_records_error: 解析失敗的訊息條數;

writer_records: 寫入成功的訊息條數;

...

最後通過本次要消費 topic offset 數量、reader_records 以及 writer_records數量是否一致,來確認訊息消費是否完整。

*允許一定比例的髒資料,若超出限度會生成簡訊告警

ETL系統核心特徵

資料補跑及其優化

ETL 是如何實現資料補跑以及優化的呢?首先了解一下需要重跑的場景:clipboard.png

*當用戶呼叫 application kill 時會經歷三個階段:1) kill SIGTERM(-15) pid;2) Sleep for 250ms;3)kill SIGKILL(-9) pid 。

那麼有哪些重跑的方式呢?

clipboard.png

如圖 9 所示是第三種重跑方式的整體流程,ETL是按照小時排程的,首先將資料按小時寫到臨時目錄中,如果消費失敗會告警通知並重跑消費當前小時。如果落地成功則合併到倉庫目錄的目標檔案,合併失敗同樣會告警通知並人工重跑,將小檔案合併成目標檔案。

clipboard.png

圖 9

優化後的重跑情況分析如下表所示:

clipboard.png

自動水平擴充套件

現在離線 Kafka-ETL 是每小時 05 分排程,每次排程的 ETL 都會獲取每個 topic&partition 當前最新、最大的 latest offset,同時與上個小時消費的截止 offset 組合成本地要消費的 kafkaEvent。由於每次獲取的 latest offset 是不可控的,有些情況下某些 topic&partition 的訊息 offset 增長非常快,同時 kafka topic 的 partition 數量來不及調整,導致 ETL 消費處理延遲,影響下游的業務處理流程:

  • 由於擴容、故障等原因需要補採集漏採集的資料或者歷史資料,這種情況下 topic&&partition 的訊息 offset 增長非常快,僅僅依賴 kafka topic partiton 擴容是不靠譜的,補採集完後面還得刪除擴容的 partition;
  • 週末高峰、節假日、6.18、雙十一等使用者流量高峰期,收集的使用者行為資料會比平時翻幾倍、幾十倍,但是同樣遇到來不及擴容 topic partition 個數、擴容後需要縮容的情況;

Kafka ETL 是否能自動水平擴充套件不強依賴於 kafka topic partition 的個數。如果某個 topic kafkaEvent 需要處理的資料過大,評估在合理時間範圍單個 mapper 能消費的最大的條數,再將 kafkaEvent 水平拆分成多個子 kafkaEvent,並分配到各個 mapper 中處理,這樣就避免單個 mapper 單次需要處理過大 kafkaEvent 而導致延遲,提高水平擴充套件能力。拆分的邏輯如圖 10 所示:

clipboard.png

圖 10

後續我們將針對以下兩點進行自動水平擴充套件的優化:

  • 如果單個 mapper 處理的總訊息資料比較大,將考慮擴容 mapper 個數並生成分片 split 進行負載均衡。
  • 每種格式的訊息處理速度不一樣,分配時可能出現一些 mapper 負擔比較重,將給每個格式配置一定的權重,根據訊息條數、權重等結合一起分配 kafkaEvent。