1. 程式人生 > 實用技巧 >數倉架構(離線和實時)--企業版

數倉架構(離線和實時)--企業版

數倉架構圖–企業版

1. 背景

  1. 數倉,這是一個並不新穎的詞語。在PC時代,就有傳統數倉,當時資料一般存放在資料庫中,一般是Oracle或者Mysql叢集中。因為那時候資料量還不是非常大,所以使用資料庫叢集就可以進行資料的儲存和查詢分析,集合前端web頁面就能做資料互動式的查詢和展示。
  2. 進入網際網路時代之後,由於移動裝置和移動通訊技術快速發展,來自移動端的資料幾何倍數增加,傳統數倉技術已經無法支撐這麼海量的資料儲存和查詢分析。現代化數倉應運而生。
    PS:
  • 移動網際網路時代,因為都是敏捷開發,為了更好研究客戶喜好,改進使用者功能和體驗,不管是web頁面還是app應用,都會將PC時代就有的使用者行為埋點功能加上,採集這些使用者行為資料日誌資訊之後,進行儲存,分析,提煉有效資訊。
  • 行為資料埋點,就是當用戶開啟軟體開始到離開軟體,中間的動作和行為做記錄,如點選一個按鈕,收藏,關閉,滑動等等。觸發一個業務行為如瀏覽某個頁面,分享等等,甚至app進入後臺,關閉app等都會被記錄下來。
  • 除了行為日誌資料之外,還有來自越來越複雜業務端的埋點資料,業務資料等也都需要進行分析處理,用來提煉有效資訊。
    大數定理,當資料樣本足夠大,就一定可以將提煉的資訊儘可能接近於事實。今日頭條的推薦演算法,美國提出的高頻交易都是突破了傳統觀念的技術方向,以資料說話,就是從資料中提煉理論,而不是先設想出一個理論再去驗證使用。只要樣本足夠完整足夠大,資料不會說謊。

2. 架構

在這裡插入圖片描述

2.1. 移動端和業務後臺做埋點統計和上傳

  1. 移動端的埋點,一般分為即時埋點和延後埋點,顧名思義,即時埋點就是觸發一個動作,就將使用者行為埋點資料上傳到日誌伺服器(一般稱之為行為日誌埋點伺服器,日誌埋點伺服器,日誌伺服器等不同叫法)。延後埋點就是,考慮到移動端網路訊號不穩定原因,一般會將埋點資料一條一條存放到移動端的檔案中,等滿足觸發條件時,再批量上傳到日誌伺服器。
    注意,移動端由於裝置效能差異性較大,網路情況複雜,一般不會設定很複雜的日誌上傳機制,並且允許有一部分資料丟失來應對移動端各種複雜情況。
  2. 業務後臺埋點,當業務後臺觸發一些行為時,也會將這些資料上傳到日誌伺服器中。這主要是考慮到進行使用者行為流程分析時,一些指標不好在移動端進行採集如訂單狀態、支付狀態、貸款授信狀態等的及時變化。

2.2 flume日誌採集

  1. 因為日誌伺服器一般是以叢集形式存在,日誌檔案存放在叢集各個節點伺服器上。這時候一般採用分散式日誌採集框架進行採集,flume只是目前比較主流的框架,還可以根據業務需求選擇其他日誌框架;如果有極致要求,甚至需要自行開發日誌採集框架。
  2. flume採集架構簡單說明
  • 在日誌採集時,一般使用tailDir Source,這個可以監控多個資料夾,還可以實現斷點續傳(會記錄偏移量)
  • 選擇channel時,一般是三種選擇,file channel、memory channel、kafka channel。

file channel,使用檔案做快取,速度相對最慢,但勝在穩定,斷電也不怕資料丟失。
memory channel,速度最快,但斷電資料就會丟失
kafka channel,適合需要將資料最後匯入kafka叢集的場景。

  • 選擇sink時,可以根據需求,如jdbc sink,hdfs sink等
  1. flume是有事務功能,體現在2個階段,source到channel,channel到sink,事務的本質就是進行一個批量操作並且記錄這個批量操作的執行狀態。flume的事務在三種事務實現理念中選擇的是at least once(另外2種是at most once, exactly once)。特別情況下,資料會有重複。
  2. flume有攔截器,選擇器,監控器。
  • 攔截器就是在資料進入source之後,可以對每條資料做過濾,攔截,打標記。
  • 選擇器就是可以根據每條資料的特徵如攔截器打上的標記對資料做有選擇分發分流。
  • 監控器,顧名思義,可以對flume做狀態監控

flume的最小採集單位是agent,一個agent中由3個模組組成,source,channel,sink。
可以將一個agent看做是一個水泵,source就是進水口,channel就是水泵緩衝空間,sink就是出水口。channel是用來平衡進水口和出水口之間的速度差異的。
agent之間可以按照需求進行級聯,組成網路。

注意,如果使用hdfs sink,則注意設定rollSize、rollInterval、rollCount來避免大量小檔案產生。

2.3 kafka 削峰平谷,解耦,非同步

  1. kafka是一個分散式訊息快取框架,內部按照topic對訊息進行儲存。為了保證資料安全,會有資料副本partition。
  2. kafka中,有producer和consumer生產者和消費者2種角色
  3. partition資料副本種,也有leader和follower區分。
  4. kafka因為有資料副本機制,為了權衡資料吞吐量和資料一致性,設計了ack機制。

當ack為0時,只要kafka收到了資料,不管有沒有寫入到記憶體或者硬碟中,都返回ok。
當ack為1時,只要所有ISR(in synchronizing replication)資料副本中有一個是寫入了資料,就返回ok。
如果ack為-1,則所有ISR資料副本都寫入成功才會返回ok。
所以,一般選擇ack為1,兼顧吞吐量和資料安全性

  1. kafka的分割槽分配策略,range(預設)、roundrobin、sticky(在roundrobin上優化,不會對分割槽做再分配)
  2. exactly once = at least once + 冪等,並不是純粹的精確一次。保證一部分資料不重複,不丟失。當kafka叢集宕機,一部分資料可能會重複。
  3. kafka的高可用和zookeeper相關,在0.9版本之前,一個topic中訊息的偏移量offset儲存在zookeeper中,這一點和跟多框架是一樣的做法。在0.9版本之後,kafka自己儲存了一個offset記錄。
  4. kafka的資料讀寫,如果涉及到磁碟,採取的是順序讀寫機制,採用Pagecache,0拷貝的方式來保證讀寫效率。
  1. pagecache:
    Kafka重度依賴底層作業系統提供的PageCache功能。當上層有寫操作時,作業系統只是將資料寫入PageCache,同時標記Page屬性為Dirty。當讀操作發生時,先從PageCache中查詢,如果發生缺頁才進行磁碟排程,最終返回需要的資料。實際上PageCache是把儘可能多的空閒記憶體都當做了磁碟快取來使用。同時如果有其他程序申請記憶體,回收PageCache的代價又很小,所以現代的OS都支援PageCache。

這裡大家應該知道,為什麼程式設計師電腦標配變成了16GB記憶體,部分程式設計師開始上32GB甚至64GB記憶體,特別程式設計師直接用伺服器級別,128GB甚至256GB記憶體的原因就在於此

  1. 0拷貝
    傳統的網路I/O操作流程,大體上分為以下4步。
    <1. OS從硬碟把資料讀到核心區的PageCache。
    <2. 使用者程序把資料從核心區Copy到使用者區。
    ❤️. 然後使用者程序再把資料寫入到Socket,資料流入核心區的Socket Buffer上。
    <4. OS再把資料從Buffer中Copy到網絡卡的Buffer上,這樣完成一次傳送。
    其中2、3兩步沒有必要,完全可以直接在核心區完成資料拷貝,0拷貝就是做這個工作的
  1. kafka事務,producer事務和consumer事務;producer事務,因為維護了一個開分割槽的全域性id,即使整個伺服器重啟,因為事務狀態儲存下來,進行中事務可以得到恢復,進而可以繼續。

2.4 sqoop 結構化資料和分散式儲存之間互相傳輸(阿里的dataX也很不錯)

  1. sqoop屬於hadoop生態,底部就是執行一個mapreduce程式進行資料傳輸,不過只有map階段,沒有reduce階段。
  2. 注意sqoop匯入匯出null的問題,因為hive的null,mysql的null儲存形式不一致,需要做sqoop的引數設定來預防。
  • 這一點其實在數倉的ODS層做資料格式規範化一樣需要注意 。有時候一個公司不同部門,對於相同型別資料定義都會千差萬別,如果跨公司合作,就更需要注意這一點。
  1. sqoop底層就是一個mapreduce程式,所以也需要注意資料傾斜問題。split-by按照某一列切分表,num-mappers啟動多少個map進行資料匯入。解決資料傾斜,一般是想辦法增加並行度,調整資料劃分規則等方法。當然,有錢可以直接加機器記憶體配置,也是ok的。
  2. 如果使用sqoop往mysql匯入資料,注意不要使用orc或者parquet檔案格式,換為text檔案格式。
  • 實際上,在大資料企業開發中,hive偏愛orc格式,在hive中,orc格式檔案讀寫查詢都是很高效率的,比parquet還高。但parquet檔案格式,各個框架對它支援更好,所以很多時候大資料開發檔案儲存一般使用parquet。
  • 為什麼不使用text,因為這個不可以壓縮,資料還是最原始方式,parquet則可以壓縮,有索引,列式儲存等各種優點。
  • sqoop中設定引數,直接將parquet轉為text,也可以使用臨時表做轉換,還可以建表時就不把需要匯出資料到mysql的表建立為parquet。

2.5 spark streaming 流式處理

  1. 不是所有業務場景都適合或者使用流式處理
  2. spark streaming是一個準實時處理框架,採用微批次概念。如果是真正的流式處理,使用flink會更加合適。
  3. 常見坑
  1. spark streaming第一次執行資料丟失
    將kafka引數的auto.offset.reset設定成earliest
  2. spark streaming精準一次消費
    ①手動維護偏移量 ②處理完業務資料後,再進行提交偏移量操作
  3. spark streaming控制每秒消費資料速度
    spark.streaming.kafka.maxRatePerPartition引數來設定
    Spark Streaming從kafka分割槽每秒拉取的條數
  4. spark streaming 背壓機制
    把spark.streaming.backpressure.enabled 引數設定為ture,開啟背壓機制後Spark Streaming會根據延遲動態去kafka消費資料,上限由spark.streaming.kafka.maxRatePerPartition引數控制,所以兩個引數一般會一起使用。其實就是更加智慧化,通過kafka輸入資料流速設定來控制spark streaming的消費速度。
  5. Spark Streaming 一個stage耗時:
    Spark Streaming stage耗時由最慢的task決定,所以資料傾斜時某個task執行慢會導致整個Spark Streaming都執行非常慢。也就是木桶理論。這也是為什麼要防止分散式執行時發生資料傾斜的原因
  6. Spark Streaming 預設分割槽個數:
    Spark Streaming預設分割槽個數與所對接的kafka topic分割槽個數一致,Spark Streaming裡一般不會使用repartition運算元增大分割槽,因為repartition會進行shuffle增加耗時
  7. Spark Streaming消費kafka資料的兩種方式:
    ①ReceiverAPI:需要一個專門的Executor去接收資料,然後傳送給其他的Executor做計算。存在的問題,接收資料的Executor和計算的Executor速度會有所不同,特別在接收資料的Executor速度大於計算的Executor速度,會導致計算資料的節點記憶體溢位
    ②DirectAPI:是由計算的Executor來主動消費Kafka的資料,速度由自身控制
  8. SparkStreaming視窗函式的原理:
    視窗函式就是在原來定義的SparkStreaming計算批次大小的基礎上再次進行封裝,每次計算多個批次的資料,同時還需要傳遞一個滑動步長的引數,用來設定當次計算任務完成之後下一次從什麼地方開始計算.

2.6 HDFS和Yarn

2.6.1 HDFS

  1. HDFS負責檔案的儲存和備份,海量資料目前主流的大資料框架還是HDFS。
  2. HDFS拿到的資料一般是按照分類,一天一個目錄,特別場景,可能還需要針對小時建立子目錄。
    注意,海量資料處理中,不管是ODS,DW層(DWD、DWS)、ADS的資料,一般都會按照天進行資料儲存,這樣方便對資料做管理。分割槽表的概念在這裡體現的很徹底
  3. HDFS小檔案處理

因為HDFS機制,namenode儲存每一個檔案的元資料資訊,這個元資料資訊是佔據一定空間150位元組,太多的小檔案會極大消耗namenode,而namenode在hdfs執行時需要載入到記憶體中來應對HDFS叢集中的快速查詢和修改。
解決辦法一般如下幾種

  1. har歸檔方式,將多個小檔案放入到一個檔案中,一般自定義inputformat,將多個小檔案資料放入sequenceFile中
  2. 採用CombineTextInputFormat,這個會將小檔案合併到一次,整體達到128MB時再切片,減少切片次數,降低maptask開啟個數。(預設是一個檔案一個maptask,而分散式計算不適合處理小檔案在於任務分發,預處理,網路傳輸就需要佔據很多時間,實際處理小檔案耗時可能小於這些耗時)
  3. 開啟jvm重用(注意有小檔案才開啟)
    Hadoop的預設配置通常是使用派生JVM來執行map和Reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成百上千task任務的情況。JVM重用可以使得JVM例項在同一個job中重新使用N次。
    缺點是,開啟JVM重用將一直佔用使用到的task插槽,以便進行重用,直到任務完成後才能釋放。如果某個“不平衡的”job中有某幾個reduce task執行的時間要比其他Reduce task消耗的時間多的多的話,那麼保留的插槽就會一直空閒著卻無法被其他的job使用,直到所有的task都結束了才會釋放。
  1. shuffle過程及其優化
  1. map端join
  2. 資料傾斜預防
    – 提前在map端combine,減少傳輸資料量
    –導致資料傾斜的key,想辦法打散,如增加隨機字串
    –增加reducer數量,增加並行度
    –做自定義分割槽器,自定定義key分割槽規則
    –加機器配置(有錢土豪專用辦法,氪金大法,一個TB記憶體,四路CPU走起)

2.6.2 Yarn

  1. yarn是一個資源排程平臺,在此之上,可以執行mapreduce,可以執行spark等程式。
  2. yarn主要角色是resource manager,node manager,app master。
  • 當一個程式想要執行在yarn上時,需要先跟resource manager提交程式
  • 然後resource manager會分配一個節點做為app master,
  • 這個app master先根據提交的程式地址資訊,把程式相關資料下載下來,然後計算程式執行所需要的資源,然後再把資源資訊傳送給resource manager,resource manager將這個程式安排進任務佇列中,然後根據叢集資源情況,安排任務執行。
  • 當資源足夠時,就會安排程式執行,這時候resource manager就會把叢集節點資訊告訴app master,然後app master跟這些節點通訊,將任務和資料資訊分發下去,然後排程這些task的執行,如MR程式,就會分為map task和reduce task執行。
  1. yarn的排程器,FIFO,容量Capacity排程器,公平排程器。
  • apache預設是容量排程器,CDH預設是公平排程器,當並行度要求高,採用公平排程器。
  • yarn任務佇列,預設是default一個。
  • 一般按照業務或者執行引擎進行佇列建立,這樣有利於進行任務優先順序管理,也可以做任務隔離,防止一個程式出錯,卡死所有任務佇列
  1. 注意,Yarn對於任務程式也是有限制的,不過可以調整。同樣的,對於硬體資源預設佔用,也有設定,預設8GB記憶體,8核心佔用。如果是高配置機器,注意調整這個引數,否則配置再高,Yarn也不會去使用。
  1. 對於硬體資源佔用上,Yarn繼承了Hadoop時代的思想,21世紀初,硬體資源如CPU和記憶體還是很寶貴,所以計算時,中間會有大量資料落地到磁碟,一是保證資料穩定性,一是降低對CPU和記憶體的佔用,保證程式在低配置叢集機器上也可以執行起來。
  2. 而2016年才開源的clickhouse已經完全不一樣了,預設就會壓榨50%一半的CPU資源,速度很快。這種思想也完全不一樣了,現在硬體資源相對便宜很多,也充裕很多了。
  3. 現在有相當一部分大資料框架往純記憶體方面發展,其中技術的基礎假定就是硬體資源主要是記憶體資源更加充裕,不用懷疑,512GB記憶體甚至1TB記憶體單節點的叢集已經有土豪公司在使用了。這樣帶來的一個影響就是,純記憶體資料庫,純記憶體大資料計算框架更加旁邊,以後也會變得更多,如impala這種框架。

2.7 Hive

  1. hive本質可以看做是一個轉換器,將sql轉換為mapreduce、tez、spark程式執行
  2. hive可以將結構化資料轉換為一張表,方便使用sql對這些資料做查詢
  3. hive的表元資料儲存在資料庫中,企業開發一般配置將元資料儲存到mysql中(為什麼不是oracle,因為窮,阿里巴巴都用不起才搞去IOE化),資料本身儲存在HDFS中。
  4. hive概念
  • 客戶端,
  • meta store(元資料存放預設在derby資料庫中,一般配置在mysql中)
  • hive轉換流程(sql編譯器,解析器,優化器,執行器)
  • 執行引擎(MR、Tez、Spark)
  • 儲存,HDFS
  1. hive和mysql比較
  • hive適合存放海量結構化資料,小批量資料反而不適合
  • mysql則適合小而美資料,功能也更加完善和強大
  • 查詢速度,hive在海量資料時快,mysql在小資料量時遙遙領先
  • mysql適合增刪改查,hive只擅長查詢
  1. hive內部表和外部表
  • 內部表,hive會將hdfs中對應資料遷移到表對應資料夾下,刪除表時,會把資料檔案一起刪除。這個適合存放業務資料
  • 外部表,hive不會講將hdfs對應資料遷移到表對應資料夾下,刪除表時,資料檔案不會受影響。
  • 實際企業開發中,一般都是建立外部表,因為資料是很重要的資產。
  1. hive小技巧
  • 使用distinct時,一般可以使用先group by 再處理的方式替代distinct
  • order by是全域性排序,使用要注意
  • sort by是每個reducer中內部先排序
  • distribute by(MR中自定義分割槽,結合sort by可以很好利用分散式處理的優點),類似先在每個分割槽排序,最後結果歸併,這樣要快很多。
  • cluster by(distribute by和sort by欄位相同時使用)
  • map join預設開啟,不要關閉
  • 行列過濾
  • 建立分割槽表,一般年月日來分割槽,查詢快
  • 合理設定map,reduce個數,這個沒有定數,看業務需求而來
  • 小檔案合併
  1. 函式
  • over 開窗
  • rank 開窗
  • row number
  • first value
  • udf自定義函式(udaf,udtf)

3. 總結

  1. 數倉並不適合每個公司,數倉適合處理海量資料,如果資料量不是很大,資料分析需求不大,可以直接使用ELK套件,或者直接買神策的服務即可。
  2. 數倉目前由於技術生態以及業務需求多變的原因,沒有一個統一的框架可以應對所有業務需求,所以數倉一般靈活架構。
  • 資料會分層
  • 技術框架一般會有多套,有適合離線業務的,有適合實時處理的,有適合智慧推薦的,有適合OLAP資料處理的,有適合自定義多維度查詢的,具體業務,具體分析。沒有一個單一框架可以應對所有業務需求
  • 技術框架比較多,框架之間版本相容也是很令人頭疼的問題,CDH應運而生,但CDH問題就是相容性沒問題,但一個是高階功能需要付費,一個是裡面框架版本相對較老。使用apache 框架,則版本需要自己摸索,可能一不小心就會遇到一個版本相容問題。
  1. 數倉只是技術,最終技術為業務服務,所以很多公司的數倉會經歷很多次技術變革來應對業務變化,沒有一層不變的技術框架,有時候公司整體業務方向調整,整個數倉技術需要重新選型都是有可能的。例如先在國內已經在逐步從離線數倉往實時數倉轉變,帶來好處就是實時性更高,壞處就是技術挑戰更大,遷移成本高昂。
  2. 大資料技術日新月異,在確立了技術框架之後,每個技術層級選用什麼框架,都是需要謹慎考量的。因為大資料部門不是獨立運作,往往需要跟業務後臺,前端,運營甚至財務部門協作,需要兼顧這些部門的業務需求以及技術框架來做技術選型,還需要充分考慮後續業務發展和變化,更重要是使用和學習門檻,不是每個公司中團隊都大牛如雲,修改原始碼輕輕鬆鬆。