美團DB資料同步到資料倉庫的架構與實踐
背景
在資料倉庫建模中,未經任何加工處理的原始業務層資料,我們稱之為ODS(Operational Data Store)資料。在網際網路企業中,常見的ODS資料有業務日誌資料(Log)和業務DB資料(DB)兩類。對於業務DB資料來說,從MySQL等關係型資料庫的業務資料進行採集,然後匯入到Hive中,是進行資料倉庫生產的重要環節。
如何準確、高效地把MySQL資料同步到Hive中?一般常用的解決方案是批量取數並Load:直連MySQL去Select表中的資料,然後存到本地檔案作為中間儲存,最後把檔案Load到Hive表中。這種方案的優點是實現簡單,但是隨著業務的發展,缺點也逐漸暴露出來:
- 效能瓶頸:隨著業務規模的增長,Select From MySQL -> Save to Localfile -> Load to Hive這種資料流花費的時間越來越長,無法滿足下游數倉生產的時間要求。
- 直接從MySQL中Select大量資料,對MySQL的影響非常大,容易造成慢查詢,影響業務線上的正常服務。
- 由於Hive本身的語法不支援更新、刪除等SQL原語,對於MySQL中發生Update/Delete的資料無法很好地進行支援。
為了徹底解決這些問題,我們逐步轉向CDC (Change Data Capture) + Merge的技術方案,即實時Binlog採集 + 離線處理Binlog還原業務資料這樣一套解決方案。Binlog是MySQL的二進位制日誌,記錄了MySQL中發生的所有資料變更,MySQL叢集自身的主從同步就是基於Binlog做的。
本文主要從Binlog實時採集和離線處理Binlog還原業務資料兩個方面,來介紹如何實現DB資料準確、高效地進入數倉。
整體架構
整體的架構如上圖所示。在Binlog實時採集方面,我們採用了阿里巴巴的開源專案Canal,負責從MySQL實時拉取Binlog並完成適當解析。Binlog採集後會暫存到Kafka上供下游消費。整體實時採集部分如圖中紅色箭頭所示。
離線處理Binlog的部分,如圖中黑色箭頭所示,通過下面的步驟在Hive上還原一張MySQL表:
- 採用Linkedin的開源專案Camus,負責每小時把Kafka上的Binlog資料拉取到Hive上。
- 對每張ODS表,首先需要一次性製作快照(Snapshot),把MySQL裡的存量資料讀取到Hive上,這一過程底層採用直連MySQL去Select資料的方式。
- 對每張ODS表,每天基於存量資料和當天增量產生的Binlog做Merge,從而還原出業務資料。
我們回過頭來看看,背景中介紹的批量取數並Load方案遇到的各種問題,為什麼用這種方案能解決上面的問題呢?
- 首先,Binlog是流式產生的,通過對Binlog的實時採集,把部分資料處理需求由每天一次的批處理分攤到實時流上。無論從效能上還是對MySQL的訪問壓力上,都會有明顯地改善。
- 第二,Binlog本身記錄了資料變更的型別(Insert/Update/Delete),通過一些語義方面的處理,完全能夠做到精準的資料還原。
Binlog實時採集
對Binlog的實時採集包含兩個主要模組:一是CanalManager,主要負責採集任務的分配、監控報警、元資料管理以及和外部依賴系統的對接;二是真正執行採集任務的Canal和CanalClient。
當用戶提交某個DB的Binlog採集請求時,CanalManager首先會呼叫DBA平臺的相關介面,獲取這一DB所在MySQL例項的相關資訊,目的是從中選出最適合Binlog採集的機器。然後把採集例項(Canal Instance)分發到合適的Canal伺服器上,即CanalServer上。在選擇具體的CanalServer時,CanalManager會考慮負載均衡、跨機房傳輸等因素,優先選擇負載較低且同地域傳輸的機器。
CanalServer收到採集請求後,會在ZooKeeper上對收集資訊進行註冊。註冊的內容包括:
- 以Instance名稱命名的永久節點。
- 在該永久節點下注冊以自身ip:port命名的臨時節點。
這樣做的目的有兩個:
- 高可用:CanalManager對Instance進行分發時,會選擇兩臺CanalServer,一臺是Running節點,另一臺作為Standby節點。Standby節點會對該Instance進行監聽,當Running節點出現故障後,臨時節點消失,然後Standby節點進行搶佔。這樣就達到了容災的目的。
- 與CanalClient互動:CanalClient檢測到自己負責的Instance所在的Running CanalServer後,便會進行連線,從而接收到CanalServer發來的Binlog資料。
對Binlog的訂閱以MySQL的DB為粒度,一個DB的Binlog對應了一個Kafka Topic。底層實現時,一個MySQL例項下所有訂閱的DB,都由同一個Canal Instance進行處理。這是因為Binlog的產生是以MySQL例項為粒度的。CanalServer會拋棄掉未訂閱的Binlog資料,然後CanalClient將接收到的Binlog按DB粒度分發到Kafka上。
離線還原MySQL資料
完成Binlog採集後,下一步就是利用Binlog來還原業務資料。首先要解決的第一個問題是把Binlog從Kafka同步到Hive上。
Kafka2Hive
整個Kafka2Hive任務的管理,在美團資料平臺的ETL框架下進行,包括任務原語的表達和排程機制等,都同其他ETL類似。而底層採用LinkedIn的開源專案Camus,並進行了有針對性的二次開發,來完成真正的Kafka2Hive資料傳輸工作。
對Camus的二次開發
Kafka上儲存的Binlog未帶Schema,而Hive表必須有Schema,並且其分割槽、欄位等的設計,都要便於下游的高效消費。對Camus做的第一個改造,便是將Kafka上的Binlog解析成符合目標Schema的格式。
對Camus做的第二個改造,由美團的ETL框架所決定。在我們的任務排程系統中,目前只對同調度佇列的任務做上下游依賴關係的解析,跨排程佇列是不能建立依賴關係的。而在MySQL2Hive的整個流程中,Kafka2Hive的任務需要每小時執行一次(小時佇列),Merge任務每天執行一次(天佇列)。而Merge任務的啟動必須要嚴格依賴小時Kafka2Hive任務的完成。
為了解決這一問題,我們引入了Checkdone任務。Checkdone任務是天任務,主要負責檢測前一天的Kafka2Hive是否成功完成。如果成功完成了,則Checkdone任務執行成功,這樣下游的Merge任務就可以正確啟動了。
Checkdone的檢測邏輯
Checkdone是怎樣檢測的呢?每個Kafka2Hive任務成功完成資料傳輸後,由Camus負責在相應的HDFS目錄下記錄該任務的啟動時間。Checkdone會掃描前一天的所有時間戳,如果最大的時間戳已經超過了0點,就說明前一天的Kafka2Hive任務都成功完成了,這樣Checkdone就完成了檢測。
此外,由於Camus本身只是完成了讀Kafka然後寫HDFS檔案的過程,還必須完成對Hive分割槽的載入才能使下游查詢到。因此,整個Kafka2Hive任務的最後一步是載入Hive分割槽。這樣,整個任務才算成功執行。
每個Kafka2Hive任務負責讀取一個特定的Topic,把Binlog資料寫入original_binlog庫下的一張表中,即前面圖中的original_binlog.db,其中儲存的是對應到一個MySQL DB的全部Binlog。
上圖說明了一個Kafka2Hive完成後,檔案在HDFS上的目錄結構。假如一個MySQL DB叫做user,對應的Binlog儲存在original_binlog.user表中。ready目錄中,按天儲存了當天所有成功執行的Kafka2Hive任務的啟動時間,供Checkdone使用。每張表的Binlog,被組織到一個分割槽中,例如userinfo表的Binlog,儲存在table_name=userinfo這一分割槽中。每個table_name一級分割槽下,按dt組織二級分割槽。圖中的xxx.lzo和xxx.lzo.index檔案,儲存的是經過lzo壓縮的Binlog資料。
Merge
Binlog成功入倉後,下一步要做的就是基於Binlog對MySQL資料進行還原。Merge流程做了兩件事,首先把當天生成的Binlog資料存放到Delta表中,然後和已有的存量資料做一個基於主鍵的Merge。Delta表中的資料是當天的最新資料,當一條資料在一天內發生多次變更時,Delta表中只儲存最後一次變更後的資料。
把Delta資料和存量資料進行Merge的過程中,需要有唯一鍵來判定是否是同一條資料。如果同一條資料既出現在存量表中,又出現在Delta表中,說明這一條資料發生了更新,則選取Delta表的資料作為最終結果;否則說明沒有發生任何變動,保留原來存量表中的資料作為最終結果。Merge的結果資料會Insert Overwrite到原表中,即圖中的origindb.table。
Merge流程舉例
下面用一個例子來具體說明Merge的流程。
資料表共id、value兩列,其中id是主鍵。在提取Delta資料時,對同一條資料的多次更新,只選擇最後更新的一條。所以對id=1的資料,Delta表中記錄最後一條更新後的值value=120。Delta資料和存量資料做Merge後,最終結果中,新插入一條資料(id=4),兩條資料發生了更新(id=1和id=2),一條資料未變(id=3)。
預設情況下,我們採用MySQL表的主鍵作為這一判重的唯一鍵,業務也可以根據實際情況配置不同於MySQL的唯一鍵。
上面介紹了基於Binlog的資料採集和ODS資料還原的整體架構。下面主要從兩個方面介紹我們解決的實際業務問題。
實踐一:分庫分表的支援
隨著業務規模的擴大,MySQL的分庫分表情況越來越多,很多業務的分表數目都在幾千個這樣的量級。而一般資料開發同學需要把這些資料聚合到一起進行分析。如果對每個分表都進行手動同步,再在Hive上進行聚合,這個成本很難被我們接受。因此,我們需要在ODS層就完成分表的聚合。
首先,在Binlog實時採集時,我們支援把不同DB的Binlog寫入到同一個Kafka Topic。使用者可以在申請Binlog採集時,同時勾選同一個業務邏輯下的多個物理DB。通過在Binlog採集層的彙集,所有分庫的Binlog會寫入到同一張Hive表中,這樣下游在進行Merge時,依然只需要讀取一張Hive表。
第二,Merge任務的配置支援正則匹配。通過配置符合業務分表命名規則的正則表示式,Merge任務就能瞭解自己需要聚合哪些MySQL表的Binlog,從而選取相應分割槽的資料來執行。
這樣通過兩個層面的工作,就完成了分庫分表在ODS層的合併。
這裡面有一個技術上的優化,在進行Kafka2Hive時,我們按業務分表規則對錶名進行了處理,把物理表名轉換成了邏輯表名。例如userinfo123這張表名會被轉換為userinfo,其Binlog資料儲存在original_binlog.user表的table_name=userinfo分割槽中。這樣做的目的是防止過多的HDFS小檔案和Hive分割槽造成的底層壓力。
實踐二:刪除事件的支援
Delete操作在MySQL中非常常見,由於Hive不支援Delete,如果想把MySQL中刪除的資料在Hive中刪掉,需要採用“迂迴”的方式進行。
對需要處理Delete事件的Merge流程,採用如下兩個步驟:
- 首先,提取出發生了Delete事件的資料,由於Binlog本身記錄了事件型別,這一步很容易做到。將存量資料(表A)與被刪掉的資料(表B)在主鍵上做左外連線(Left outer join),如果能夠全部join到雙方的資料,說明該條資料被刪掉了。因此,選擇結果中表B對應的記錄為NULL的資料,即是應當被保留的資料。
- 然後,對上面得到的被保留下來的資料,按照前面描述的流程做常規的Merge。
總結與展望
作為資料倉庫生產的基礎,美團資料平臺提供的基於Binlog的MySQL2Hive服務,基本覆蓋了美團內部的各個業務線,目前已經能夠滿足絕大部分業務的資料同步需求,實現DB資料準確、高效地入倉。在後面的發展中,我們會集中解決CanalManager的單點問題,並構建跨機房容災的架構,從而更加穩定地支撐業務的發展。
本文主要從Binlog流式採集和基於Binlog的ODS資料還原兩方面,介紹了這一服務的架構,並介紹了我們在實踐中遇到的一些典型問題和解決方案。希望能夠給其他開發者一些參考價值,同時也歡迎大家和我們一起交流。
招聘
如果你對我們的工作內容比較感興趣,歡迎傳送簡歷給 [email protected],和我們一起致力於解決海量資料採集和傳輸的問題中來吧!