1. 程式人生 > >5、Hive 資料倉庫技術

5、Hive 資料倉庫技術

一、Hive 概念

Hive 是基於 Hadoop 的資料倉庫軟體,可以查詢和管理 PB 級別的分散式資料。資料倉庫已用多種方式定義,很難給出一種嚴格的定義。寬泛來講,資料倉庫是一種資料庫,他與單位的操作資料庫分別維護。資料倉庫系統允許將各種應用系統整合在一起,為統一的歷史資料分析提供堅實的平臺,對資訊處理提供支援。資料倉庫是一個面向主題的、整合的、時變的、非易失的資料集合,支援管理者的決策過程。

面向主題的:資料倉庫圍繞一些重要主題,如顧客、供應商、產品和銷售組織等。資料倉庫關注決策者的資料建模與分析,而不是單位的日常操作和事務處理。因此,資料倉庫通常排除對於決策無用的資料,提供特定主題的簡明檢視。

整合的:通常,構造資料倉庫是將多個異構資料來源,如關係型資料庫、一般檔案和聯機事務處理記錄整合在一起。使用資料清理和資料整合技術,確保命名約定、編碼結構、屬性度量等的一致性。

時變的:資料儲存從歷史的角度提供資訊。資料倉庫中的關鍵結構都隱式或顯式的包含時間元素。

非易失的:資料倉庫總是物理的分離存放資料,這些資料來源於操作環境下的應用資料。由於這種分離,資料倉庫不需要事務處理、恢復和併發控制機制。通常,他只需要兩種資料訪問操作:資料的初始化裝入和資料訪問。

相比於傳統資料庫,資料倉庫可以提供對於異構多源資料庫的支援,他更關注的是哪些資料是挖掘需要的,這些資料都是顯隱或帶有時間屬性的,而且基於 Hadoop 理論,他們都認為硬體是不可靠的

 

Hive 是構建在 Hadoop 上的資料倉庫軟體框架,支援使用 SQL 來讀,寫和管理大規模資料集合。Hive 入門非常簡單,功能非常強大,所以非常流行.通常來說,Hive 只支援資料查詢和載入,但後面的版本也支援了插入,更新和刪除以及流式 api。Hive 具有目前 Hadoop 上最豐富最全的 SQL 語法,也擁有最慢最穩定的執行。是目前 Hadoop 上幾乎標準的 ETL 和資料倉庫工具。Hive 這個特點與其它 AdHoc 查詢工具如 Impala,Spark SQL 或者 Presto 有著應用場景的區別,也就是雖然都是即席查詢工具,前者適用與穩定作業執行,排程以及 ETL,或者更傾向於交戶式。一個典型的場景是分析師使用 Impala 去探測資料,驗證想法,並把資料產品部署在 Hive 上執行。

在我們講 Hive 原理和查詢優化前,讓我們先回顧一下 Hadoop 基本原理。

 

Hadoop 是一個分散式系統,有 HDFS 和 Yarn。HDFS 用於執行儲存,Yarn 用於資源排程和計算。MapReduce 是跑在 Yarn 上的一種計算作業,此外還有 Spark 等。 Hive 通常意義上來說,是把一個 SQL 轉化成一個分散式作業,如 MapReduce, Spark 或者 Tez。無論 Hive 的底層執行框架是 MapReduce、Spark 還是 Tez,其原理基本都類似。而目前,由於 MapReduce 穩定,容錯性好,大量資料情況下使用磁碟,能處理的資料量大,所以目前 Hive 的主流執行框架是 MapReduce,但效能相比 Spark 和 Tez 也就較低,等下講到 Group By 和 JOIN 原理時會解釋這方面的原因。目前的 Hive 除了支援在 MapReduce 上執行,還支援在 Spark 和 Tez 上執行。我們以 MapReduce 為例來說明的 Hive 的原理。先回顧一下 MapReduce 原理。

兩個 Mapper 各自輸入一塊資料,由鍵值對構成,對它進行加工(加上了個字元n),然後按加工後的資料的鍵進行分組,相同的鍵到相同的機器。這樣的話,第一臺機器分到了鍵 nk1 和 nk3,第二臺機器分到了鍵 nk2。接下來再在這些 Reducers 上執行聚合操作(這裡執行的是是 count),輸出就是 nk1 出現了 2 次,nk3 出現了 1 次,nk2 出現了 3 次。從全域性上來看,MapReduce 就是一個分散式的 GroupBy 的過程。從上圖可以看到,Global Shuffle 左邊,兩臺機器執行的是 Map。Global Shuffle 右邊,兩臺機器執行的是 Reduce。所以 Hive,實際上就是一個編譯器,一個翻譯機。把 SQL 翻譯成 MapReduce 之類的作業。

它提供瞭如下功能:

(1)靈活方便的 ETL(extract/transform/load)

我們可以將資料倉庫理解成為是一個多資料庫資料的集合體,本身來說,資料庫是面向於業務的,資料倉庫就像上面所說,是面向於主題的,比如今天作為作為一個數據分析員,他關注的就不再是業務人員關注的問題了,而是屬性維度對於整體的影響。關於這些問題,我們已經在 Hbase 中詳細贅述了,這裡就不在多說。資料倉庫其實我們可以將其認為是一個大型的資料庫,只是這個資料庫不是給業 務人員用的,是給分析師使用的。作為分析師,我們更關注的是資料的完整度和 全面性,另外關注的就是這個資料是否是乾淨的,所謂乾淨就是指這個資料是不 是存在一些干擾或者是問題的。就好比我們去買菜,回家之後也肯定不會直接就 炒菜了,需要經過洗菜切菜的步驟。這就和資料倉庫的工作很類似,資料倉庫中 的資料是直接供給做資料分析使用的,所以資料倉庫需要將資料收集整合到本地, 並且對資料進行預處理操作,排除掉資料中的一些分析無關的資料以及一些會分 析準確度產生影響的資料。那麼為了實現資料的整合,我們就需要從各個位置收 集並且儲存資料,並且對原始資料進行預處理。關於資料的預處理操作的步驟詳 見第一章大資料基本概念那麼我們實現了資料的載入和轉換,其實這些相關的操作都依託於,我們要有足夠多型別的相關介面,由於資料倉庫中的資料並非是自己產生的,就像菜市場的菜不是在菜市場種出來的一樣,我們需要從各種型別的資料庫中將資料下載到本地。這個時候,我們就需要針對不同型別的資料庫提供連線支援,所以介面作為一個很重要的部分也被加入到了資料倉庫中。資料倉庫有了介面和多資料型別的支援之後,下一個需要解決的問題就是如何保證資料成功匯入,針對於大量資料的匯入也是一個問題,如何讓大量資料儘快匯入資料倉庫目前有兩種方式解決,第一個資料倉庫本身需要能夠承載海量資料的匯入壓力,另外一個就是外部程式實現,比如 GDS 等。

(2)多種檔案格式的元資料服務資料倉庫集成了檔案之後,另外一個問題就是如何對這些檔案進行讀取,我們之前也說過目前的資料型別非常多,結構化、非結構化、半結構化資料型別廣泛,那麼這些檔案被資料倉庫整合之後,如何能夠正常的被讀取,這就需要資料倉庫支援相關的元資料,對各種型別的資料都能夠支援訪問。

(3)直接訪問 HDFS 檔案以及 Hbase 資料在進行整合的時候,由於 HIVE 是搭載在 Hadoop 上的,本身各個元件之間就可以進行交流,如果還需要通過其他元件做轉換還會導致整體程序延遲過高,所以 Hive 支援直接讀取 HDFS 和 Hbase 中的檔案,降低整體資料整合延遲。

(4)支援 MapReduce,Tez,Spark 等多種計算引擎資料倉庫的出現並不是為了資料的儲存,而是為了更好的進行分析計算,而且我們對於不同型別的資料也有對應型別的計算引擎去做相關的分析操作,所以 Hive 對於計算引擎的支援度也應該足夠大,才能滿足業務的需求。

二、Hive 優缺點

優點:

(1) Hive 具有高可靠性和高容錯性,採用叢集模式保障可靠性,元資料節點同時也採用主備保障,並且在連線超時之後提供重試機制。

(2) 通過類 SQL 語法,易於維護。

(3) 通過自定義的儲存格式和函式,可以提供高拓展性。

(4) 對外提供多介面,可以使使用者通過多種模式呼叫。

缺點:

(1) 由於 Hive 預設使用 MapReduce 作為計算引擎,作為離線計算工具,MapReduce 的延遲較高。

(2) 雖然 Hive 提供了檢視的概念,但是還是不支援在檢視上進行操作。不支援列級別的增刪改操作(3) 當前版本還不能支援儲存過程,只能通過 UDF 來實現一些邏輯處理

三、Hive 和傳統資料倉庫比較

四、Hive 架構

Hive 分為三個角色 HiveServer、MetaStore、WebHcat。

(1) HiveServer 將使用者提交的 HQL 語句進行編譯,解析成對應的 Yarn 任務、Spark 任務或者 HDFS 操作,從而完成資料的提取、轉換、分析。

(2) MetaStore 提供元資料服務。

(3) WebHcat 對外提供基於 https 協議的元資料訪問、DDL 查詢等服務。

Hive 的服務端元件

  1. Driver 元件:該元件包括:Compiler、Optimizer、Executor,它可以將 Hive的編譯、解析、優化轉化為  MapReduce  任務提交給  Hadoop1  中的JobTracker 或者是 Hadoop2 中的 SourceManager 來進行實際的執行相應的任務。
  2. MetaStore 元件:儲存著 hive 的元資料資訊,將自己的元資料儲存到了關係型資料庫當中,支援的資料庫主要有:Mysql、Derby、支援把 metastore 獨立出來放在遠端的叢集上面,使得 hive 更加健壯。元資料主要包括了表的名稱、表的列、分割槽和屬性、表的屬性(是不是外部表等等)、表的資料所在的目錄。
  3. 使用者介面:CLI(Command Line Interface)(常用的介面:命令列模式)、
  4. Client:Hive 的客戶端使用者連線至 Hive Server ,在啟動 Client 的時候,需要制定 Hive Server 所在的節點,並且在該節點上啟動 Hive Server、WUI:通過瀏覽器的方式訪問 Hive。

 

 WebHCat 提供 Rest 介面,使使用者能夠通過安全的 HTTPS 協議執行以下操作:執行 Hive DDL 操作;執行 Hive HQL 任務; 執行 MapReduce 任務;注:當前版本暫不提供 Pig 介面。

五、Hive 資料儲存模型

資料庫:建立表時如果不指定資料庫,則預設為 default 資料庫表:物理概念,實際對應 HDFS 上的一個目錄分割槽:對應所在表所在目錄下的一個子目錄

桶:對應表或分割槽所在路徑的一個檔案傾斜資料:資料集中於個別欄位值的場景,比如按照城市分割槽時,80%的資料都來自某個大城市正常資料:不存在傾斜的資料分割槽和分桶

分割槽:資料表可以按照某個欄位的值劃分分割槽,每個分割槽是一個目錄,分割槽數量不固定,分割槽下可再有分割槽或者桶,分割槽可以很明顯的提高查詢效率桶:資料可以根據桶的方式將不同資料放入不同的桶中,每個桶是一個檔案,建表時指定桶個數,桶內可排序,資料按照某個欄位的值 Hash 後放入某個桶中,對於資料抽樣、特定 join 的優化很有意義託管表和外部表Hive 預設建立託管表,由 Hive 來管理資料,意味著 Hive 會將資料移動到資料倉庫目錄。另外一種選擇是建立外部表,這時 Hive 會到倉庫目錄以外的位置訪問資料。

 

六、增強特性

(1)Colocation 同分布

 

Colocation (同分布):將存在關聯關係的資料或可能要進行關聯操作的資料儲存在相同的儲存節點上。檔案級同分布實現檔案的快速訪問,避免了因資料搬遷帶來的大量網路開銷。

比如有兩份資料,一份為學生表(ID, name, sex),另一份資料為成績表(ID, subject, score),此時要查詢男女生平均成績分別為多少。這時必須對這兩份資料進行關聯操作(join),我們認為這兩份資料具有關聯關係。此時採用同分布特性可以減少資料移動等網路開銷,直接在本地進行關聯操作即可。

(2)HBase 記錄批量刪除

在 Hive on HBase 功能中,FusionInsight HD Hive 提供了對 HBase 表的單條資料的刪除功能,通過特定的語法,Hive 可以將 HBase 表中符合條件的一條或者多條資料清除。如果要刪除某張 HBase 表中的某些資料,可以執行 HQL 語句: remove table hbase_table where expression;其中 expression 規定要刪除資料的篩選條件。(3)流控特性通過流控特性,可以實現:當前已經建立的總連線數閾值控制; 每個使用者已經建立的連線數閾值控制; 單位時間內所建立的連線數閾值控制;

 

七、Hive 的作業編譯

Hive 作業的執行過程實際上是 SQL 翻譯成作業的過程?那麼,它是怎麼翻譯的?

一條 SQL,進入的 Hive。經過上述的過程,其實也是一個比較典型的編譯過程變成了一個作業。

(1)首先,Driver 會輸入一個字串 SQL,然後經過 Parser 變成 AST,這個變成 AST 的過程是通過 Antlr 來完成的,也就是 Anltr 根據語法檔案來將 SQL 變成AST。

(2)AST 進入 SemanticAnalyzer(核心)變成 QB,也就是所謂的 QueryBlock。一個最簡的查詢塊,通常來講,一個 From 子句會生成一個 QB。生成 QB 是一個遞迴過程,生成的QB經過 GenLogicalPlan 過程,變成了一個 Operator 圖,也是一個有向無環圖。

(3)OPDAG 經過邏輯優化器,對這個圖上的邊或者結點進行調整,順序修訂,變成了一個優化後的有向無環圖。這些優化過程可能包括謂詞下推(PredicatePushDown),分割槽剪裁(PartitionPrunner),關聯排序(JoinReorder)等等

(4)經過了邏輯優化,這個有向無環圖還要能夠執行。所以有了生成物理執行計劃的過程。GenTasks。Hive 的作法通常是碰到需要分發的地方,切上一刀,生成一道 MapReduce 作業。如 GroupBy 切一刀,Join 切一刀,DistributeBy 切一刀,Distinct 切一刀。這麼很多刀砍下去之後,剛才那個邏輯執行計劃,也就是那個邏輯有向無環圖,就被切成了很多個子圖,每個子圖構成一個結點。這些結點又連成了一個執行計劃圖,也就是 TaskTree.把這些個 TaskTree 還可以有一些優化,比如基於輸入選擇執行路徑,增加備份作業等。進行調整。這個優化就是由 PhysicalOptimizer 來完成的。經過PhysicalOptimizer,這每一個結點就是一個 MapReduce 作業或者本地作業,就可以執行了。這就是一個 SQL 如何變成 MapReduce 作業的過程。

八、Hive 的 GroupBy 和 Join 操作實現原理

Hive 最重要的部分是 Group By 和 Join。下面分別講解一下:首先是 Group By例如我們有一條 SQL 語句:INSERT INTO TABLE pageid_age_sum SELECT pageid, age, count(1) FROM pv_usersGROUP BY pageid, age;把每個網頁的閱讀數按年齡進行分組統計。由於前面介紹了,MapReduce 就是一個 Group By 的過程,這個 SQL 翻譯成 MapReduce 就是相對簡單的。

Hive 實現成 MapReduce 的原理如下:

 也就是說 Map 分發到 Reduce 的時候,會使用 pageid 和 userid 作為聯合分發鍵,再去聚合(Count),輸出結果。

介紹了這麼多原理,重點還是為了使用,為了適應場景和業務,為了優化。從原理上可以看出,當遇到 GroupBy 的查詢時,會按 GroupBy 鍵進行分發?如果鍵很多,撐爆了機器會怎麼樣?

對於 Spark,為了快,key 在記憶體中,爆是經常的。爆了就失敗了。對於 Hive, Key 在硬碟,本身就比 Impala,Spark 的處理能力大上幾萬倍。但……不幸的是,硬碟也有可能爆。

當然,硬碟速度也比記憶體慢上不少,這也是 Hive 總是被吐槽的原因,場景不同,要明白自己使用的場景。當 GroupByKey 大到連硬碟都能撐爆時……這個時候可能就需要優化了。GroupBy 優化通常有 Map 端資料聚合和傾斜資料分發兩種方式。也就是執行 SQL前先執行 sethive.map.aggr=true;它的原理是 Map 端在發到 Reduce 端之前先部分聚合一下。來減少資料量。因為我們剛才已經知道,聚合操作是在 Reduce 端完成的,只要能有效的減少 Reduce 端收到的資料量,就能有效的優化聚合速度,避免爆機,快速拿到結果。

另外一種方式則是針對傾斜的 key 做兩道作業的聚合。什麼是傾斜的資料?比如某貓雙 11 交易,華為賣了 1 億臺,蘋果賣了 10 萬臺。華為就是典型的傾斜資料了。如果要統計華為和蘋果,會用兩個 Reduce 作 GroupBy,一個處理 1 億臺,一個處理 10 萬臺,那個 1 億臺的就是傾餘。由於按 key 分發,遇到傾斜資料怎麼辦?

 

 

可以使用 hive.groupby.skewindata 選項,通過兩道 MapReduce 作業來處理。當選項設定為 true,生成的查詢計劃會有兩個 MRJob。第一個 MRJob 中,Map 的輸出結果集合會隨機分佈到 Reduce 中,每個 Reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 GroupByKey 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MRJob 再根據預處理的資料結果按照 GroupByKey 分佈到 Reduce 中(這個過程可以保證相同的 GroupByKey 被分佈到同一個 Reduce 中),最後完成最終的聚合操作。第一道作業:Map 隨機分發,按 gbykey 部分聚合第二道作業:第一道作業結果 Map 傾斜的 key 分發,按 gbkkey 進行最終聚合無論你使用 Map 端,或者兩道作業。其原理都是通過部分聚合來來減少資料量。能不能部分聚合,部分聚合能不能有效減少資料量,通常與 UDAF,也就是聚合函式有關。也就是隻對代數聚合函式有效,對整體聚合函式無效。所謂代數聚合函式,就是由部分結果可以彙總出整體結果的函式,如 count,sum。所謂整體聚合函式,就是無法由部分結果彙總出整體結果的函式,如 avg,mean。比如,sum,count,知道部分結果可以加和得到最終結果。而對於,mean,avg,知道部分資料的中位數或者平均數,是求不出整體資料的中位數和平均數的。在遇到複雜邏輯的時候,還是要具體問題具體分析,根據系統的原理,優化邏輯。剛才說了,Hive 最重要的是 GroupBy 和 Join,所以下面我們講 Join.

例如這樣一個查詢:

INSERT INTO TABLE pv_users SELECT pv.pageid ,u.age

FROM page_viewpv JOIN user u ON (pv.userid=u.userid);

把訪問和使用者表進行關聯,生成訪問使用者表。Hive 的 Join 也是通過 MapReduce 來完成的。

 就上面的查詢,在MapReduce 的 Join 的實現過程如下:

 

Map 端會分別讀入各個表的一部分資料,把這部分資料進行打標,例如 pv 表標

1,user 表標 2。

Map 讀取是分散式進行的。標完完後分發到 Reduce 端,Reduce 端根據 JoinKey,也就是關聯鍵進行分組。然後按打的標進行排序,也就是圖上的 ShuffleSort。在每一個 Reduce 分組中,Key 為 111 的在一起,也就是一臺機器上。同時,pv表的資料在這臺機器的上端,user 表的資料在這臺機器的下端。

這時候,Reduce 把 pv 表的資料讀入到記憶體裡,然後逐條與硬碟上 user 表的資料做 Join 就可以了。從這個實現可以看出,我們在寫 HiveJoin 的時候,應該儘可能把小表(分佈均勻的表)寫在左邊,大表(或傾斜表)寫在右邊。這樣可以有效利用記憶體和硬碟的關係,增強 Hive 的處理能力。同時由於使用 JoinKey 進行分發,Hive 也只支援等值 Join,不支援非等值 Join。由於 Join 和 GroupBy 一樣存在分發,所以也同樣存在著傾斜的問題。所以 Join 也要對抗傾斜資料,提升查詢執行效能。

Hive 從 0.14 開始支援 ACID。也就是支援了 UpdateInsertDelete 及一些流式的API。也就是這個原因,Hive 把 0.14.1BugFixes 版本改成了 Hive1.0,也就是認為功能基本穩定和健全了。由於 HDFS 是不支援本地檔案更改的,同時在寫的時候也不支援讀。表或者分割槽內的資料作為基礎資料。事務產生的新資料如 Insert/Update/Flume/Storm 等會儲存在增量檔案(DeltaFiles)中。讀取這個檔案的時候,通常是 TableScan 階段,會合並更改,使讀出的資料一致。HiveMetastore 上面增加若干個執行緒,會週期性地合併併合並刪除這些增量檔案。 Hive 適合做什麼?由於多年積累,Hive 比較穩定,幾乎是 Hadoop 上事實的 SQL 標準。Hive 適合離線 ETL,適合大資料離線 Ad-Hoc 查詢。適合特大規模資料集合需要精確結果的查詢。對於互動式 Ad-Hoc 查詢,通常還會有別的解決方案,比如 Impala, Presto 等等。特大規模的離線資料處理,尤其是大表關聯,特大規模資料聚集,很適合使用Hive。講了這麼多原理,最重要的還是應用,還是創造價值。對 Hive 來說,資料量再大,都不怕。資料傾斜,是大難題。但有很多優化方法和業務改進方法可以避過。Hive 執行穩定,函式多,擴充套件性強,資料吞吐量大,瞭解原理,有助於用好和選型。