《設計資料密集型應用》- Designing Data-Intensive Application - 第10章 批處理 讀書筆記
批處理
使用Unix工具的批處理
分析簡單日誌
-
以分析網站URL統計量並倒序排序為例
-
命令鏈與自定義程式
- 藉助SHELL命令鏈或指令碼語言均可用來分析日誌
-
排序 VS 記憶體中的聚合
- 對於上述統計網站的例子,如果URL種類很少,則使用記憶體中的聚合較合適
- GNU Coreutils(Linux)中的sort 程式通過溢位至磁碟的方式來自動應對大於記憶體的資料集,並能同時使用多個CPU核進行並行排序,可對大資料集進行處理
Unix哲學
-
統一的介面
- 使用檔案描述符在管道中進行輸入輸出互動
-
邏輯與佈線相分離
- 程式與輸入輸出管道相分離
- stdin stdout的限制:同時多輸入、多輸出困難
-
透明度和實驗
MapReduce和分散式檔案系統
常見分散式檔案系統:HDFS GlusterFS QFS
MapReduce作業執行
-
mapper reducer基本概念
-
分散式執行MapReduce
- 在MapReduce中,mapper reducer基於JAVA,Mongodb CouchDB則基於javascript
-
MapReduce工作流
Reduce端連線與分組
-
MapReduce沒有索引的概念,為何?
- 多數應用需要聚合全量資料
-
示例:分析使用者活動事件
- 如果在分析中需要關聯其它表,建議從資料庫的副本提取資料,並將其放入分散式檔案系統中,還要保證A、B表的相關記錄能夠在同一地方MapReduce
-
排序合併連線
- 概念:設有兩個表A、B,對A表的按JOIN列從小到大排序,B表同樣按JOIN列從小到大排序,以A表作為驅動表,從A表的第一行開始,在B表JOIN列找對應值,由於JOIN列已經排序,則可以避免Simple Nested-Loop每次都需要從頭開始遍歷JOIN的問題
- 在Reducer中,對採取已經進行分割槽和排序的A、B表資料進行JOIN
-
把相關資料放在一起
-
GROUP BY
-
處理傾斜
- 在PIG中,傾斜連線方法道德進行抽樣處理,確定哪些鍵是熱鍵,然後隨機地將熱鍵關聯記錄分發至幾個Reducer,但需要將資料連線另一側的資料均傳送至多個Reducer
Map端連線
-
廣播雜湊連線(Map端JOIN)
- 對於大資料集JOIN 小資料集的情況,通常將小資料集廣播至各節點的記憶體散列表中
- 或者將較小資料集儲存在本地磁碟上的只讀索引,而索引中經常使用的部分將保留在作業系統的頁面快取
-
分割槽雜湊連線
-
在HIVE中稱為bucketed map join,如果A表的bucket數量與B表的bucket數量成倍數關係,則可以使用bucketed map join,如圖所示表,加了分桶如果在user_id列上進行JOIN,則相比沒有加分桶JOIN會更高效
-
-
Map端合併連線
- 當輸入資料集體以相同的方式分割槽與相同的鍵排序,則mapper可以按鍵遞增的順序依次讀取兩個輸入檔案(雙指標遞增)進行JOIN
-
MapReduce工作流與Map端連線
批處理工作流的輸出(批處理輸出的應用)
-
建立搜尋索引
-
Hadoop Mapreduce仍然是建立Lucene索引的好方法,存在索引一旦建立便不可修改的問題
- 定期全量更新
- 增量建立索引,並在後臺非同步合併壓縮段檔案
-
-
鍵值儲存作為批處理輸出
- 構建機器學習系統,如分類器、推薦系統供前端使用者查詢
- 如果在MapReduce作業中直接基於使用者查詢的資料庫運算,則有可能會出現資料庫負載過重導致系統性能變差。
-
批處理輸出的哲學
Hadoop與分散式資料庫的對比
-
儲存多樣性
- HADOOP儲存基於HDFS,儲存檔案型別可以是文字、影象等各種資料;而MPP資料庫需要對資料和查詢模式進行仔細的前期建模。
-
處理模型多樣性
- MPP資料僅能做SQL查詢而HADOOP可進行多種處理如構建機器學習與推薦系統
- HADOOP可同時相容OLTP與MPP資料庫,如Hbase Impala
-
針對頻繁故障設計
- 批處理對故障不太敏感,即使失敗了也不會影響使用者,還可自動重試;而MPP查詢作業通常執行幾秒或幾分鐘,出現錯誤後會提示使用者重新提交查詢
- MapReduce可容忍單個Map或Reduce任務的失敗,還可從失敗的斷點恢復繼續執行。(如此設計主要來源於Google,Google的資料中心運行了線上生產服務與離線批處理作業,每個任務都有分配的CPU、記憶體與磁碟資源,其中高優先順序的任務可以終止同一臺機器上的低優先順序的任務,而低優先順序的任務可以使用叢集其它的機器資源,以提高叢集資源利用率。總結:不是因為硬體很不可靠,而是因為任意終止程序的自由有利於提高計算叢集中的資源利用率。)
- 開源叢集排程器中,YARN,Mesos或Kubernetes不支援通用優先順序搶佔
MapReduce之後
物化(持久化)中間狀態
-
Mapreduce物化的不足
- Mapreduce作業只有在前驅作業都完成後才能啟動,多節點時資料傾斜會拖慢工作流
- Mapper通常是多餘的,如果Mapper與Reducer有相同的分割槽和排序,則不用shuffle
- 運算的中間資料通常會同步至多個複製節點,會造成中間資料的冗餘
-
資料流引擎
-
如Spark Tez Flink
-
相比MapReduce的優點
-
排序僅在需要排序的地方進行,而非每個Map與Reduce階段出現
-
沒有不必要的Map任務,因為Mapper任務可以和前一Reducer任務合併
-
由於資料流所有連線與資料依賴都顯式可見,因此可以使用排程器優化,將前後依賴資料的運算放在同一臺機器上,避免網路傳輸
-
寬依賴
- 寬依賴仍然需要shuffle,無法避免網路傳輸
-
窄依賴
- 由於分割槽依賴的確定性,窄依賴的處理可以在一個執行緒內完成
-
-
運算元可以在前驅資料準備好後立即執行,無需等待前驅階段全部完成後再開始
-
多階段運算JVM無需重啟,節約開銷
-
中間狀態不落盤,運算效率高
-
-
Tez需要依賴於YARN shuffle服務來實現節點間資料的實際複製,而Spark和Flink則是包含了獨立網路通訊層,排程器,及使用者向API的大型框架
-
-
容錯
-
如果一臺機器發生故障,且該臺的中間狀態丟失,則從先前的中間狀態或原始輸入資料進行重新運算
- Spark使用RDD跟蹤資料的譜系
- Flink對運算元狀態存檔
-
當運算元中採用瞭如雜湊表迭代(不能保證輸出結果的順序性)、隨機逄法、依賴外部資料來源或時鐘,重新計算會產生不同的結果,此時需要消除這些不確定因素,如使用固定的種子生成偽隨機數
-
-
關於物化的討論
- 排序運算元一般需要等待該階段所有計算完成後才能進行下一階段
- 作業的輸入與輸出一般仍是持久化到檔案系統,而中間狀態無需寫入檔案系統
圖與迭代處理
-
Pregel處理模型
-
初始訊息分發
- 在叢集中選出一臺Master,多臺Worker
- 由Master將圖劃分為多個分割槽並分發至各Worker,每個Worker儲存所有分割槽資訊
- Master將輸入劃分多個部分,並分發給worker,如果Worker拿到的是屬於自己分割槽的資料,則更新本節點資料,否則將資料分發到對應的頂點ID上
- 當所有資料載入完成,標記節點狀態為Active狀態
-
超步運算
- 1.Master向Worker發起開始超步運算指令
- 2.各節點進行S-1步運算
- 3.各節點進行訊息傳播,訊息將會在S+1步被接收
- 4.各節點在S+1步獲取由邊傳播過來的訊息,如果沒有訊息,則置本節點狀態為Inactive
- 5.重複2-4步驟,直至所有節點均變為Inacive後將結果返回Master
-
-
容錯
- 基於每步運算完成後在磁碟上的持久化
-
並行執行
- 實踐中,頂的分配以隨機方式進行,而不按照頂點的相關性,將相關的頂點分為一組
高階API和語言
-
向宣告式查詢語言的轉變
- Hive,Spark和Flink基於代價的查詢優化器可以自動選擇聯結器(JOIN)
- 相比傳統Mapreduce,宣告式語言可以在優化器層面提升程式效能。比如如果只需要提取資料表中的某些欄位,基於Mapreduce的程式可能需要在讀取回調程式中過濾,而宣告式語言在藉助資料庫本身特性(如Hbase的列式儲存),可以在IO層面減少了訪問資料量讀取,提升效能。
-
專業化的不同領域