1. 程式人生 > >DataPipeline在大資料平臺的資料流實踐

DataPipeline在大資料平臺的資料流實踐

文 | 呂鵬 DataPipeline架構師

進入大資料時代,實時作業有著越來越重要的地位。本文將從以下幾個部分進行講解DataPipeline在大資料平臺的實時資料流實踐。


一、企業級資料面臨的主要問題和挑戰


1.資料量不斷攀升

隨著網際網路+的蓬勃發展和使用者規模的急劇擴張,企業資料量也在飛速增長,資料的量以GB為單位,逐漸的開始以TB/GB/PB/EB,甚至ZB/YB等。同時大資料也在不斷深入到金融、零售、製造等行業,發揮著越來越大的作用。

2. 資料質量的要求不斷地提升

當前比較流行的AI、資料建模,對資料質量要求高。尤其在金融領域,對於資料質量的要求是非常高的。

3. 資料平臺架構的複雜化

企業級應用架構的變化隨著企業規模而變。規模小的企業,使用者少、資料量也小,可能只需一個MySQL就搞能搞;中型企業,隨著業務量的上升,這時候可能需要讓主庫做OLTP,備庫做OLAP;當企業進入規模化,資料量非常大,原有的OLTP可能已經不能滿足了,這時候我們會做一些策略,來保證OLTP和OLAP隔離,業務系統和BI系統分開互不影響,但做了隔離後同時帶來了一個新的困難,資料流的實時同步的需求,這時企業就需要一個可擴充套件、可靠的流式傳輸工具。


二、大資料平臺上的實踐案例


下圖是一個典型的BI平臺設計場景,以MySQL為例,DataPipeline是如何實現MySQL的SourceConnector。MySQL作為Source端時:

  • 全量+ 增量;

  • 全量:通過select 方式,將資料載入到kafka中;

  • 增量:實時讀取 binlog的方式;

使用binlog時需要注意開啟row 模式並且image設定為 full。

1. MySQL SourceConnector 全量+增量實時同步的實現

下面是具體的實現流程圖,首先開啟repeatable read事務,保證在執行讀鎖之前的資料可以確實的讀到。然後進行flush table with read lock 操作,新增一個讀鎖,防止這個時候有新的資料進入影響資料的讀取,這時開始一個truncation with snapshot,我們可以記錄當前binlog的offset 並標記一個snapshot start,這時的offset 為增量讀取時開始的offset。當事務開始後可以進行全量資料的讀取。record marker這時會將生成record 寫到 kafka 中,然後commit 這個事務。當全量資料push完畢後我們解除讀鎖並且標記snapshot stop,此時全量資料已經都進入kafka了,之後從之前記錄的offset開始增量資料的同步。

2. DataPipeline做了哪些優化工作

1)以往在資料同步環節都分為全量同步和增量同步,全量同步為一個批處理。在批處理時我們都是進行all or nothing的處理,但當大資料情況下一個批量會佔用相當長的時間,時間越長可靠性就越難保障,所以往往會出現斷掉的情況,這時一個重新處理會讓很多人崩潰。DataPipeline 解決了這一痛點,通過管理資料傳輸時的position 來做到斷點續傳,這時當一個大規模的資料任務即使發生了意外,也可以重斷掉的點來繼續之前的任務,大大縮短了同步的時間,提高了同步的效率。

2)在同步多個任務的時候,很難平衡資料傳輸對源端的壓力和目的端的實時性,在大資料量下的傳輸尤其能夠體現,這時DataPipeline 在此做了大量相關測試來優化不同的連線池,開放資料傳輸效率的自定義化,供客戶針對自己的業務系統定製合適的傳輸任務,對於不同種類的資料庫的傳輸進行優化和調整,保證資料傳輸的高效性。

3)自定義異構資料型別的轉化,往往開源類大資料傳輸工具如 sqoop 等,對異構資料型別的支援不夠靈活,種類也不夠齊全。像金融領域中對資料精度要求較高的場景,在傳統資料庫向大資料平臺傳輸時造成的精度丟失是很大的一個問題。DataPipeline 對此做了更多資料型別的支援,比如hive 支援的複雜型別以及 decimal 和 timestamp 等。

3. Sink端之Hive

1)Hive的特性

  • Hive 內部表和外部表;

  • 依賴HDFS;

  • 支援事務和非事務;

  • 多種壓縮格式;

  • 分割槽分桶。

2)Hive同步的問題

  • 如何保證實時的寫入?

  • schema change了怎麼辦?

  • 怎麼擴充套件我想儲存的格式?

  • 怎麼實現多種分割槽方式?

  • 同步中斷了怎麼辦?

  • 如何保證我的資料不丟?

3)KafkaConnect HDFS 的 Hive 同步實踐

  • 使用外表:Hive外部表,能夠提高寫入效率,直接寫HDFS,減少IO消耗,而內表會比外表多一次IO;

  • Schema change:目前的做法是目的端根據源端的變化而變化,當有增加列刪除列的情況,目標端會跟隨源端改動;

  • 目前支援的儲存格式:parquet,avro ,csv

  • 外掛化的partitioner,提供多種分割槽方式,如 Wallclock RecordRecordField:wallclock是使用寫入到hive端時的系統時間,Record 使用是讀取時生成record的時間,RecordField是使用使用者自定義的時間戳來定義分割槽,未來會實現可自定義化的partitioner 來滿足不同的需求;

  • Recover 機制保障中斷後不會丟失資料;

  • 使用WAL (Write-AheadLogging)機制,保證資料目的端資料一致性。

4)Recover的機制

recover 是一種恢復的機制,在資料傳輸的階段往往可能出現各種不同的問題,如網路問題等等。當出現問題後我們需要恢復資料同步,那麼recover是怎麼保證資料正常傳輸不丟失呢?當recover開始的時候,獲取目標檔案在hdfs 上的租約,如果這時候需要讀寫的HDFS當前檔案是被佔用的,那我們需要等待它直到可以獲取到租約。當我們獲取到租約後就可以開始讀之前寫入時候的log,如果第一次會建立一個新的log,並標記一個begin,然後記錄了當時的kafka offset。這時候需要清理之前遺留下來的臨時資料,清理掉之後再重新開始同步直到同步結束會標記一個end。如果沒有結束的話就相當於正在進行中,正在進行中每次都會提交當前同步的offset,來保證出現意外後會回滾到之前offset。

5)WAL (Write-Ahead Logging)機制

Write-Ahead Logging機制其實就是核心思想在資料寫入到資料庫之前,它先寫臨時檔案,當一個批次結束後,在將這個臨時檔案改名為正式檔案,確保每次提交後的正式檔案一致性,如果中途出現寫入錯誤將臨時檔案刪除重新寫入,相當於一個回滾。hive 的同步主要利用這種實現方式來保證一致性。首先它同步資料寫入到HDFS臨時檔案上,確保一個批次的資料正常後再重新命名到正式檔案當中。正式的檔名會包含kafka offset,例如一個avro 檔案的檔名為 xxxx+001+0020.avro ,這表示當前檔案中有offset 1 到 20 的20條資料。

4. Sink端之GreenPlum

GreenPlum,是一個MPP架構的資料倉庫,底層由多個postgres資料庫作為計算節點,擅長OLAP,作為BI資料倉庫有著良好的效能。

1)DataPipeline對GreenPlum 同步實踐以及優化策略

  • greenplum 支援多種資料載入方式,目前我們使用copy的載入方式。

  • 批量處理提高sink端寫入效率,不進行insert 和 update 的操作,一律使用 delete + copy 的方式批量載入;

  • 多執行緒加預載入機制:

➢ 每個需要同步的表單獨記錄一個offset,當整個任務失敗時可以分開進行恢復;

➢ 使用一個執行緒池管理載入資料的執行緒,每個同步的表單獨一個執行緒來進行載入資料,多表同時同步;

➢ 在載入資料的時間裡,提前對kafka進行消費,快取處理好的一個數據集,當一個執行緒載入資料結束後馬上開始新的執行緒載入資料,減少處理載入資料的時間;

  • delete + copy的方式可以保證資料最終一致性;

  • source 端有主鍵的表可以通過主鍵來合併一個批次需要同步的資料,如一個需要同步的批量資料中包含一條 insert 的資料,後面跟著 update 該條資料,那就無需同步兩遍,將該資料更新到 update 之後的狀態 copy 到 gp 當中即可。

同步GreenPlum需要注意:因為是通過copy 寫入檔案的,需要檔案是結構化資料,典型的是使用CSV,CSV 寫入時需注意spiltquote,escapequote,避免出現數據錯位的現象。update主鍵的問題 , 當源端是update一個主鍵時,同時需要記錄update前的主鍵,並在目標端進行刪除。還有 \0 特殊字元的問題,因為核心是用C語言,所以在同步的時候\0需要特殊處理掉。


三、DataPipeline未來的工作


1. 目前我們碰到kafka connect rebalance的一些問題,所以我們對其進行了改造。以往的rebalance機制是假如我們增加或者刪除一個task,會導致整個叢集rebalance,這樣造成很多無謂的開銷而且頻繁的rebalance 不利於資料同步的任務的穩定。於是我們將rebalance機制改造成一個黏性的機制:

  • 當我們增加一個新的任務的時候,我們會檢查所有的worker使用率比較低的,當worker的task比較少,我們只把它加進比較少的worker就可以了,也不需要做全量的平衡,當然這時候可能還是有一些不平衡的資源浪費,這是我們可以容忍的,至少比我們做一次全量的rebalance開銷要小;

  • 假如刪除一個task,以往的機制是刪除一個task的時候也會做全量的Rebalance,新的機制不會觸發rebalance。這時候如果時間長也會造成一個資源不平衡,這是我們可以自動化rebalance一下所有的叢集;

  • 假如說叢集的某個節點宕掉了,該節點的task怎麼辦呢?我們不會馬上就把這個節點上的 task分配出去,會先等待10分鐘,因為有的時候它可能只是短暫的連線超時,過一段時間後就會恢復,如果根據這個來做一次rebalance,可能這是不太值的。當等待10分鐘後節點還是沒有恢復,我們再做rebalance,將宕掉的節點任務分配到其他節點上;

2. 源端的資料一致性,目前通過WAL的機制可以保證目的端的一致性;

3. 大資料量下的同步優化以及提高同步的穩定性。


四、總結


1. 大資料時代企業資料整合主要面臨各種複雜的架構,應對這些複雜的系統對ETL的要求也越來越高。我們能做的就是需要權衡利弊選取一個符合業務需求的框架;

2. Kafka Connect 比較適合對資料量大,且有實時性需求的業務;

3. 基於Kafka Connect 優良特性可以依據不同的資料倉庫特性來提高資料時效性和同步效率;

4. DataPipeline針對目前企業在大規模實時資料流的痛點,進行了相關的改造和優化,首先資料端到端一致性的保證是幾乎所有企業在資料同步過程中碰到的,目前已經做到基於kafka connect 的框架中 rebalance 中的優化和改造。

—end—