1. 程式人生 > 實用技巧 >【讀書筆記】設計資料密集型應用-第三部分

【讀書筆記】設計資料密集型應用-第三部分

第10章-批處理

系統分類:

  • 服務(線上系統):服務等待客戶端的請求或指令到達。會盡快地返回結果
  • 批處理系統(離線系統):擁有大量的輸入資料,通過跑一個job來處理它,並生成輸出資料
  • 流處理系統(near-real-time system):stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data

使用unix工具的批處理

使用unix工具處理日誌檔案:

cat /var/log/nginx/access.log | #1
	awk '{print $7}' | #2
	sort             | #3
	uniq -c          | #4
	sort -r -n       | #5
	head -n 5          #6

使用程式處理:

counts = Hash.new(0)         # 1
File.open('/var/log/nginx/access.log') do |file| 
    file.each do |line|
        url = line.split[6]  # 2
        counts[url] += 1     # 3
    end
end

top5 = counts.map{|url, count| [count, url] }.sort.reverse[0...5] # 4
top5.each{|count, url| puts "#{count} #{url}" }                   # 5

上述兩者對比?

  • 當不同的URL數量較小時,程式處理較好。因為它讀入記憶體的hash表
  • 當資料量非常大時。命令列更好,因為sort命令會通過溢位磁碟的技術適配大資料,且能利用磁碟順序IO的效能優勢

Unix哲學

  1. 讓程式只做好一件事。新功能新程式,而非新增功能
  2. 每個程式的輸出都能成為另外一個程式的輸入。即程式只是資料的過濾器
  3. 儘早地設計構建原型
  4. 優先使用工具來減輕程式設計任務,即使是曲線救國的方式

Unix 如何實現的組合性?

統一的介面

Unix中使用檔案(fd)來表示,它只是一串有序的位元組序列。

(web中則使用URL作為介面)

分離邏輯與佈線

使用者可以以他們想要的方式連結輸入與輸出,而該程式不知道也不關心從哪裡輸入、後又輸出到哪裡

透明度和實驗

Unix 工具非常易於除錯。

Unix命令的輸入檔案通常被視為不可變。這樣你可以隨意嘗試,而不會損壞原始資料

可以在任何時候結束管道,這很便於除錯。

可以某個階段的資料輸出儲存到檔案中,並可以使用該檔案作為下一階段的輸入。

MapReduce和分散式檔案系統

MapReduce就像Unix工具一樣,不過是分佈在數千臺機器上。

MapReduce的job通過在分散式檔案系統上寫檔案來作為類似Unix中的stdin和stdout

MapReduce任務執行

處理模式:

  1. 讀取一組input檔案,並拆解成記錄(records)。
  2. 呼叫mapper函式,從每條記錄中提取一對鍵值對。
  3. 按照鍵來排序所有鍵值對(MapReduce預設執行)
  4. 呼叫reduce函式編列排序後的鍵值對。

Mapper

負責從input檔案中提取鍵值對

Reducer

MapReduce框架拉去由mapper生成的鍵值對,收集同一鍵的所有值,並在改值列表上迭代呼叫reducer

分散式執行

資料的分佈由框架層負責,基本上採用一種就近原則:即會在存有input檔案副本的節點上執行mapper,可以節省網路傳輸資料的開銷。

對於reducer,通過對鍵雜湊以確保相同鍵的鍵值對會被傳遞到同一reducer。

鍵值對必須排序,通常是按階段排序。只要mapper讀取完input檔案,並寫完排序後的output檔案後,reducer就會開始獲取output檔案。

reducer獲取檔案,合併且保留有序性,最後對記錄處理。

MapReduce工作流

如同Unix命令一樣,單個MapReduce任務作用有限,通常是多個任務組合成工作流。

MapReduce並不原生支援,而是用過input&output檔案(目錄)來實現。且預設是前一個MapReduce任務完全結束後才能開始下一個

Reduce-Side Join and Grouping

如何處理join以及group呢?(類似SQL中的)

join

例如user_info <-> user_activity_event表:

通常是獲取user_info的一個副本,並放置與user_activity_event表所在機器的檔案系統上,最後再執行任務。

Sort-meger join: 通過歸併排序的方式,reducer將兩個mapper的output合併

將相關的資料放在一起

group by

設定mapper,使得它生成的鍵值對是以目標分組鍵為鍵。隨後分割槽和排序過程將所有相同鍵的記錄傳遞到reducer。

處理傾斜資料?

由於存在熱點資料的情況(如處理明星相關的分組),會導致負載不均等問題。如何處理?

  • 預先跑一份樣本資料來判斷哪些鍵是熱點鍵
  • 顯示指定熱點鍵

然後使用隨機話的策略來減輕熱點資料的分割槽

Map-Side Join

Brocadcast hash join

兩個連線輸入之一很小,所以它並沒有分割槽,而且能被完全載入進一個雜湊表中。因此,你可以為連線輸入大端的每個分割槽啟動一個Mapper,將輸入小端的散列表載入到每個Mapper中,然後掃描大端,一次一條記錄,併為每條記錄查詢散列表。

Partitioned hash joins

如果兩個連線輸入以相同的方式分割槽(使用相同的鍵,相同的雜湊函式和相同數量的分割槽),則可以獨立地對每個分割槽應用散列表方法。

批處理工作流的輸出

  • 建立索引:批處理處理文件並輸出索引
  • 鍵值儲存:批處理處理資料並輸出鍵值對到資料庫(或者寫入檔案,資料庫程式再讀檔案)

批處理輸出的哲學

任何先前的輸出都被新的輸出完全取代,且無任何副作用。

Hadoop和分散式資料庫的對比

儲存多樣性

Hadoop會以原始的形式手機資料,後續再處理資料模型的設計。而傳統資料庫必須提前設計好資料模型

不加區分的資料儲存轉移了負擔:資料集生成者不需要強制轉換為標準格式,資料的解釋稱為消費者的問題(如文件資料庫)

處理模型多樣性

由於可以自定義編寫各種處理函式,因此可以處理各種各樣的資料模型

針對頻繁故障的設計

兩種設計思路:處理故障和使用記憶體磁碟的方式。

批處理是離線任務,故障敏感,因此不像MPP那樣終止整個查詢,而是以單個任務的粒度重試。

而且總是積極地將資料寫入磁碟,一方面是容錯另一方面是彌補能存不足

為何這樣設計?是因為故障率往往會很高,不僅是硬體錯誤,更是軟體問題(如k8s之類的優先順序任務,應用不廣)

MapReduce 之後

物化中間狀態

materialization(物化):將中間狀態寫入到檔案的過程

MapReduce通常是全量物化,缺點?

  • 任務只能在前置任務都完成後才能執行
  • mapper往往是多餘的,例如簡單地讀取reducer生成的資料
  • 資料副本針對臨時檔案往往是多餘的

工作流引擎

將工作流顯示地建模為資料從多個處理階段通過,這樣的系統成為資料流引擎(dataflow engines)

與MapReduce的不同(優點)?

  • 沒有所謂的mapper、reducer。而是將每個函式成為operators(運算元),引擎聽過各種選項來連結各個運算元
  • 排序只在必要的地方執行
  • 沒有不必要的mapper
  • 運算元可以在資料就緒後就開始,而不是等待前置任務完全完成

容錯

通過將中間狀態儲存在檔案系統上,故障時,通過其他節點的可用資料重新,若還不行則重新計算原始資料。

如何實現該機制?

  • 例如SPARK使用RDD來跟蹤資料是如何計算的(使用了哪些分析,用來哪些運算元等)

  • 必須知道計算是否是確定性的,即給出相同輸入,是否輸出相同輸出。若不確定,則下游運算元將無法處理矛盾。通常對於不確定的運算元,一般是殺死下游運算元,並重新計算

  • 針對大資料量的,則可以物化中間態。減少開銷

關於物化

類比Unix,MapReduce是將每個命令的輸出寫入檔案來實現。

而如同flink之類的,則是基於管道思想,將運算元的增量傳遞給其他運算元,而無需等待輸入完成。

圖與迭代處理

針對圖的資料模型,如何處理?

Pregel處理模型

思想:一個頂點“傳送訊息“給另一個頂點,通常這些訊息沿著圖的邊進行傳送。

每次迭代,為每個頂點呼叫一個函式,將所有傳送給它的訊息再傳遞給他。不斷迭代直到圖處理完畢(與圖處理演算法的遞迴閉包處理方式類似)

容錯

當開始下一個迭代時,前置的迭代必須完全結束,且所有訊息都必須拷貝到所有其他頂點

高階API和語言

像宣告式轉變

嘗試性地加入宣告式是可行的,應用只是簡單地說明那些連結是必須的,查詢優化器絕地如何最好地執行連結。

引入宣告式的部分,又保有原來的自定義運算元的方式,將大大提高可用性。

總結

批處理:input資料是有界的,是一個已知的,固定大小的資料集合。

而流處理則是無界的,即,你任然有一個任務,但是輸入資料是無限的