【讀書筆記】設計資料密集型應用-第三部分
第10章-批處理
系統分類:
- 服務(線上系統):服務等待客戶端的請求或指令到達。會盡快地返回結果
- 批處理系統(離線系統):擁有大量的輸入資料,通過跑一個job來處理它,並生成輸出資料
- 流處理系統(near-real-time system):stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data
使用unix工具的批處理
使用unix工具處理日誌檔案:
cat /var/log/nginx/access.log | #1 awk '{print $7}' | #2 sort | #3 uniq -c | #4 sort -r -n | #5 head -n 5 #6
使用程式處理:
counts = Hash.new(0) # 1 File.open('/var/log/nginx/access.log') do |file| file.each do |line| url = line.split[6] # 2 counts[url] += 1 # 3 end end top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4 top5.each{|count, url| puts "#{count} #{url}" } # 5
上述兩者對比?
- 當不同的URL數量較小時,程式處理較好。因為它讀入記憶體的hash表
- 當資料量非常大時。命令列更好,因為sort命令會通過溢位磁碟的技術適配大資料,且能利用磁碟順序IO的效能優勢
Unix哲學
- 讓程式只做好一件事。新功能新程式,而非新增功能
- 每個程式的輸出都能成為另外一個程式的輸入。即程式只是資料的過濾器
- 儘早地設計構建原型
- 優先使用工具來減輕程式設計任務,即使是曲線救國的方式
Unix 如何實現的組合性?
統一的介面
Unix中使用檔案(fd)來表示,它只是一串有序的位元組序列。
(web中則使用URL作為介面)
分離邏輯與佈線
使用者可以以他們想要的方式連結輸入與輸出,而該程式不知道也不關心從哪裡輸入、後又輸出到哪裡
透明度和實驗
Unix 工具非常易於除錯。
Unix命令的輸入檔案通常被視為不可變。這樣你可以隨意嘗試,而不會損壞原始資料
可以在任何時候結束管道,這很便於除錯。
可以某個階段的資料輸出儲存到檔案中,並可以使用該檔案作為下一階段的輸入。
MapReduce和分散式檔案系統
MapReduce就像Unix工具一樣,不過是分佈在數千臺機器上。
MapReduce的job通過在分散式檔案系統上寫檔案來作為類似Unix中的stdin和stdout
MapReduce任務執行
處理模式:
- 讀取一組input檔案,並拆解成記錄(records)。
- 呼叫mapper函式,從每條記錄中提取一對鍵值對。
- 按照鍵來排序所有鍵值對(MapReduce預設執行)
- 呼叫reduce函式編列排序後的鍵值對。
Mapper
負責從input檔案中提取鍵值對
Reducer
MapReduce框架拉去由mapper生成的鍵值對,收集同一鍵的所有值,並在改值列表上迭代呼叫reducer
分散式執行
資料的分佈由框架層負責,基本上採用一種就近原則:即會在存有input檔案副本的節點上執行mapper,可以節省網路傳輸資料的開銷。
對於reducer,通過對鍵雜湊以確保相同鍵的鍵值對會被傳遞到同一reducer。
鍵值對必須排序,通常是按階段排序。只要mapper讀取完input檔案,並寫完排序後的output檔案後,reducer就會開始獲取output檔案。
reducer獲取檔案,合併且保留有序性,最後對記錄處理。
MapReduce工作流
如同Unix命令一樣,單個MapReduce任務作用有限,通常是多個任務組合成工作流。
MapReduce並不原生支援,而是用過input&output檔案(目錄)來實現。且預設是前一個MapReduce任務完全結束後才能開始下一個
Reduce-Side Join and Grouping
如何處理join以及group呢?(類似SQL中的)
join
例如user_info <-> user_activity_event表:
通常是獲取user_info的一個副本,並放置與user_activity_event表所在機器的檔案系統上,最後再執行任務。
Sort-meger join: 通過歸併排序的方式,reducer將兩個mapper的output合併
將相關的資料放在一起
group by
設定mapper,使得它生成的鍵值對是以目標分組鍵為鍵。隨後分割槽和排序過程將所有相同鍵的記錄傳遞到reducer。
處理傾斜資料?
由於存在熱點資料的情況(如處理明星相關的分組),會導致負載不均等問題。如何處理?
- 預先跑一份樣本資料來判斷哪些鍵是熱點鍵
- 顯示指定熱點鍵
然後使用隨機話的策略來減輕熱點資料的分割槽
Map-Side Join
Brocadcast hash join
兩個連線輸入之一很小,所以它並沒有分割槽,而且能被完全載入進一個雜湊表中。因此,你可以為連線輸入大端的每個分割槽啟動一個Mapper,將輸入小端的散列表載入到每個Mapper中,然後掃描大端,一次一條記錄,併為每條記錄查詢散列表。
Partitioned hash joins
如果兩個連線輸入以相同的方式分割槽(使用相同的鍵,相同的雜湊函式和相同數量的分割槽),則可以獨立地對每個分割槽應用散列表方法。
批處理工作流的輸出
- 建立索引:批處理處理文件並輸出索引
- 鍵值儲存:批處理處理資料並輸出鍵值對到資料庫(或者寫入檔案,資料庫程式再讀檔案)
批處理輸出的哲學
任何先前的輸出都被新的輸出完全取代,且無任何副作用。
Hadoop和分散式資料庫的對比
儲存多樣性
Hadoop會以原始的形式手機資料,後續再處理資料模型的設計。而傳統資料庫必須提前設計好資料模型
不加區分的資料儲存轉移了負擔:資料集生成者不需要強制轉換為標準格式,資料的解釋稱為消費者的問題(如文件資料庫)
處理模型多樣性
由於可以自定義編寫各種處理函式,因此可以處理各種各樣的資料模型
針對頻繁故障的設計
兩種設計思路:處理故障和使用記憶體磁碟的方式。
批處理是離線任務,故障敏感,因此不像MPP那樣終止整個查詢,而是以單個任務的粒度重試。
而且總是積極地將資料寫入磁碟,一方面是容錯另一方面是彌補能存不足
為何這樣設計?是因為故障率往往會很高,不僅是硬體錯誤,更是軟體問題(如k8s之類的優先順序任務,應用不廣)
MapReduce 之後
物化中間狀態
materialization(物化):將中間狀態寫入到檔案的過程
MapReduce通常是全量物化,缺點?
- 任務只能在前置任務都完成後才能執行
- mapper往往是多餘的,例如簡單地讀取reducer生成的資料
- 資料副本針對臨時檔案往往是多餘的
工作流引擎
將工作流顯示地建模為資料從多個處理階段通過,這樣的系統成為資料流引擎(dataflow engines)
與MapReduce的不同(優點)?
- 沒有所謂的mapper、reducer。而是將每個函式成為operators(運算元),引擎聽過各種選項來連結各個運算元
- 排序只在必要的地方執行
- 沒有不必要的mapper
- 運算元可以在資料就緒後就開始,而不是等待前置任務完全完成
容錯
通過將中間狀態儲存在檔案系統上,故障時,通過其他節點的可用資料重新,若還不行則重新計算原始資料。
如何實現該機制?
-
例如SPARK使用RDD來跟蹤資料是如何計算的(使用了哪些分析,用來哪些運算元等)
-
必須知道計算是否是確定性的,即給出相同輸入,是否輸出相同輸出。若不確定,則下游運算元將無法處理矛盾。通常對於不確定的運算元,一般是殺死下游運算元,並重新計算
-
針對大資料量的,則可以物化中間態。減少開銷
關於物化
類比Unix,MapReduce是將每個命令的輸出寫入檔案來實現。
而如同flink之類的,則是基於管道思想,將運算元的增量傳遞給其他運算元,而無需等待輸入完成。
圖與迭代處理
針對圖的資料模型,如何處理?
Pregel處理模型
思想:一個頂點“傳送訊息“給另一個頂點,通常這些訊息沿著圖的邊進行傳送。
每次迭代,為每個頂點呼叫一個函式,將所有傳送給它的訊息再傳遞給他。不斷迭代直到圖處理完畢(與圖處理演算法的遞迴閉包處理方式類似)
容錯
當開始下一個迭代時,前置的迭代必須完全結束,且所有訊息都必須拷貝到所有其他頂點
高階API和語言
像宣告式轉變
嘗試性地加入宣告式是可行的,應用只是簡單地說明那些連結是必須的,查詢優化器絕地如何最好地執行連結。
引入宣告式的部分,又保有原來的自定義運算元的方式,將大大提高可用性。
總結
批處理:input資料是有界的,是一個已知的,固定大小的資料集合。
而流處理則是無界的,即,你任然有一個任務,但是輸入資料是無限的